hdl.mem: add simulation model for memory.
authorwhitequark <whitequark@whitequark.org>
Fri, 21 Dec 2018 11:00:42 +0000 (11:00 +0000)
committerwhitequark <whitequark@whitequark.org>
Fri, 21 Dec 2018 11:54:32 +0000 (11:54 +0000)
nmigen/hdl/mem.py
nmigen/test/test_sim.py

index c8518dbca806904d35848bfe8f4ee4b8b80b8a51..f60cdcea3ebd927238bbbb0faea57e0699daedd3 100644 (file)
@@ -22,16 +22,21 @@ class Memory:
                 name = tracer.get_var_name(depth=2)
             except tracer.NameNotFound:
                 name = "$memory"
-        self.name  = name
+        self.name = name
 
         self.width = width
         self.depth = depth
-        self.init  = None if init is None else list(init)
 
-        if self.init is not None and len(self.init) > self.depth:
+        self.init = [] if init is None else list(init)
+        if len(self.init) > self.depth:
             raise ValueError("Memory initialization value count exceed memory depth ({} > {})"
                              .format(len(self.init), self.depth))
 
+        # Array of signals for simulation.
+        self._array = Array()
+        for addr, data in enumerate(self.init + [0 for _ in range(self.depth - len(self.init))]):
+            self._array.append(Signal(self.width, reset=data, name="{}[{}]".format(name, addr)))
+
     def read_port(self, domain="sync", synchronous=True, transparent=True):
         if not synchronous and not transparent:
             raise ValueError("Read port cannot be simultaneously asynchronous and non-transparent")
@@ -51,6 +56,10 @@ class Memory:
             raise ValueError("Write port granularity must divide memory width evenly")
         return WritePort(self, domain, priority, granularity)
 
+    def __getitem__(self, index):
+        """Simulation only."""
+        return self._array[index]
+
 
 class ReadPort:
     def __init__(self, memory, domain, synchronous, transparent):
@@ -67,7 +76,7 @@ class ReadPort:
             self.en = Const(1)
 
     def get_fragment(self, platform):
