yield self.delay.eq(delay)
+class ResultConsumer:
+ """
+ Consumes a result when requested by the Computation Unit
+ (`dut` parameter), using the `rel_o` / `go_i` handshake.
+
+ Attaches itself to the `dut` result indexed by `op_index`.
+
+ Has a programmable delay between the assertion of `rel_o` and the
+ `go_i` pulse.
+
+ Data is retrieved only during the cycle in which `go_i` is active.
+
+ It adds itself as a passive process to the simulation (`sim` parameter).
+ Since it is passive, it will not hang the simulation, and does not need a
+ flag to terminate itself.
+ """
+ def __init__(self, sim, dut, op_index):
+ # data and handshake signals from the DUT
+ self.port = dut.dest[op_index]
+ self.go_i = dut.wr.go_i[op_index]
+ self.rel_o = dut.wr.rel_o[op_index]
+ # transaction parameters, passed via signals
+ self.delay = Signal(8)
+ self.expected = Signal.like(self.port)
+ # add ourselves to the simulation process list
+ sim.add_sync_process(self._process)
+
+ def _process(self):
+ yield Passive()
+ while True:
+ # Settle() is needed to give a quick response to
+ # the zero delay case
+ yield Settle()
+ # wait for rel_o to become active
+ while not (yield self.rel_o):
+ yield
+ yield Settle()
+ # read the transaction parameters
+ delay = (yield self.delay)
+ expected = (yield self.expected)
+ # wait for `delay` cycles
+ for _ in range(delay):
+ yield
+ # activate go_i for one cycle
+ yield self.go_i.eq(1)
+ yield
+ # check received data against the expected value
+ result = (yield self.port)
+ assert result == expected,\
+ f"expected {expected}, received {result}"
+ yield self.go_i.eq(0)
+ yield self.port.eq(0)
+
+ def receive(self, expected, delay):
+ """
+ Schedules the module to receive some result,
+ counting `delay` cycles after `rel_i` becomes active.
+ As 'go_i' goes active, check the result with `expected`.
+
+ To be called from the main test-bench process,
+ it returns in the same cycle.
+
+ Communication with the worker process is done by means of
+ combinatorial simulation-only signals.
+ """
+ yield self.expected.eq(expected)
+ yield self.delay.eq(delay)
+
+
def op_sim(dut, a, b, op, inv_a=0, imm=0, imm_ok=0, zero_a=0):
yield dut.issue_i.eq(0)
yield
return result
-def scoreboard_sim_fsm(dut, producers):
+def scoreboard_sim_fsm(dut, producers, consumers):
- def op_sim_fsm(a, b, direction, delays):
- print("op_sim_fsm", a, b, direction)
+ def op_sim_fsm(a, b, direction, expected, delays):
+ print("op_sim_fsm", a, b, direction, expected)
yield dut.issue_i.eq(0)
yield
- # forward data and delays to the producers
+ # forward data and delays to the producers and consumers
yield from producers[0].send(a, delays[0])
yield from producers[1].send(b, delays[1])
+ yield from consumers[0].receive(expected, delays[2])
+ # submit operation, and assert issue_i for one cycle
yield dut.oper_i.sdir.eq(direction)
yield dut.issue_i.eq(1)
yield
yield dut.issue_i.eq(0)
- yield
-
- req_rel_o = yield dut.wr.rel_o
- res = yield dut.data_o
- print("req_rel", req_rel_o, res)
- while True:
- req_rel_o = yield dut.wr.rel_o
- res = yield dut.data_o
- print("req_rel", req_rel_o, res)
- if req_rel_o:
- break
- yield
- yield dut.wr.go_i[0].eq(1)
+ # wait for busy to be negated
yield Settle()
- res = yield dut.data_o
- yield
- print("result", res)
- yield dut.wr.go_i[0].eq(0)
- yield
- return res
-
- result = yield from op_sim_fsm(13, 2, 1, [0, 2])
- assert result == 3, result
-
- result = yield from op_sim_fsm(3, 4, 0, [2, 0])
- assert result == 48, result
+ while (yield dut.busy_o):
+ yield
+ yield Settle()
- result = yield from op_sim_fsm(21, 0, 0, [1, 1])
- assert result == 21, result
+ yield from op_sim_fsm(13, 2, 1, 3, [0, 2, 0])
+ yield from op_sim_fsm(3, 4, 0, 48, [2, 0, 2])
+ yield from op_sim_fsm(21, 0, 0, 21, [1, 1, 1])
def scoreboard_sim_dummy(dut):
# create one operand producer for each input port
prod_a = OperandProducer(sim, dut, 0)
prod_b = OperandProducer(sim, dut, 1)
- sim.add_sync_process(wrap(scoreboard_sim_fsm(dut, [prod_a, prod_b])))
+ # create an result consumer for the output port
+ cons = ResultConsumer(sim, dut, 0)
+ sim.add_sync_process(wrap(scoreboard_sim_fsm(dut,
+ [prod_a, prod_b],
+ [cons])))
sim_writer = sim.write_vcd('test_compunit_fsm1.vcd')
with sim_writer:
sim.run()
# create one operand producer for each input port
prod_a = OperandProducer(sim, dut, 0)
prod_b = OperandProducer(sim, dut, 1)
- sim.add_sync_process(wrap(scoreboard_sim_fsm(dut, [prod_a, prod_b])))
+ # create an result consumer for the output port
+ cons = ResultConsumer(sim, dut, 0)
+ sim.add_sync_process(wrap(scoreboard_sim_fsm(dut,
+ [prod_a, prod_b],
+ [cons])))
sim_writer = sim.write_vcd('test_compunit_regspec2_fsm.vcd')
with sim_writer:
sim.run()