hdl.ast: implement abs() on values.
[nmigen.git] / nmigen / test / test_sim.py
index d5eac8a9f40e28c27b15e7c24b59cb1a1fcd0f5c..7996bec45bdef95b6811b57c4f6fda5d49cf8dd3 100644 (file)
@@ -1,7 +1,8 @@
+import os
 from contextlib import contextmanager
 
-from .tools import *
-from ..tools import flatten, union
+from .utils import *
+from .._utils import flatten, union
 from ..hdl.ast import *
 from ..hdl.cd import  *
 from ..hdl.mem import *
@@ -25,16 +26,14 @@ class SimulatorUnitTestCase(FHDLTestCase):
         for signal in flatten(s._lhs_signals() for s in Statement.cast(stmt)):
             frag.add_driver(signal)
 
-        with Simulator(frag,
-                vcd_file =open("test.vcd",  "w"),
-                gtkw_file=open("test.gtkw", "w"),
-                traces=[*isigs, osig]) as sim:
-            def process():
-                for isig, input in zip(isigs, inputs):
-                    yield isig.eq(input)
-                yield Delay()
-                self.assertEqual((yield osig), output.value)
-            sim.add_process(process)
+        sim = Simulator(frag)
+        def process():
+            for isig, input in zip(isigs, inputs):
+                yield isig.eq(input)
+            yield Settle()
+            self.assertEqual((yield osig), output.value)
+        sim.add_process(process)
+        with sim.write_vcd("test.vcd", "test.gtkw", traces=[*isigs, osig]):
             sim.run()
 
     def test_invert(self):
@@ -57,6 +56,16 @@ class SimulatorUnitTestCase(FHDLTestCase):
         self.assertStatement(stmt, [C(1, 4)], C(1))
         self.assertStatement(stmt, [C(2, 4)], C(1))
 
+    def test_as_unsigned(self):
+        stmt = lambda y, a, b: y.eq(a.as_unsigned() == b)
+        self.assertStatement(stmt, [C(0b01, signed(2)), C(0b0001, unsigned(4))], C(1))
+        self.assertStatement(stmt, [C(0b11, signed(2)), C(0b0011, unsigned(4))], C(1))
+
+    def test_as_signed(self):
+        stmt = lambda y, a, b: y.eq(a.as_signed() == b)
+        self.assertStatement(stmt, [C(0b01, unsigned(2)), C(0b0001, signed(4))], C(1))
+        self.assertStatement(stmt, [C(0b11, unsigned(2)), C(0b1111, signed(4))], C(1))
+
     def test_any(self):
         stmt = lambda y, a: y.eq(a.any())
         self.assertStatement(stmt, [C(0b00, 2)], C(0))
@@ -101,6 +110,13 @@ class SimulatorUnitTestCase(FHDLTestCase):
         self.assertStatement(stmt, [C(2,  4), C(2,  4)], C(1,   8))
         self.assertStatement(stmt, [C(7,  4), C(2,  4)], C(3,   8))
 
+    def test_mod(self):
+        stmt = lambda y, a, b: y.eq(a % b)
+        self.assertStatement(stmt, [C(2,  4), C(0,  4)], C(0,   8))
+        self.assertStatement(stmt, [C(2,  4), C(1,  4)], C(0,   8))
+        self.assertStatement(stmt, [C(2,  4), C(2,  4)], C(0,   8))
+        self.assertStatement(stmt, [C(7,  4), C(2,  4)], C(1,   8))
+
     def test_and(self):
         stmt = lambda y, a, b: y.eq(a & b)
         self.assertStatement(stmt, [C(0b1100, 4), C(0b1010, 4)], C(0b1000, 4))
@@ -117,13 +133,11 @@ class SimulatorUnitTestCase(FHDLTestCase):
         stmt = lambda y, a, b: y.eq(a << b)
         self.assertStatement(stmt, [C(0b1001, 4), C(0)],  C(0b1001,    5))
         self.assertStatement(stmt, [C(0b1001, 4), C(3)],  C(0b1001000, 7))
-        self.assertStatement(stmt, [C(0b1001, 4), C(-2)], C(0b10,      7))
 
     def test_shr(self):
         stmt = lambda y, a, b: y.eq(a >> b)
         self.assertStatement(stmt, [C(0b1001, 4), C(0)],  C(0b1001,    4))
         self.assertStatement(stmt, [C(0b1001, 4), C(2)],  C(0b10,      4))
-        self.assertStatement(stmt, [C(0b1001, 4), C(-2)], C(0b100100,  5))
 
     def test_eq(self):
         stmt = lambda y, a, b: y.eq(a == b)
@@ -166,6 +180,13 @@ class SimulatorUnitTestCase(FHDLTestCase):
         self.assertStatement(stmt, [C(2, 4), C(3, 4), C(0)], C(3, 4))
         self.assertStatement(stmt, [C(2, 4), C(3, 4), C(1)], C(2, 4))
 
