add uart demo
authorJacob Lifshay <programmerjake@gmail.com>
Thu, 21 Apr 2022 04:32:36 +0000 (21:32 -0700)
committerJacob Lifshay <programmerjake@gmail.com>
Thu, 21 Apr 2022 04:32:36 +0000 (21:32 -0700)
.gitignore
.gitlab-ci.yml
uart_demo.py [new file with mode: 0755]

index a34da978334d33321e5c7c9e803b70dce9f9018a..fb99fe6493dea2edd5b0aa3be94cdcd3893a08bd 100644 (file)
@@ -1,2 +1,3 @@
 .gitlab-runner-ccache
 .vscode
+sim_test_out
\ No newline at end of file
index e9e2b413e777b8c8c2886cc2001d4071a5a00572..ea7feca01c8d3b42a33bd1892d382bca4e898d9a 100644 (file)
@@ -106,4 +106,9 @@ build:
         - export PATH=/usr/local/nextpnr-xilinx/bin:$PATH
         - export XRAY_DIR=/usr/local/nextpnr-xilinx
 
+        # run unit tests
+        - python3 uart_demo.py
+
+        # program fpga
         - python3 blinky.py
+        - python3 uart_demo.py program
diff --git a/uart_demo.py b/uart_demo.py
new file mode 100755 (executable)
index 0000000..89c40e7
--- /dev/null
@@ -0,0 +1,271 @@
+#!/usr/bin/env python3
+import itertools
+import sys
+import unittest
+from nmigen_boards.arty_a7 import ArtyA7_100Platform
+from nmigen.hdl.dsl import Elaboratable, Module
+from nmigen.hdl.ast import Signal, Array, Const
+from nmigen.hdl.mem import Memory
+from nmigen.sim import Tick
+from nmigen.build import ResourceError
+from nmutil.sim_util import do_sim
+import enum
+
+
+def get_all_resources(platform, name):
+    if platform is None:
+        # simulating
+        return []
+    resources = []
+    for number in itertools.count():
+        try:
+            resources.append(platform.request(name, number))
+        except ResourceError:
+            break
+    return resources
+
+
+SIM_CLOCK_FREQ = 1e6
+
+
+class TickGenerator(Elaboratable):
+    """Generates a tick exactly `rate` times per second"""
+
+    def __init__(self, rate):
+        assert isinstance(rate, int) and rate > 0, "unsupported rate"
+        self.rate = rate
+        self.tick = Signal()
+
+    def elaborate(self, platform):
+        m = Module()
+        if platform is None:
+            orig_clk_freq = SIM_CLOCK_FREQ
+        else:
+            orig_clk_freq = platform.default_clk_frequency
+        clk_freq = int(orig_clk_freq)
+        assert clk_freq == orig_clk_freq, \
+            "non-integer clock frequencies are unsupported"
+        assert self.rate <= clk_freq, \
+            "rate can't be higher than the clock frequency"
+        counter = Signal(range(clk_freq))
+        next_count = Signal(range(clk_freq))
+        m.d.sync += counter.eq(next_count)
+        underflow = Signal()
+        m.d.comb += underflow.eq(counter < self.rate)
+        m.d.sync += self.tick.eq(underflow)
+        with m.If(underflow):
+            m.d.comb += next_count.eq(counter + (clk_freq - self.rate))
+        with m.Else():
+            m.d.comb += next_count.eq(counter - self.rate)
+        return m
+
+
+class SimpleUART(Elaboratable):
+    """Simple transmit-only UART"""
+
+    def __init__(self, baud_rate=9600):
+        self.__tick_gen = TickGenerator(baud_rate)
+        self.data_in = Signal(8)
+        self.data_in_valid = Signal()
+        self.data_in_ready = Signal()
+        self.tx = Signal(reset=1)
+
+    @property
+    def baud_rate(self):
+        return self.__tick_gen.rate
+
+    def elaborate(self, platform):
+        m = Module()
+        m.submodules.tick_gen = self.__tick_gen
+        data = Signal.like(self.data_in)
+        data_full = Signal(reset=0)
+        m.d.comb += self.data_in_ready.eq(~data_full)
+        with m.If(self.data_in_ready & self.data_in_valid):
+            m.d.sync += data.eq(self.data_in)
+            m.d.sync += data_full.eq(True)
+
+        tx_sequence = [Const(0, 1), *data, Const(1, 1)]
+        current_bit_num = Signal(range(len(tx_sequence)),
+                                 reset=0)
+        with m.If(self.__tick_gen.tick & data_full):
+            m.d.sync += self.tx.eq(Array(tx_sequence)[current_bit_num])
+            with m.If(current_bit_num == len(tx_sequence) - 1):
+                m.d.sync += [
+                    current_bit_num.eq(0),
+                    data_full.eq(False),
+                ]
+            with m.Else():
+                m.d.sync += current_bit_num.eq(current_bit_num + 1)
+        return m
+
+
+class UartDemo(Elaboratable):
+    def __init__(self, text):
+        self.simple_uart = SimpleUART()
+        self.text = str(text)
+        self.text_bytes = list(self.text.encode())
+
+    def elaborate(self, platform):
+        m = Module()
+        m.submodules.simple_uart = self.simple_uart
+
+        for uart in get_all_resources(platform, "uart"):
+            m.d.comb += uart.tx.o.eq(self.simple_uart.tx)
+
+        text_rom = Memory(width=8, depth=len(self.text_bytes),
+                          init=self.text_bytes)
+        text_read = text_rom.read_port()
+        m.submodules.text_read = text_read
+        addr = Signal(range(len(self.text_bytes)), reset=0)
+        valid = Signal(reset=0)
+        m.d.comb += [
+            text_read.addr.eq(addr),
+            self.simple_uart.data_in.eq(text_read.data),
+            self.simple_uart.data_in_valid.eq(valid),
+        ]
+
+        with m.If(self.simple_uart.data_in_ready
+                  & self.simple_uart.data_in_valid):
+            with m.If(addr == len(self.text_bytes) - 1):
+                m.d.sync += addr.eq(0)
+            with m.Else():
+                m.d.sync += addr.eq(addr + 1)
+            m.d.sync += valid.eq(0)  # wait for it to propagate through memory
+        with m.Else():
+            m.d.sync += valid.eq(1)
+
+        return m
+
+
+class TestUartDemo(unittest.TestCase):
+    def test_uart_demo(self):
+        class ExpectedState(enum.Enum):
+            DATA0 = 0
+            DATA1 = 1
+            DATA2 = 2
+            DATA3 = 3
+            DATA4 = 4
+            DATA5 = 5
+            DATA6 = 6
+            DATA7 = 7
+            START = enum.auto()
+            STOP = enum.auto()
+
+        m = Module()
+        dut = UartDemo("test text")
+        sample_event = Signal()
+        expected_state = Signal(ExpectedState)
+        m.submodules.dut = dut
+        with do_sim(self, m, [
+            dut.simple_uart.tx,
+            sample_event,
+            expected_state,
+        ]) as sim:
+            expected_bit_tick_count = round(
+                SIM_CLOCK_FREQ / dut.simple_uart.baud_rate)
+
+            def read_bit(is_initial=False):
+                yield sample_event.eq(1)
+                start_value = yield dut.simple_uart.tx
+                transition = None
+                for i in range(expected_bit_tick_count):
+                    yield Tick()
+                    yield sample_event.eq(0)
+                    value = yield dut.simple_uart.tx
+                    if value != start_value:
+                        transition = i
+                        break
+                if transition is not None:
+                    delta = expected_bit_tick_count if is_initial else 1
+                    self.assertAlmostEqual(
+                        transition, expected_bit_tick_count / 2, delta=delta)
+                    for i in range(expected_bit_tick_count // 2):
+                        yield Tick()
+                        yield sample_event.eq(0)
+                        value = yield dut.simple_uart.tx
+                        self.assertNotEqual(value, start_value,
+                                            "two transitions in one bit time")
+                return start_value
+
+            def process():
+                yield expected_state.eq(ExpectedState.START)
+                start_bit = yield from read_bit(True)
+                for i in range(3):
+                    if start_bit == 0:
+                        break
+                    start_bit = yield from read_bit(True)
+                for i in range(3):
+                    for expected_byte in dut.text_bytes:
+                        with self.subTest(i=i,
+                                          expected_byte=hex(expected_byte)):
+                            self.assertEqual(start_bit, 0, "missing start bit")
+                            for bit_index in range(8):
+                                with self.subTest(bit_index=bit_index):
+                                    yield expected_state.eq(
+                                        ExpectedState(bit_index))
+                                    data_bit = yield from read_bit()
+                                    expected = (expected_byte >> bit_index) & 1
+                                    self.assertEqual(data_bit, expected,
+                                                     "wrong data bit")
+                            yield expected_state.eq(ExpectedState.STOP)
+                            stop_bit = yield from read_bit()
+                            self.assertEqual(stop_bit, 1, "missing stop bit")
+                            yield expected_state.eq(ExpectedState.START)
+                            start_bit = yield from read_bit()
+
+            sim.add_process(process)
+            sim.add_clock(1 / SIM_CLOCK_FREQ)
+            sim.run()
+
+
+def build(platform, do_program):
+    platform.build(UartDemo("Hello World!\n"), do_program=do_program)
+
+
+PLATFORMS = {
+    "ArtyA7_100": ArtyA7_100Platform,
+    # TODO: add more
+}
+
+DEFAULT_PLATFORM = next(iter(PLATFORMS.keys()))
+DEFAULT_TOOLCHAIN = "yosys_nextpnr"
+DEFAULT_TEXT = "Hello World!\n"
+
+PLATFORMS_TEXT = '\n'.join(PLATFORMS.keys())
+HELP_TEXT = f"""
+usage: {sys.argv[0]} program|build [<platform> [<toolchain> [<text>]]]
+
+generate a FPGA bitstream. If `program` is specified, also program that
+bitstream to the FPGA plugged into this computer.
+
+<platform> the FPGA platform. Defaults to {DEFAULT_PLATFORM}.
+<toolchain> the toolchain used. Defaults to {DEFAULT_TOOLCHAIN}.
+<text> the text that will be repeatedly sent out all FPGA uarts.
+Defaults to {DEFAULT_TEXT!r}.
+
+Supported FPGA platforms:
+{PLATFORMS_TEXT}
+
+unittest usage:
+""".lstrip()
+
+if __name__ == "__main__":
+    if "-h" in sys.argv or "--help" in sys.argv:
+        print(HELP_TEXT)
+        unittest.main()  # get unittest's help too
+    elif 1 < len(sys.argv) and (sys.argv[1] == "build"
+                                or sys.argv[1] == "program"):
+        platform_str = sys.argv[2] if 2 < len(sys.argv) else DEFAULT_PLATFORM
+        assert platform_str in PLATFORMS, (
+            f"unsupported platform {platform_str}:\n"
+            f"valid platforms: {list(PLATFORMS.keys())}")
+        platform_cls = PLATFORMS[platform_str]
+        toolchain = sys.argv[3] if 3 < len(sys.argv) else DEFAULT_TOOLCHAIN
+        text = sys.argv[4] if 4 < len(sys.argv) else DEFAULT_TEXT
+        assert text != "", "empty text not supported"
+        platform = platform_cls(toolchain=toolchain)
+        top = UartDemo(text)
+        platform.build(top,
+                       do_program=sys.argv[1] == "program")
+    else:
+        unittest.main()