from nmigen import Signal, Cat, Const, Mux, Module
from nmigen.cli import verilog, rtlil
+from collections.abc import Sequence
-class ExampleStage:
- """ an example of how to use the buffered pipeline. actual names of
- variables (i_data, r_data, o_data, result) below do not matter:
- the functions however do.
+class IOAckIn:
+
+ def __init__(self):
+ self.p_valid = Signal() # >>in - comes in from PREVIOUS stage
+ self.n_ready = Signal() # in<< - comes in from the NEXT stage
+
+
+class IOAckOut:
+
+ def __init__(self):
+ self.n_valid = Signal() # out>> - goes out to the NEXT stage
+ self.p_ready = Signal() # <<out - goes out to the PREVIOUS stage
+
+
+def eq(o, i):
+ if not isinstance(o, Sequence):
+ o, i = [o], [i]
+ res = []
+ for (ao, ai) in zip(o, i):
+ res.append(ao.eq(ai))
+ return res
+
+
+class BufferedPipeline:
+ """ buffered pipeline stage. data and strobe signals travel in sync.
+ if ever the input is ready and the output is not, processed data
+ is stored in a temporary register.
+
+ stage-1 i.p_valid >>in stage o.n_valid out>> stage+1
+ stage-1 o.p_ready <<out stage i.n_ready <<in stage+1
+ stage-1 i.data >>in stage o.data out>> stage+1
+ | |
+ process --->----^
+ | |
+ +-- r_data ->-+
input data i_data is read (only), is processed and goes into an
intermediate result store [process()]. this is updated combinatorially.
on the next cycle (as long as stall is not raised again) the
input may begin to be processed and transferred directly to output.
"""
-
- def __init__(self):
- """ i_data can be a DIFFERENT type from everything else
- o_data, r_data and result are best of the same type.
- however this is not strictly necessary. an intermediate
- transformation process could hypothetically be applied, however
- it is result and r_data that definitively need to be of the same
- (intermediary) type, as it is both result and r_data that
- are transferred into o_data:
-
- i_data -> process() -> result --> o_data
+ def __init__(self, stage):
+ """ pass in a "stage" which may be either a static class or a class
+ instance, which has three functions:
+ * ispec: returns input signals according to the input specification
+ * ispec: returns output signals to the output specification
+ * process: takes an input instance and returns processed data
+
+ i_data -> process() -> result --> o.data
| ^
| |
+-> r_data -+
"""
- self.i_data = Signal(16)
- self.r_data = Signal(16)
- self.o_data = Signal(16)
- self.result = Signal(16)
-
- def process(self):
- """ process the input data and store it in result.
- (not needed to be known: result is combinatorial)
- """
- return self.result.eq(self.i_data + 1)
+ # input: strobe comes in from previous stage, ready comes in from next
+ self.i = IOAckIn()
+ #self.i.p_valid = Signal() # >>in - comes in from PREVIOUS stage
+ #self.i.n_ready = Signal() # in<< - comes in from the NEXT stage
+
+ # output: strobe goes out to next stage, ready comes in from previous
+ self.o = IOAckOut()
+ #self.o.n_valid = Signal() # out>> - goes out to the NEXT stage
+ #self.o.p_ready = Signal() # <<out - goes out to the PREVIOUS stage
+
+ # set up the input and output data
+ self.i.data = stage.ispec() # input type
+ self.r_data = stage.ospec() # all these are output type
+ self.result = stage.ospec()
+ self.o.data = stage.ospec()
+ self.stage = stage
+
+ def set_input(self, i):
+ return eq(self.i.data, i)
def update_buffer(self):
""" copies the result into the intermediate register r_data,
which will need to be outputted on a subsequent cycle
prior to allowing "normal" operation.
"""
- return self.r_data.eq(self.result)
+ return eq(self.r_data, self.result)
def update_output(self):
""" copies the (combinatorial) result into the output
"""
- return self.o_data.eq(self.result)
+ return eq(self.o.data, self.result)
def flush_buffer(self):
""" copies the *intermediate* register r_data into the output
"""
- return self.o_data.eq(self.r_data)
+ return eq(self.o.data, self.r_data)
def ports(self):
- return [self.i_data, self.o_data]
-
-class IOAckIn:
-
- def __init__(self):
- self.p_valid = Signal() # >>in - comes in from PREVIOUS stage
- self.n_ready = Signal() # in<< - comes in from the NEXT stage
-
-
-class IOAckOut:
-
- def __init__(self):
- self.n_valid = Signal() # out>> - goes out to the NEXT stage
- self.p_ready = Signal() # <<out - goes out to the PREVIOUS stage
-
-
-class BufferedPipeline:
- """ buffered pipeline stage
-
- stage-1 i.p_valid >>in stage o.n_valid out>> stage+1
- stage-1 o.p_ready <<out stage i.n_ready <<in stage+1
- stage-1 i_data >>in stage o_data out>> stage+1
- | |
- +-------> process
- | |
- +-- r_data ---+
- """
- def __init__(self):
- # input: strobe comes in from previous stage, ready comes in from next
- self.i = IOAckIn()
- #self.i.p_valid = Signal() # >>in - comes in from PREVIOUS stage
- #self.i.n_ready = Signal() # in<< - comes in from the NEXT stage
-
- # output: strobe goes out to next stage, ready comes in from previous
- self.o = IOAckOut()
- #self.o.n_valid = Signal() # out>> - goes out to the NEXT stage
- #self.o.p_ready = Signal() # <<out - goes out to the PREVIOUS stage
+ return [self.i.data, self.o.data]
def elaborate(self, platform):
m = Module()
# store result of processing in combinatorial temporary
with m.If(self.i.p_valid): # input is valid: process it
- m.d.comb += self.stage.process()
+ m.d.comb += eq(self.result, self.stage.process(self.i.data))
# if not in stall condition, update the temporary register
with m.If(self.o.p_ready): # not stalled
- m.d.sync += self.stage.update_buffer()
+ m.d.sync += self.update_buffer()
#with m.If(self.i.p_rst): # reset
# m.d.sync += self.o.n_valid.eq(0)
with m.If(self.o.p_ready): # not stalled
# nothing in buffer: send (processed) input direct to output
m.d.sync += [self.o.n_valid.eq(self.i.p_valid),
- self.stage.update_output(),
+ self.update_output(),
]
with m.Else(): # o.p_ready is false, and something is in buffer.
# Flush the [already processed] buffer to the output port.
m.d.sync += [self.o.n_valid.eq(1),
- self.stage.flush_buffer(),
+ self.flush_buffer(),
# clear stall condition, declare register empty.
self.o.p_ready.eq(1),
]
m.d.sync += [self.o.n_valid.eq(self.i.p_valid),
self.o.p_ready.eq(1), # Keep the buffer empty
# set the output data (from comb result)
- self.stage.update_output(),
+ self.update_output(),
]
# (i.n_ready) false and (o.n_valid) true:
with m.Elif(i_p_valid_o_p_ready):
]
-class ExampleBufPipe(BufferedPipeline):
+class ExampleAddStage:
+ """ an example of how to use the buffered pipeline, as a class instance
+ """
+
+ def ispec(self):
+ """ returns a tuple of input signals which will be the incoming data
+ """
+ return (Signal(16), Signal(16))
+
+ def ospec(self):
+ """ returns an output signal which will happen to contain the sum
+ of the two inputs
+ """
+ return Signal(16)
+
+ def process(self, i):
+ """ process the input data (sums the values in the tuple) and returns it
+ """
+ return i[0] + i[1]
+
+
+class ExampleBufPipeAdd(BufferedPipeline):
+ """ an example of how to use the buffered pipeline, using a class instance
+ """
def __init__(self):
- BufferedPipeline.__init__(self)
- self.stage = ExampleStage()
+ addstage = ExampleAddStage()
+ BufferedPipeline.__init__(self, addstage)
- def ports(self):
- return self.stage.ports() + BufferedPipeline.ports(self)
+
+class ExampleStage:
+ """ an example of how to use the buffered pipeline, in a static class
+ fashion
+ """
+
+ def ispec():
+ return Signal(16)
+
+ def ospec():
+ return Signal(16)
+
+ def process(i):
+ """ process the input data and returns it (adds 1)
+ """
+ return i + 1
+
+
+class ExampleBufPipe(BufferedPipeline):
+ """ an example of how to use the buffered pipeline.
+ """
+
+ def __init__(self):
+ BufferedPipeline.__init__(self, ExampleStage)
if __name__ == '__main__':
- dut = BufPipe()
+ dut = ExampleBufPipe()
vl = rtlil.convert(dut, ports=dut.ports())
with open("test_bufpipe.il", "w") as f:
f.write(vl)
from nmigen import Module, Signal
from nmigen.compat.sim import run_simulation
-from example_buf_pipe import ExampleBufPipe
+from example_buf_pipe import ExampleBufPipe, ExampleBufPipeAdd
from random import randint
yield
#yield dut.i_p_rst.eq(0)
yield dut.i.n_ready.eq(1)
- yield dut.stage.i_data.eq(5)
+ yield dut.i.data.eq(5)
yield dut.i.p_valid.eq(1)
yield
- yield dut.stage.i_data.eq(7)
+ yield dut.i.data.eq(7)
yield from check_o_n_valid(dut, 0) # effects of i_p_valid delayed
yield
yield from check_o_n_valid(dut, 1) # ok *now* i_p_valid effect is felt
- yield dut.stage.i_data.eq(2)
+ yield dut.i.data.eq(2)
yield
yield dut.i.n_ready.eq(0) # begin going into "stall" (next stage says ready)
- yield dut.stage.i_data.eq(9)
+ yield dut.i.data.eq(9)
yield
yield dut.i.p_valid.eq(0)
- yield dut.stage.i_data.eq(12)
+ yield dut.i.data.eq(12)
yield
- yield dut.stage.i_data.eq(32)
+ yield dut.i.data.eq(32)
yield dut.i.n_ready.eq(1)
yield
yield from check_o_n_valid(dut, 1) # buffer still needs to output
def __init__(self, dut):
self.dut = dut
self.data = []
- for i in range(10000):
+ for i in range(num_tests):
#data.append(randint(0, 1<<16-1))
self.data.append(i+1)
self.i = 0
continue
if send and self.i != len(self.data):
yield self.dut.i.p_valid.eq(1)
- yield self.dut.stage.i_data.eq(self.data[self.i])
+ yield self.dut.i.data.eq(self.data[self.i])
self.i += 1
else:
yield self.dut.i.p_valid.eq(0)
i_n_ready = yield self.dut.i.n_ready
if not o_n_valid or not i_n_ready:
continue
- o_data = yield self.dut.stage.o_data
+ o_data = yield self.dut.o.data
assert o_data == self.data[self.o] + 1, \
"%d-%d data %x not match %x\n" \
% (self.i, self.o, o_data, self.data[self.o])
break
+class Test5:
+ def __init__(self, dut):
+ self.dut = dut
+ self.data = []
+ for i in range(num_tests):
+ self.data.append((randint(0, 1<<14), randint(0, 1<<14)))
+ self.i = 0
+ self.o = 0
+
+ def send(self):
+ while self.o != len(self.data):
+ send_range = randint(0, 3)
+ for j in range(randint(1,10)):
+ if send_range == 0:
+ send = True
+ else:
+ send = randint(0, send_range) != 0
+ o_p_ready = yield self.dut.o.p_ready
+ if not o_p_ready:
+ yield
+ continue
+ if send and self.i != len(self.data):
+ yield self.dut.i.p_valid.eq(1)
+ for v in self.dut.set_input(self.data[self.i]):
+ yield v
+ self.i += 1
+ else:
+ yield self.dut.i.p_valid.eq(0)
+ yield
+
+ def rcv(self):
+ while self.o != len(self.data):
+ stall_range = randint(0, 3)
+ for j in range(randint(1,10)):
+ stall = randint(0, stall_range) != 0
+ yield self.dut.i.n_ready.eq(stall)
+ yield
+ o_n_valid = yield self.dut.o.n_valid
+ i_n_ready = yield self.dut.i.n_ready
+ if not o_n_valid or not i_n_ready:
+ continue
+ o_data = yield self.dut.o.data
+ res = self.data[self.o][0] + self.data[self.o][1]
+ assert o_data == res, \
+ "%d-%d data %x not match %s\n" \
+ % (self.i, self.o, o_data, repr(self.data[self.o]))
+ self.o += 1
+ if self.o == len(self.data):
+ break
+
+
def testbench4(dut):
data = []
- for i in range(10000):
+ for i in range(num_tests):
#data.append(randint(0, 1<<16-1))
data.append(i+1)
i = 0
v v
i_p_valid >>in pipe1 o_n_valid out>> i_p_valid >>in pipe2
o_p_ready <<out pipe1 i_n_ready <<in o_p_ready <<out pipe2
- stage.i_data >>in pipe1 o_data out>> stage.i_data >>in pipe2
+ i_data >>in pipe1 o_data out>> i_data >>in pipe2
"""
def __init__(self):
self.pipe1 = ExampleBufPipe()
# connect inter-pipe input/output valid/ready/data
m.d.comb += self.pipe2.i.p_valid.eq(self.pipe1.o.n_valid)
m.d.comb += self.pipe1.i.n_ready.eq(self.pipe2.o.p_ready)
- m.d.comb += self.pipe2.stage.i_data.eq(self.pipe1.stage.o_data)
+ m.d.comb += self.pipe2.i.data.eq(self.pipe1.o.data)
# inputs/outputs to the module: pipe1 connections here (LHS)
m.d.comb += self.pipe1.i.p_valid.eq(self.i_p_valid)
m.d.comb += self.o_p_ready.eq(self.pipe1.o.p_ready)
- m.d.comb += self.pipe1.stage.i_data.eq(self.i_data)
+ m.d.comb += self.pipe1.i.data.eq(self.i_data)
# now pipe2 connections (RHS)
m.d.comb += self.o_n_valid.eq(self.pipe2.o.n_valid)
m.d.comb += self.pipe2.i.n_ready.eq(self.i_n_ready)
- m.d.comb += self.o_data.eq(self.pipe2.stage.o_data)
+ m.d.comb += self.o_data.eq(self.pipe2.o.data)
return m
+num_tests = 10000
+
if __name__ == '__main__':
print ("test 1")
dut = ExampleBufPipe()
print ("test 4")
dut = ExampleBufPipe2()
run_simulation(dut, testbench4(dut), vcd_name="test_bufpipe4.vcd")
+
+ print ("test 5")
+ dut = ExampleBufPipeAdd()
+ test = Test5(dut)
+ run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufpipe5.vcd")
+