1 """Simple example of a FSM-based ALU
3 This demonstrates a design that follows the valid/ready protocol of the
4 ALU, but with a FSM implementation, instead of a pipeline. It is also
5 intended to comply with both the CompALU API and the nmutil Pipeline API
6 (Liskov Substitution Principle)
10 1) p.ready_o is asserted on the initial ("Idle") state, otherwise it keeps low.
11 2) n.valid_o is asserted on the final ("Done") state, otherwise it keeps low.
12 3) The FSM stays in the Idle state while p.valid_i is low, otherwise
13 it accepts the input data and moves on.
14 4) The FSM stays in the Done state while n.ready_i is low, otherwise
15 it releases the output data and goes back to the Idle state.
19 from nmigen
import Elaboratable
, Signal
, Module
, Cat
20 from nmigen
.back
.pysim
import Simulator
21 from nmigen
.cli
import rtlil
22 from soc
.fu
.cr
.cr_input_record
import CompCROpSubset
30 class Shifter(Elaboratable
):
31 """Simple sequential shifter
34 * p.data_i.data: value to be shifted
35 * p.data_i.shift: shift amount
36 * When zero, no shift occurs.
37 * On POWER, range is 0 to 63 for 32-bit,
38 * and 0 to 127 for 64-bit.
39 * Other values wrap around.
40 * p.data_i.dir: shift direction (0 = left, 1 = right)
43 * n.data_o.data: shifted value
46 def __init__(self
, width
):
47 self
.data
= Signal(width
, name
="p_data_i")
48 self
.shift
= Signal(width
, name
="p_shift_i")
49 self
.dir = Signal(name
="p_dir_i")
50 self
.ctx
= Dummy() # comply with CompALU API
53 return [self
.data
, self
.shift
]
56 def __init__(self
, width
):
57 self
.data
= Signal(width
, name
="n_data_o")
63 def __init__(self
, width
):
64 self
.data_i
= Shifter
.PrevData(width
)
65 self
.valid_i
= Signal(name
="p_valid_i")
66 self
.ready_o
= Signal(name
="p_ready_o")
69 def __init__(self
, width
):
70 self
.data_o
= Shifter
.NextData(width
)
71 self
.valid_o
= Signal(name
="n_valid_o")
72 self
.ready_i
= Signal(name
="n_ready_i")
74 def __init__(self
, width
):
76 self
.p
= self
.PrevPort(width
)
77 self
.n
= self
.NextPort(width
)
79 # more pieces to make this example class comply with the CompALU API
80 self
.op
= CompCROpSubset()
81 self
.p
.data_i
.ctx
.op
= self
.op
82 self
.i
= self
.p
.data_i
._get
_data
()
83 self
.out
= self
.n
.data_o
._get
_data
()
85 def elaborate(self
, platform
):
89 # It is good practice to design a sequential circuit as
90 # a data path and a control path.
94 # The idea is to have a register that can be
95 # loaded or shifted (left and right).
102 shift_in
= Signal(self
.width
)
103 shift_left_by_1
= Signal(self
.width
)
104 shift_right_by_1
= Signal(self
.width
)
105 next_shift
= Signal(self
.width
)
107 shift_reg
= Signal(self
.width
, reset_less
=True)
108 # build the data flow
110 # connect input and output
111 shift_in
.eq(self
.p
.data_i
.data
),
112 self
.n
.data_o
.data
.eq(shift_reg
),
113 # generate shifted views of the register
114 shift_left_by_1
.eq(Cat(0, shift_reg
[:-1])),
115 shift_right_by_1
.eq(Cat(shift_reg
[1:], 0)),
117 # choose the next value of the register according to the
119 # default is no change
120 m
.d
.comb
+= next_shift
.eq(shift_reg
)
122 m
.d
.comb
+= next_shift
.eq(shift_in
)
124 with m
.If(direction
):
125 m
.d
.comb
+= next_shift
.eq(shift_right_by_1
)
127 m
.d
.comb
+= next_shift
.eq(shift_left_by_1
)
129 # register the next value
130 m
.d
.sync
+= shift_reg
.eq(next_shift
)
134 # The idea is to have a SHIFT state where the shift register
135 # is shifted every cycle, while a counter decrements.
136 # This counter is loaded with shift amount in the initial state.
137 # The SHIFT state is left when the counter goes to zero.
140 shift_width
= int(log2(self
.width
)) + 1
141 next_count
= Signal(shift_width
)
142 count
= Signal(shift_width
, reset_less
=True)
143 m
.d
.sync
+= count
.eq(next_count
)
146 with m
.State("IDLE"):
148 # keep p.ready_o active on IDLE
149 self
.p
.ready_o
.eq(1),
150 # keep loading the shift register and shift count
152 next_count
.eq(self
.p
.data_i
.shift
),
154 # capture the direction bit as well
155 m
.d
.sync
+= direction
.eq(self
.p
.data_i
.dir)
156 with m
.If(self
.p
.valid_i
):
157 # Leave IDLE when data arrives
158 with m
.If(next_count
== 0):
159 # short-circuit for zero shift
163 with m
.State("SHIFT"):
165 # keep shifting, while counter is not zero
167 # decrement the shift counter
168 next_count
.eq(count
- 1),
170 with m
.If(next_count
== 0):
171 # exit when shift counter goes to zero
173 with m
.State("DONE"):
174 # keep n.valid_o active while the data is not accepted
175 m
.d
.comb
+= self
.n
.valid_o
.eq(1)
176 with m
.If(self
.n
.ready_i
):
177 # go back to IDLE when the data is accepted
183 yield self
.p
.data_i
.data
184 yield self
.p
.data_i
.shift
185 yield self
.p
.data_i
.dir
190 yield self
.n
.data_o
.data
198 m
.submodules
.shf
= dut
= Shifter(8)
199 print("Shifter port names:")
201 print("-", port
.name
)
203 # try "proc; show" in yosys to check the data path
204 il
= rtlil
.convert(dut
, ports
=dut
.ports())
205 with
open("test_shifter.il", "w") as f
:
210 def send(data
, shift
, direction
):
211 # present input data and assert valid_i
212 yield dut
.p
.data_i
.data
.eq(data
)
213 yield dut
.p
.data_i
.shift
.eq(shift
)
214 yield dut
.p
.data_i
.dir.eq(direction
)
215 yield dut
.p
.valid_i
.eq(1)
217 # wait for p.ready_o to be asserted
218 while not (yield dut
.p
.ready_o
):
220 # clear input data and negate p.valid_i
221 yield dut
.p
.valid_i
.eq(0)
222 yield dut
.p
.data_i
.data
.eq(0)
223 yield dut
.p
.data_i
.shift
.eq(0)
224 yield dut
.p
.data_i
.dir.eq(0)
226 def receive(expected
):
227 # signal readiness to receive data
228 yield dut
.n
.ready_i
.eq(1)
230 # wait for n.valid_o to be asserted
231 while not (yield dut
.n
.valid_o
):
234 result
= yield dut
.n
.data_o
.data
236 yield dut
.n
.ready_i
.eq(0)
238 assert result
== expected
242 yield from send(13, 2, 1)
244 yield from send(3, 4, 0)
246 yield from send(21, 0, 0)
249 # the consumer is not in step with the producer, but the
250 # order of the results are preserved
252 yield from receive(3)
254 yield from receive(48)
256 yield from receive(21)
258 sim
.add_sync_process(producer
)
259 sim
.add_sync_process(consumer
)
260 sim_writer
= sim
.write_vcd(
269 if __name__
== "__main__":