+""" Unit tests for Buffered and Unbuffered pipelines
+
+ contains useful worked examples of how to use the Pipeline API,
+ including:
+
+ * Combinatorial Stage "Chaining"
+ * class-based data stages
+ * nmigen module-based data stages
+ * special nmigen module-based data stage, where the stage *is* the module
+ * Record-based data stages
+ * static-class data stages
+ * multi-stage pipelines (and how to connect them)
+ * how to *use* the pipelines (see Test5) - how to get data in and out
+
+"""
+
from nmigen import Module, Signal, Mux
from nmigen.hdl.rec import Record
from nmigen.compat.sim import run_simulation
from example_buf_pipe import ExamplePipeline, UnbufferedPipeline
from example_buf_pipe import ExampleStageCls
from example_buf_pipe import PrevControl, NextControl, BufferedPipeline
-from example_buf_pipe import StageChain, PipelineBase
+from example_buf_pipe import StageChain, ControlBase, StageCls
from random import randint
if o == len(data):
break
+######################################################################
+# Test 2 and 4
+######################################################################
-class ExampleBufPipe2(PipelineBase):
+class ExampleBufPipe2(ControlBase):
"""
- connect these: ------|---------------|
- v v
+ connect these: ------|---------------|
+ v v
i_p_valid >>in pipe1 o_n_valid out>> i_p_valid >>in pipe2
o_p_ready <<out pipe1 i_n_ready <<in o_p_ready <<out pipe2
p_i_data >>in pipe1 p_i_data out>> n_o_data >>in pipe2
"""
def __init__(self):
- PipelineBase.__init__(self)
+ ControlBase.__init__(self)
# input / output
self.p.i_data = Signal(32) # >>in - comes in from the PREVIOUS stage
% (i, o, o_data, repr(expected))
+######################################################################
+# Test 6 and 10
+######################################################################
+
class SetLessThan:
def __init__(self, width, signed):
self.src1 = Signal((width, signed))
return m
-class LTStage:
+class LTStage(StageCls):
+ """ module-based stage example
+ """
def __init__(self):
self.slt = SetLessThan(16, True)
return self.o
-class LTStageDerived(SetLessThan):
+class LTStageDerived(SetLessThan, StageCls):
+ """ special version of a nmigen module where the module is also a stage
+
+ shows that you don't actually need to combinatorially connect
+ to the outputs, or add the module as a submodule: just return
+ the module output parameter(s) from the Stage.process() function
+ """
def __init__(self):
SetLessThan.__init__(self, 16, True)
class ExampleLTPipeline(UnbufferedPipeline):
- """ an example of how to use the combinatorial pipeline.
+ """ an example of how to use the unbuffered pipeline.
"""
def __init__(self):
class ExampleLTBufferedPipeDerived(BufferedPipeline):
- """ an example of how to use the combinatorial pipeline.
+ """ an example of how to use the buffered pipeline.
"""
def __init__(self):
% (i, o, o_data, repr(expected))
-class ExampleAddRecordStage:
+######################################################################
+# Test 7
+######################################################################
+
+class ExampleAddRecordStage(StageCls):
""" example use of a Record
"""
return {'src1': i.src1 + 1,
'src2': i.src2 + 1}
+######################################################################
+# Test 11
+######################################################################
-class ExampleAddRecordPlaceHolderStage:
+class ExampleAddRecordPlaceHolderStage(StageCls):
""" example use of a Record, with a placeholder as the processing result
"""
o.src2 = i.src2 + 1
return o
+
class PlaceHolder: pass
% (i, o, repr(o_data), repr(expected))
+######################################################################
+# Test 8
+######################################################################
+
+
class Example2OpClass:
""" an example of a class used to store 2 operands.
requires an eq function, to conform with the pipeline stage API
return [self.op1.eq(i.op1), self.op2.eq(i.op2)]
-class ExampleAddClassStage:
+class ExampleAddClassStage(StageCls):
""" an example of how to use the buffered pipeline, as a class instance
"""
data.append(TestInputAdd(randint(0, 1<<16-1), randint(0, 1<<16-1)))
return data
-class InputPriorityArbiter:
- def __init__(self, pipe, num_rows):
- self.pipe = pipe
- self.num_rows = num_rows
- self.mmax = int(log(self.num_rows) / log(2))
- self.mid = Signal(self.mmax, reset_less=True) # multiplex id
- self.active = Signal(reset_less=True)
-
- def elaborate(self, platform):
- m = Module()
-
- assert len(self.pipe.p) == self.num_rows, \
- "must declare input to be same size"
- pe = PriorityEncoder(self.num_rows)
- m.submodules.selector = pe
-
- # connect priority encoder
- in_ready = []
- for i in range(self.num_rows):
- p_i_valid = Signal(reset_less=True)
- m.d.comb += p_i_valid.eq(self.pipe[i].i_valid_logic())
- in_ready.append(p_i_valid)
- m.d.comb += pe.i.eq(Cat(*in_ready)) # array of input "valids"
- m.d.comb += self.active.eq(~pe.n) # encoder active (one input valid)
- m.d.comb += self.mid.eq(pe.o) # output one active input
-
- return m
-
- def ports(self):
- return [self.mid, self.active]
-
-
-class PriorityUnbufferedPipeline(UnbufferedPipeline):
- def __init__(self, stage, p_len=4):
- p_mux = InputPriorityArbiter(self, p_len)
- UnbufferedPipeline.__init__(stage, p_len=p_len, p_mux=p_mux)
-
- def elaborate(self, platform):
- m = Module()
-
- pe = PriorityEncoder(self.num_rows)
- m.submodules.selector = pe
- m.submodules.out_op = self.out_op
- m.submodules += self.rs
-
- # connect priority encoder
- in_ready = []
- for i in range(self.num_rows):
- in_ready.append(self.rs[i].ready)
- m.d.comb += pe.i.eq(Cat(*in_ready))
-
- active = Signal(reset_less=True)
- out_en = Signal(reset_less=True)
- m.d.comb += active.eq(~pe.n) # encoder active
- m.d.comb += out_en.eq(active & self.out_op.trigger)
-
- # encoder active: ack relevant input, record MID, pass output
- with m.If(out_en):
- rs = self.rs[pe.o]
- m.d.sync += self.mid.eq(pe.o)
- m.d.sync += rs.ack.eq(0)
- m.d.sync += self.out_op.stb.eq(0)
- for j in range(self.num_ops):
- m.d.sync += self.out_op.v[j].eq(rs.out_op[j])
- with m.Else():
- m.d.sync += self.out_op.stb.eq(1)
- # acks all default to zero
- for i in range(self.num_rows):
- m.d.sync += self.rs[i].ack.eq(1)
-
- return m
-
- def ports(self):
- res = []
- for i in range(self.num_rows):
- inop = self.rs[i]
- res += inop.in_op + [inop.stb]
- return self.out_op.ports() + res + [self.mid]
-
-
num_tests = 100