-        return Instance("$memrd",
+        f = Instance("$memrd",
             p_MEMID=self.memory,
             p_ABITS=self.addr.nbits,
             p_WIDTH=self.data.nbits,
@@ -79,6 +88,26 @@ class ReadPort:
             i_ADDR=self.addr,
             o_DATA=self.data,
         )
+        read_data = self.data.eq(self.memory._array[self.addr])
+        if self.synchronous and not self.transparent:
+            # Synchronous, read-before-write port
+            f.add_statements(Switch(self.en, { 1: read_data }))
+            f.add_driver(self.data, self.domain)
+        elif self.synchronous:
+            # Synchronous, write-through port
+            # This model is a bit unconventional. We model transparent ports as asynchronous ports
+            # that are latched when the clock is high. This isn't exactly correct, but it is very
+            # close to the correct behavior of a transparent port, and the difference should only
+            # be observable in pathological cases of clock gating.
+            f.add_statements(Switch(ClockSignal(self.domain),
+                { 1: self.data.eq(self.data), 0: read_data }))
+            f.add_driver(self.data)
+        else:
+            # Asynchronous port
+            f.add_statements(read_data)
+            f.add_driver(self.data)
+        return f
+
 
 class WritePort:
     def __init__(self, memory, domain, priority, granularity):
@@ -92,7 +121,7 @@ class WritePort:
         self.en   = Signal(memory.width // granularity)
 
     def get_fragment(self, platform):
-        return Instance("$memwr",
+        f = Instance("$memwr",
             p_MEMID=self.memory,
             p_ABITS=self.addr.nbits,
             p_WIDTH=self.data.nbits,
@@ -104,3 +133,15 @@ class WritePort:
             i_ADDR=self.addr,
             i_DATA=self.data,
         )
+        if len(self.en) > 1:
+            for index, en_bit in enumerate(self.en):
+                offset = index * self.granularity
+                bits   = slice(offset, offset + self.granularity)
+                write_data = self.memory._array[self.addr][bits].eq(self.data[bits])
+                f.add_statements(Switch(en_bit, { 1: write_data }))
+        else:
+            write_data = self.memory._array[self.addr].eq(self.data)
+            f.add_statements(Switch(self.en, { 1: write_data }))
+        for signal in self.memory._array:
+            f.add_driver(signal, self.domain)
+        return f
index f91c932f9d0d0469277de96296e4e1a2d45e1811..963652a5d24658b1c2365f227b3fe05c1000fdb6 100644 (file)
@@ -4,6 +4,7 @@ from .tools import *
 from ..tools import flatten, union
 from ..hdl.ast import *
 from ..hdl.cd import  *
+from ..hdl.mem import *
 from ..hdl.dsl import  *
 from ..hdl.ir import *
 from ..back.pysim import *
@@ -390,3 +391,113 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
                     yield 1
                 yield Delay()
             sim.add_process(process)
+
+    def setUp_memory(self, rd_synchronous=True, rd_transparent=True, wr_granularity=None):
+        self.m = Module()
+        self.memory = Memory(width=8, depth=4, init=[0xaa, 0x55])
+        self.m.submodules.rdport = self.rdport = \
+            self.memory.read_port(synchronous=rd_synchronous, transparent=rd_transparent)
+        self.m.submodules.wrport = self.wrport = \
+            self.memory.write_port(granularity=wr_granularity)
+
+    def test_memory_init(self):
+        self.setUp_memory()
+        with self.assertSimulation(self.m) as sim:
+            def process():
+                yield
+                self.assertEqual((yield self.rdport.data), 0xaa)
+                yield self.rdport.addr.eq(1)
+                yield
+                self.assertEqual((yield self.rdport.data), 0x55)
+                yield self.rdport.addr.eq(2)
+                yield
+                self.assertEqual((yield self.rdport.data), 0x00)
+            sim.add_clock(1e-6)
+            sim.add_sync_process(process)
+
+    def test_memory_write(self):
+        self.setUp_memory()
+        with self.assertSimulation(self.m) as sim:
+            def process():
+                yield self.wrport.addr.eq(4)
+                yield self.wrport.data.eq(0x33)
+                yield self.wrport.en.eq(1)
+                yield
+                yield self.wrport.en.eq(0)
+                yield self.rdport.addr.eq(4)
+                yield
+                self.assertEqual((yield self.rdport.data), 0x33)
+            sim.add_clock(1e-6)
+            sim.add_sync_process(process)
+
+    def test_memory_write_granularity(self):
+        self.setUp_memory(wr_granularity=4)
+        with self.assertSimulation(self.m) as sim:
+            def process():
+                yield self.wrport.data.eq(0x50)
+                yield self.wrport.en.eq(0b00)
+                yield
+                yield self.wrport.en.eq(0)
+                yield
+                self.assertEqual((yield self.rdport.data), 0xaa)
+                yield self.wrport.en.eq(0b10)
+                yield
+                yield self.wrport.en.eq(0)
+                yield
+                self.assertEqual((yield self.rdport.data), 0x5a)
+                yield self.wrport.data.eq(0x33)
+                yield self.wrport.en.eq(0b01)
+                yield
+                yield self.wrport.en.eq(0)
+                yield
+                self.assertEqual((yield self.rdport.data), 0x53)
+            sim.add_clock(1e-6)
+            sim.add_sync_process(process)
+
+    def test_memory_read_before_write(self):
+        self.setUp_memory(rd_transparent=False)
+        with self.assertSimulation(self.m) as sim:
+            def process():
+                yield self.wrport.data.eq(0x33)
+                yield self.wrport.en.eq(1)
+                yield self.rdport.en.eq(1)
+                yield
+                self.assertEqual((yield self.rdport.data), 0xaa)
+                yield Delay(1e-6) # let comb propagate
+                self.assertEqual((yield self.rdport.data), 0xaa)
+            sim.add_clock(1e-6)
+            sim.add_sync_process(process)
+
+    def test_memory_write_through(self):
+        self.setUp_memory(rd_transparent=True)
+        with self.assertSimulation(self.m) as sim:
+            def process():
+                yield self.wrport.data.eq(0x33)
+                yield self.wrport.en.eq(1)
+                yield
+                self.assertEqual((yield self.rdport.data), 0xaa)
+                yield Delay(1e-6) # let comb propagate
+                self.assertEqual((yield self.rdport.data), 0x33)
+            sim.add_clock(1e-6)
+            sim.add_sync_process(process)
+
+    def test_memory_async_read_write(self):
+        self.setUp_memory(rd_synchronous=False)
+        with self.assertSimulation(self.m) as sim:
+            def process():
+                yield self.rdport.addr.eq(0)
+                yield Delay()
+                self.assertEqual((yield self.rdport.data), 0xaa)
+                yield self.rdport.addr.eq(1)
+                yield Delay()
+                self.assertEqual((yield self.rdport.data), 0x55)
+                yield self.rdport.addr.eq(0)
+                yield self.wrport.addr.eq(0)
+                yield self.wrport.data.eq(0x33)
+                yield self.wrport.en.eq(1)
+                yield Tick("sync")
+                self.assertEqual((yield self.rdport.data), 0xaa)
+                yield Delay(1e-6) # let comb propagate
+                self.assertEqual((yield self.rdport.data), 0x33)
+            sim.add_clock(1e-6)
+            sim.add_process(process)