--- /dev/null
+#!/usr/bin/env python3
+import itertools
+import sys
+import unittest
+from nmigen_boards.arty_a7 import ArtyA7_100Platform
+from nmigen.hdl.dsl import Elaboratable, Module
+from nmigen.hdl.ast import Signal, Array, Const
+from nmigen.hdl.mem import Memory
+from nmigen.sim import Tick
+from nmigen.build import ResourceError
+from nmutil.sim_util import do_sim
+import enum
+
+
+def get_all_resources(platform, name):
+ if platform is None:
+ # simulating
+ return []
+ resources = []
+ for number in itertools.count():
+ try:
+ resources.append(platform.request(name, number))
+ except ResourceError:
+ break
+ return resources
+
+
+SIM_CLOCK_FREQ = 1e6
+
+
+class TickGenerator(Elaboratable):
+ """Generates a tick exactly `rate` times per second"""
+
+ def __init__(self, rate):
+ assert isinstance(rate, int) and rate > 0, "unsupported rate"
+ self.rate = rate
+ self.tick = Signal()
+
+ def elaborate(self, platform):
+ m = Module()
+ if platform is None:
+ orig_clk_freq = SIM_CLOCK_FREQ
+ else:
+ orig_clk_freq = platform.default_clk_frequency
+ clk_freq = int(orig_clk_freq)
+ assert clk_freq == orig_clk_freq, \
+ "non-integer clock frequencies are unsupported"
+ assert self.rate <= clk_freq, \
+ "rate can't be higher than the clock frequency"
+ counter = Signal(range(clk_freq))
+ next_count = Signal(range(clk_freq))
+ m.d.sync += counter.eq(next_count)
+ underflow = Signal()
+ m.d.comb += underflow.eq(counter < self.rate)
+ m.d.sync += self.tick.eq(underflow)
+ with m.If(underflow):
+ m.d.comb += next_count.eq(counter + (clk_freq - self.rate))
+ with m.Else():
+ m.d.comb += next_count.eq(counter - self.rate)
+ return m
+
+
+class SimpleUART(Elaboratable):
+ """Simple transmit-only UART"""
+
+ def __init__(self, baud_rate=9600):
+ self.__tick_gen = TickGenerator(baud_rate)
+ self.data_in = Signal(8)
+ self.data_in_valid = Signal()
+ self.data_in_ready = Signal()
+ self.tx = Signal(reset=1)
+
+ @property
+ def baud_rate(self):
+ return self.__tick_gen.rate
+
+ def elaborate(self, platform):
+ m = Module()
+ m.submodules.tick_gen = self.__tick_gen
+ data = Signal.like(self.data_in)
+ data_full = Signal(reset=0)
+ m.d.comb += self.data_in_ready.eq(~data_full)
+ with m.If(self.data_in_ready & self.data_in_valid):
+ m.d.sync += data.eq(self.data_in)
+ m.d.sync += data_full.eq(True)
+
+ tx_sequence = [Const(0, 1), *data, Const(1, 1)]
+ current_bit_num = Signal(range(len(tx_sequence)),
+ reset=0)
+ with m.If(self.__tick_gen.tick & data_full):
+ m.d.sync += self.tx.eq(Array(tx_sequence)[current_bit_num])
+ with m.If(current_bit_num == len(tx_sequence) - 1):
+ m.d.sync += [
+ current_bit_num.eq(0),
+ data_full.eq(False),
+ ]
+ with m.Else():
+ m.d.sync += current_bit_num.eq(current_bit_num + 1)
+ return m
+
+
+class UartDemo(Elaboratable):
+ def __init__(self, text):
+ self.simple_uart = SimpleUART()
+ self.text = str(text)
+ self.text_bytes = list(self.text.encode())
+
+ def elaborate(self, platform):
+ m = Module()
+ m.submodules.simple_uart = self.simple_uart
+
+ for uart in get_all_resources(platform, "uart"):
+ m.d.comb += uart.tx.o.eq(self.simple_uart.tx)
+
+ text_rom = Memory(width=8, depth=len(self.text_bytes),
+ init=self.text_bytes)
+ text_read = text_rom.read_port()
+ m.submodules.text_read = text_read
+ addr = Signal(range(len(self.text_bytes)), reset=0)
+ valid = Signal(reset=0)
+ m.d.comb += [
+ text_read.addr.eq(addr),
+ self.simple_uart.data_in.eq(text_read.data),
+ self.simple_uart.data_in_valid.eq(valid),
+ ]
+
+ with m.If(self.simple_uart.data_in_ready
+ & self.simple_uart.data_in_valid):
+ with m.If(addr == len(self.text_bytes) - 1):
+ m.d.sync += addr.eq(0)
+ with m.Else():
+ m.d.sync += addr.eq(addr + 1)
+ m.d.sync += valid.eq(0) # wait for it to propagate through memory
+ with m.Else():
+ m.d.sync += valid.eq(1)
+
+ return m
+
+
+class TestUartDemo(unittest.TestCase):
+ def test_uart_demo(self):
+ class ExpectedState(enum.Enum):
+ DATA0 = 0
+ DATA1 = 1
+ DATA2 = 2
+ DATA3 = 3
+ DATA4 = 4
+ DATA5 = 5
+ DATA6 = 6
+ DATA7 = 7
+ START = enum.auto()
+ STOP = enum.auto()
+
+ m = Module()
+ dut = UartDemo("test text")
+ sample_event = Signal()
+ expected_state = Signal(ExpectedState)
+ m.submodules.dut = dut
+ with do_sim(self, m, [
+ dut.simple_uart.tx,
+ sample_event,
+ expected_state,
+ ]) as sim:
+ expected_bit_tick_count = round(
+ SIM_CLOCK_FREQ / dut.simple_uart.baud_rate)
+
+ def read_bit(is_initial=False):
+ yield sample_event.eq(1)
+ start_value = yield dut.simple_uart.tx
+ transition = None
+ for i in range(expected_bit_tick_count):
+ yield Tick()
+ yield sample_event.eq(0)
+ value = yield dut.simple_uart.tx
+ if value != start_value:
+ transition = i
+ break
+ if transition is not None:
+ delta = expected_bit_tick_count if is_initial else 1
+ self.assertAlmostEqual(
+ transition, expected_bit_tick_count / 2, delta=delta)
+ for i in range(expected_bit_tick_count // 2):
+ yield Tick()
+ yield sample_event.eq(0)
+ value = yield dut.simple_uart.tx
+ self.assertNotEqual(value, start_value,
+ "two transitions in one bit time")
+ return start_value
+
+ def process():
+ yield expected_state.eq(ExpectedState.START)
+ start_bit = yield from read_bit(True)
+ for i in range(3):
+ if start_bit == 0:
+ break
+ start_bit = yield from read_bit(True)
+ for i in range(3):
+ for expected_byte in dut.text_bytes:
+ with self.subTest(i=i,
+ expected_byte=hex(expected_byte)):
+ self.assertEqual(start_bit, 0, "missing start bit")
+ for bit_index in range(8):
+ with self.subTest(bit_index=bit_index):
+ yield expected_state.eq(
+ ExpectedState(bit_index))
+ data_bit = yield from read_bit()
+ expected = (expected_byte >> bit_index) & 1
+ self.assertEqual(data_bit, expected,
+ "wrong data bit")
+ yield expected_state.eq(ExpectedState.STOP)
+ stop_bit = yield from read_bit()
+ self.assertEqual(stop_bit, 1, "missing stop bit")
+ yield expected_state.eq(ExpectedState.START)
+ start_bit = yield from read_bit()
+
+ sim.add_process(process)
+ sim.add_clock(1 / SIM_CLOCK_FREQ)
+ sim.run()
+
+
+def build(platform, do_program):
+ platform.build(UartDemo("Hello World!\n"), do_program=do_program)
+
+
+PLATFORMS = {
+ "ArtyA7_100": ArtyA7_100Platform,
+ # TODO: add more
+}
+
+DEFAULT_PLATFORM = next(iter(PLATFORMS.keys()))
+DEFAULT_TOOLCHAIN = "yosys_nextpnr"
+DEFAULT_TEXT = "Hello World!\n"
+
+PLATFORMS_TEXT = '\n'.join(PLATFORMS.keys())
+HELP_TEXT = f"""
+usage: {sys.argv[0]} program|build [<platform> [<toolchain> [<text>]]]
+
+generate a FPGA bitstream. If `program` is specified, also program that
+bitstream to the FPGA plugged into this computer.
+
+<platform> the FPGA platform. Defaults to {DEFAULT_PLATFORM}.
+<toolchain> the toolchain used. Defaults to {DEFAULT_TOOLCHAIN}.
+<text> the text that will be repeatedly sent out all FPGA uarts.
+Defaults to {DEFAULT_TEXT!r}.
+
+Supported FPGA platforms:
+{PLATFORMS_TEXT}
+
+unittest usage:
+""".lstrip()
+
+if __name__ == "__main__":
+ if "-h" in sys.argv or "--help" in sys.argv:
+ print(HELP_TEXT)
+ unittest.main() # get unittest's help too
+ elif 1 < len(sys.argv) and (sys.argv[1] == "build"
+ or sys.argv[1] == "program"):
+ platform_str = sys.argv[2] if 2 < len(sys.argv) else DEFAULT_PLATFORM
+ assert platform_str in PLATFORMS, (
+ f"unsupported platform {platform_str}:\n"
+ f"valid platforms: {list(PLATFORMS.keys())}")
+ platform_cls = PLATFORMS[platform_str]
+ toolchain = sys.argv[3] if 3 < len(sys.argv) else DEFAULT_TOOLCHAIN
+ text = sys.argv[4] if 4 < len(sys.argv) else DEFAULT_TEXT
+ assert text != "", "empty text not supported"
+ platform = platform_cls(toolchain=toolchain)
+ top = UartDemo(text)
+ platform.build(top,
+ do_program=sys.argv[1] == "program")
+ else:
+ unittest.main()