+import os
from contextlib import contextmanager
-from .tools import *
-from ..tools import flatten, union
+from .utils import *
+from .._utils import flatten, union
from ..hdl.ast import *
from ..hdl.cd import *
from ..hdl.mem import *
for signal in flatten(s._lhs_signals() for s in Statement.cast(stmt)):
frag.add_driver(signal)
- with Simulator(frag,
- vcd_file =open("test.vcd", "w"),
- gtkw_file=open("test.gtkw", "w"),
- traces=[*isigs, osig]) as sim:
- def process():
- for isig, input in zip(isigs, inputs):
- yield isig.eq(input)
- yield Delay()
- self.assertEqual((yield osig), output.value)
- sim.add_process(process)
+ sim = Simulator(frag)
+ def process():
+ for isig, input in zip(isigs, inputs):
+ yield isig.eq(input)
+ yield Settle()
+ self.assertEqual((yield osig), output.value)
+ sim.add_process(process)
+ with sim.write_vcd("test.vcd", "test.gtkw", traces=[*isigs, osig]):
sim.run()
def test_invert(self):
self.assertStatement(stmt, [C(1, 4)], C(1))
self.assertStatement(stmt, [C(2, 4)], C(1))
+ def test_as_unsigned(self):
+ stmt = lambda y, a, b: y.eq(a.as_unsigned() == b)
+ self.assertStatement(stmt, [C(0b01, signed(2)), C(0b0001, unsigned(4))], C(1))
+ self.assertStatement(stmt, [C(0b11, signed(2)), C(0b0011, unsigned(4))], C(1))
+
+ def test_as_signed(self):
+ stmt = lambda y, a, b: y.eq(a.as_signed() == b)
+ self.assertStatement(stmt, [C(0b01, unsigned(2)), C(0b0001, signed(4))], C(1))
+ self.assertStatement(stmt, [C(0b11, unsigned(2)), C(0b1111, signed(4))], C(1))
+
def test_any(self):
stmt = lambda y, a: y.eq(a.any())
self.assertStatement(stmt, [C(0b00, 2)], C(0))
self.assertStatement(stmt, [C(2, 4), C(2, 4)], C(1, 8))
self.assertStatement(stmt, [C(7, 4), C(2, 4)], C(3, 8))
+ def test_mod(self):
+ stmt = lambda y, a, b: y.eq(a % b)
+ self.assertStatement(stmt, [C(2, 4), C(0, 4)], C(0, 8))
+ self.assertStatement(stmt, [C(2, 4), C(1, 4)], C(0, 8))
+ self.assertStatement(stmt, [C(2, 4), C(2, 4)], C(0, 8))
+ self.assertStatement(stmt, [C(7, 4), C(2, 4)], C(1, 8))
+
def test_and(self):
stmt = lambda y, a, b: y.eq(a & b)
self.assertStatement(stmt, [C(0b1100, 4), C(0b1010, 4)], C(0b1000, 4))
stmt = lambda y, a, b: y.eq(a << b)
self.assertStatement(stmt, [C(0b1001, 4), C(0)], C(0b1001, 5))
self.assertStatement(stmt, [C(0b1001, 4), C(3)], C(0b1001000, 7))
- self.assertStatement(stmt, [C(0b1001, 4), C(-2)], C(0b10, 7))
def test_shr(self):
stmt = lambda y, a, b: y.eq(a >> b)
self.assertStatement(stmt, [C(0b1001, 4), C(0)], C(0b1001, 4))
self.assertStatement(stmt, [C(0b1001, 4), C(2)], C(0b10, 4))
- self.assertStatement(stmt, [C(0b1001, 4), C(-2)], C(0b100100, 5))
def test_eq(self):
stmt = lambda y, a, b: y.eq(a == b)
self.assertStatement(stmt, [C(2, 4), C(3, 4), C(0)], C(3, 4))
self.assertStatement(stmt, [C(2, 4), C(3, 4), C(1)], C(2, 4))
+ def test_abs(self):
+ stmt = lambda y, a: y.eq(abs(a))
+ self.assertStatement(stmt, [C(3, unsigned(8))], C(3, unsigned(8)))
+ self.assertStatement(stmt, [C(-3, unsigned(8))], C(-3, unsigned(8)))
+ self.assertStatement(stmt, [C(3, signed(8))], C(3, signed(8)))
+ self.assertStatement(stmt, [C(-3, signed(8))], C(3, signed(8)))
+
def test_slice(self):
stmt1 = lambda y, a: y.eq(a[2])
self.assertStatement(stmt1, [C(0b10110100, 8)], C(0b1, 1))
stmt = lambda y, a: [Cat(l, m, n).eq(a), y.eq(Cat(n, m, l))]
self.assertStatement(stmt, [C(0b100101110, 9)], C(0b110101100, 9))
+ def test_nested_cat_lhs(self):
+ l = Signal(3)
+ m = Signal(3)
+ n = Signal(3)
+ stmt = lambda y, a: [Cat(Cat(l, Cat(m)), n).eq(a), y.eq(Cat(n, m, l))]
+ self.assertStatement(stmt, [C(0b100101110, 9)], C(0b110101100, 9))
+
def test_record(self):
rec = Record([
("l", 1),
class SimulatorIntegrationTestCase(FHDLTestCase):
@contextmanager
def assertSimulation(self, module, deadline=None):
- with Simulator(module) as sim:
- yield sim
+ sim = Simulator(module)
+ yield sim
+ with sim.write_vcd("test.vcd", "test.gtkw"):
if deadline is None:
sim.run()
else:
yield Delay(1e-6)
self.assertEqual((yield self.count), 4)
yield self.sync.clk.eq(1)
+ self.assertEqual((yield self.count), 4)
+ yield Settle()
self.assertEqual((yield self.count), 5)
yield Delay(1e-6)
self.assertEqual((yield self.count), 5)
yield self.sync.clk.eq(0)
self.assertEqual((yield self.count), 5)
+ yield Settle()
+ self.assertEqual((yield self.count), 5)
for _ in range(3):
yield Delay(1e-6)
yield self.sync.clk.eq(1)
self.assertEqual((yield self.count), 0)
sim.add_sync_process(process)
+ def test_reset(self):
+ self.setUp_counter()
+ sim = Simulator(self.m)
+ sim.add_clock(1e-6)
+ times = 0
+ def process():
+ nonlocal times
+ self.assertEqual((yield self.count), 4)
+ yield
+ self.assertEqual((yield self.count), 5)
+ yield
+ self.assertEqual((yield self.count), 6)
+ yield
+ times += 1
+ sim.add_sync_process(process)
+ sim.run()
+ sim.reset()
+ sim.run()
+ self.assertEqual(times, 2)
+
def setUp_alu(self):
self.a = Signal(8)
self.b = Signal(8)
def process():
yield self.i.eq(0b10101010)
yield self.i[:4].eq(-1)
- yield Delay()
+ yield Settle()
self.assertEqual((yield self.i[:4]), 0b1111)
self.assertEqual((yield self.i), 0b10101111)
sim.add_process(process)
def test_add_process_wrong(self):
with self.assertSimulation(Module()) as sim:
with self.assertRaises(TypeError,
- msg="Cannot add a process 1 because it is not a generator or "
- "a generator function"):
+ msg="Cannot add a process 1 because it is not a generator function"):
sim.add_process(1)
+ def test_add_process_wrong_generator(self):
+ with self.assertSimulation(Module()) as sim:
+ with self.assertWarns(DeprecationWarning,
+ msg="instead of generators, use generator functions as processes; "
+ "this allows the simulator to be repeatedly reset"):
+ def process():
+ yield Delay()
+ sim.add_process(process())
+
def test_add_clock_wrong_twice(self):
m = Module()
s = Signal()
with self.assertSimulation(m) as sim:
sim.add_clock(1, if_exists=True)
- def test_eq_signal_unused_wrong(self):
- self.setUp_lhs_rhs()
- self.s = Signal()
- with self.assertSimulation(self.m) as sim:
- def process():
- with self.assertRaisesRegex(ValueError,
- regex=r"Process .+? sent a request to set signal \(sig s\), "
- r"which is not a part of simulation"):
- yield self.s.eq(0)
- yield Delay()
- sim.add_process(process)
-
- def test_eq_signal_comb_wrong(self):
- self.setUp_lhs_rhs()
- with self.assertSimulation(self.m) as sim:
- def process():
- with self.assertRaisesRegex(ValueError,
- regex=r"Process .+? sent a request to set signal \(sig o\), "
- r"which is a part of combinatorial assignment in simulation"):
- yield self.o.eq(0)
- yield Delay()
- sim.add_process(process)
-
def test_command_wrong(self):
+ survived = False
with self.assertSimulation(Module()) as sim:
def process():
+ nonlocal survived
with self.assertRaisesRegex(TypeError,
regex=r"Received unsupported command 1 from process .+?"):
yield 1
- yield Delay()
+ yield Settle()
+ survived = True
sim.add_process(process)
+ self.assertTrue(survived)
def setUp_memory(self, rd_synchronous=True, rd_transparent=True, wr_granularity=None):
self.m = Module()
self.assertEqual((yield self.rdport.data), 0xaa)
yield
self.assertEqual((yield self.rdport.data), 0xaa)
- yield Delay(1e-6) # let comb propagate
+ yield Settle()
self.assertEqual((yield self.rdport.data), 0x33)
sim.add_clock(1e-6)
sim.add_sync_process(process)
yield self.wrport.en.eq(1)
yield
self.assertEqual((yield self.rdport.data), 0xaa)
- yield Delay(1e-6) # let comb propagate
+ yield Settle()
self.assertEqual((yield self.rdport.data), 0x33)
yield
yield self.rdport.addr.eq(1)
- yield Delay(1e-6) # let comb propagate
+ yield Settle()
self.assertEqual((yield self.rdport.data), 0x33)
sim.add_clock(1e-6)
sim.add_sync_process(process)
with self.assertSimulation(self.m) as sim:
def process():
yield self.rdport.addr.eq(0)
- yield Delay()
+ yield Settle()
self.assertEqual((yield self.rdport.data), 0xaa)
yield self.rdport.addr.eq(1)
- yield Delay()
+ yield Settle()
self.assertEqual((yield self.rdport.data), 0x55)
yield self.rdport.addr.eq(0)
yield self.wrport.addr.eq(0)
yield self.wrport.en.eq(1)
yield Tick("sync")
self.assertEqual((yield self.rdport.data), 0xaa)
- yield Delay(1e-6) # let comb propagate
+ yield Settle()
self.assertEqual((yield self.rdport.data), 0x33)
sim.add_clock(1e-6)
sim.add_process(process)
sim.add_sync_process(process_gen)
sim.add_sync_process(process_check)
- def test_wrong_not_run(self):
- with self.assertWarns(UserWarning,
- msg="Simulation created, but not run"):
- with Simulator(Fragment()) as sim:
+ def test_vcd_wrong_nonzero_time(self):
+ s = Signal()
+ m = Module()
+ m.d.sync += s.eq(s)
+ sim = Simulator(m)
+ sim.add_clock(1e-6)
+ sim.run_until(1e-5)
+ with self.assertRaisesRegex(ValueError,
+ regex=r"^Cannot start writing waveforms after advancing simulation time$"):
+ with sim.write_vcd(open(os.path.devnull, "wt")):
pass
+
+ def test_vcd_wrong_twice(self):
+ s = Signal()
+ m = Module()
+ m.d.sync += s.eq(s)
+ sim = Simulator(m)
+ sim.add_clock(1e-6)
+ with self.assertRaisesRegex(ValueError,
+ regex=r"^Already writing waveforms to .+$"):
+ with sim.write_vcd(open(os.path.devnull, "wt")):
+ with sim.write_vcd(open(os.path.devnull, "wt")):
+ pass
+
+
+class SimulatorRegressionTestCase(FHDLTestCase):
+ def test_bug_325(self):
+ dut = Module()
+ dut.d.comb += Signal().eq(Cat())
+ Simulator(dut).run()
+
+ def test_bug_325_bis(self):
+ dut = Module()
+ dut.d.comb += Signal().eq(Repl(Const(1), 0))
+ Simulator(dut).run()