+    def test_abs(self):
+        stmt = lambda y, a: y.eq(abs(a))
+        self.assertStatement(stmt, [C(3,  unsigned(8))], C(3,  unsigned(8)))
+        self.assertStatement(stmt, [C(-3, unsigned(8))], C(-3, unsigned(8)))
+        self.assertStatement(stmt, [C(3,  signed(8))],   C(3,  signed(8)))
+        self.assertStatement(stmt, [C(-3, signed(8))],   C(3,  signed(8)))
+
     def test_slice(self):
         stmt1 = lambda y, a: y.eq(a[2])
         self.assertStatement(stmt1, [C(0b10110100, 8)], C(0b1,  1))
@@ -213,6 +234,13 @@ class SimulatorUnitTestCase(FHDLTestCase):
         stmt = lambda y, a: [Cat(l, m, n).eq(a), y.eq(Cat(n, m, l))]
         self.assertStatement(stmt, [C(0b100101110, 9)], C(0b110101100, 9))
 
+    def test_nested_cat_lhs(self):
+        l = Signal(3)
+        m = Signal(3)
+        n = Signal(3)
+        stmt = lambda y, a: [Cat(Cat(l, Cat(m)), n).eq(a), y.eq(Cat(n, m, l))]
+        self.assertStatement(stmt, [C(0b100101110, 9)], C(0b110101100, 9))
+
     def test_record(self):
         rec = Record([
             ("l", 1),
@@ -277,8 +305,9 @@ class SimulatorUnitTestCase(FHDLTestCase):
 class SimulatorIntegrationTestCase(FHDLTestCase):
     @contextmanager
     def assertSimulation(self, module, deadline=None):
-        with Simulator(module) as sim:
-            yield sim
+        sim = Simulator(module)
+        yield sim
+        with sim.write_vcd("test.vcd", "test.gtkw"):
             if deadline is None:
                 sim.run()
             else:
@@ -300,11 +329,15 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
                 yield Delay(1e-6)
                 self.assertEqual((yield self.count), 4)
                 yield self.sync.clk.eq(1)
+                self.assertEqual((yield self.count), 4)
+                yield Settle()
                 self.assertEqual((yield self.count), 5)
                 yield Delay(1e-6)
                 self.assertEqual((yield self.count), 5)
                 yield self.sync.clk.eq(0)
                 self.assertEqual((yield self.count), 5)
+                yield Settle()
+                self.assertEqual((yield self.count), 5)
                 for _ in range(3):
                     yield Delay(1e-6)
                     yield self.sync.clk.eq(1)
@@ -328,6 +361,26 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
                 self.assertEqual((yield self.count), 0)
             sim.add_sync_process(process)
 
+    def test_reset(self):
+        self.setUp_counter()
+        sim = Simulator(self.m)
+        sim.add_clock(1e-6)
+        times = 0
+        def process():
+            nonlocal times
+            self.assertEqual((yield self.count), 4)
+            yield
+            self.assertEqual((yield self.count), 5)
+            yield
+            self.assertEqual((yield self.count), 6)
+            yield
+            times += 1
+        sim.add_sync_process(process)
+        sim.run()
+        sim.reset()
+        sim.run()
+        self.assertEqual(times, 2)
+
     def setUp_alu(self):
         self.a = Signal(8)
         self.b = Signal(8)
@@ -406,7 +459,7 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
             def process():
                 yield self.i.eq(0b10101010)
                 yield self.i[:4].eq(-1)
-                yield Delay()
+                yield Settle()
                 self.assertEqual((yield self.i[:4]), 0b1111)
                 self.assertEqual((yield self.i), 0b10101111)
             sim.add_process(process)
@@ -426,10 +479,18 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
     def test_add_process_wrong(self):
         with self.assertSimulation(Module()) as sim:
             with self.assertRaises(TypeError,
-                    msg="Cannot add a process 1 because it is not a generator or "
-                        "a generator function"):
+                    msg="Cannot add a process 1 because it is not a generator function"):
                 sim.add_process(1)
 
+    def test_add_process_wrong_generator(self):
+        with self.assertSimulation(Module()) as sim:
+            with self.assertWarns(DeprecationWarning,
+                    msg="instead of generators, use generator functions as processes; "
+                        "this allows the simulator to be repeatedly reset"):
+                def process():
+                    yield Delay()
+                sim.add_process(process())
+
     def test_add_clock_wrong_twice(self):
         m = Module()
         s = Signal()
@@ -452,37 +513,18 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
         with self.assertSimulation(m) as sim:
             sim.add_clock(1, if_exists=True)
 
-    def test_eq_signal_unused_wrong(self):
-        self.setUp_lhs_rhs()
-        self.s = Signal()
-        with self.assertSimulation(self.m) as sim:
-            def process():
-                with self.assertRaisesRegex(ValueError,
-                        regex=r"Process .+? sent a request to set signal \(sig s\), "
-                              r"which is not a part of simulation"):
-                    yield self.s.eq(0)
-                yield Delay()
-            sim.add_process(process)
-
-    def test_eq_signal_comb_wrong(self):
-        self.setUp_lhs_rhs()
-        with self.assertSimulation(self.m) as sim:
-            def process():
-                with self.assertRaisesRegex(ValueError,
-                        regex=r"Process .+? sent a request to set signal \(sig o\), "
-                              r"which is a part of combinatorial assignment in simulation"):
-                    yield self.o.eq(0)
-                yield Delay()
-            sim.add_process(process)
-
     def test_command_wrong(self):
+        survived = False
         with self.assertSimulation(Module()) as sim:
             def process():
+                nonlocal survived
                 with self.assertRaisesRegex(TypeError,
                         regex=r"Received unsupported command 1 from process .+?"):
                     yield 1
-                yield Delay()
+                yield Settle()
+                survived = True
             sim.add_process(process)
+        self.assertTrue(survived)
 
     def setUp_memory(self, rd_synchronous=True, rd_transparent=True, wr_granularity=None):
         self.m = Module()
@@ -558,7 +600,7 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
                 self.assertEqual((yield self.rdport.data), 0xaa)
                 yield
                 self.assertEqual((yield self.rdport.data), 0xaa)
-                yield Delay(1e-6) # let comb propagate
+                yield Settle()
                 self.assertEqual((yield self.rdport.data), 0x33)
             sim.add_clock(1e-6)
             sim.add_sync_process(process)
@@ -571,11 +613,11 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
                 yield self.wrport.en.eq(1)
                 yield
                 self.assertEqual((yield self.rdport.data), 0xaa)
-                yield Delay(1e-6) # let comb propagate
+                yield Settle()
                 self.assertEqual((yield self.rdport.data), 0x33)
                 yield
                 yield self.rdport.addr.eq(1)
-                yield Delay(1e-6) # let comb propagate
+                yield Settle()
                 self.assertEqual((yield self.rdport.data), 0x33)
             sim.add_clock(1e-6)
             sim.add_sync_process(process)
@@ -585,10 +627,10 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
         with self.assertSimulation(self.m) as sim:
             def process():
                 yield self.rdport.addr.eq(0)
-                yield Delay()
+                yield Settle()
                 self.assertEqual((yield self.rdport.data), 0xaa)
                 yield self.rdport.addr.eq(1)
-                yield Delay()
+                yield Settle()
                 self.assertEqual((yield self.rdport.data), 0x55)
                 yield self.rdport.addr.eq(0)
                 yield self.wrport.addr.eq(0)
@@ -596,7 +638,7 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
                 yield self.wrport.en.eq(1)
                 yield Tick("sync")
                 self.assertEqual((yield self.rdport.data), 0xaa)
-                yield Delay(1e-6) # let comb propagate
+                yield Settle()
                 self.assertEqual((yield self.rdport.data), 0x33)
             sim.add_clock(1e-6)
             sim.add_process(process)
@@ -661,8 +703,38 @@ class SimulatorIntegrationTestCase(FHDLTestCase):
             sim.add_sync_process(process_gen)
             sim.add_sync_process(process_check)
 
-    def test_wrong_not_run(self):
-        with self.assertWarns(UserWarning,
-                msg="Simulation created, but not run"):
-            with Simulator(Fragment()) as sim:
+    def test_vcd_wrong_nonzero_time(self):
+        s = Signal()
+        m = Module()
+        m.d.sync += s.eq(s)
+        sim = Simulator(m)
+        sim.add_clock(1e-6)
+        sim.run_until(1e-5)
+        with self.assertRaisesRegex(ValueError,
+                regex=r"^Cannot start writing waveforms after advancing simulation time$"):
+            with sim.write_vcd(open(os.path.devnull, "wt")):
                 pass
+
+    def test_vcd_wrong_twice(self):
+        s = Signal()
+        m = Module()
+        m.d.sync += s.eq(s)
+        sim = Simulator(m)
+        sim.add_clock(1e-6)
+        with self.assertRaisesRegex(ValueError,
+                regex=r"^Already writing waveforms to .+$"):
+            with sim.write_vcd(open(os.path.devnull, "wt")):
+                with sim.write_vcd(open(os.path.devnull, "wt")):
+                    pass
+
+
+class SimulatorRegressionTestCase(FHDLTestCase):
+    def test_bug_325(self):
+        dut = Module()
+        dut.d.comb += Signal().eq(Cat())
+        Simulator(dut).run()
+
+    def test_bug_325_bis(self):
+        dut = Module()
+        dut.d.comb += Signal().eq(Repl(Const(1), 0))
+        Simulator(dut).run()