3b0418a3582ac32a0df274b7dba3b370b191d59b
1 """Simple example of a FSM-based ALU
3 This demonstrates a design that follows the valid/ready protocol of the
4 ALU, but with a FSM implementation, instead of a pipeline. It is also
5 intended to comply with both the CompALU API and the nmutil Pipeline API
6 (Liskov Substitution Principle)
10 1) p.o_ready is asserted on the initial ("Idle") state, otherwise it keeps low.
11 2) n.o_valid is asserted on the final ("Done") state, otherwise it keeps low.
12 3) The FSM stays in the Idle state while p.i_valid is low, otherwise
13 it accepts the input data and moves on.
14 4) The FSM stays in the Done state while n.i_ready is low, otherwise
15 it releases the output data and goes back to the Idle state.
19 from nmigen
import Elaboratable
, Signal
, Module
, Cat
20 from nmigen
.cli
import rtlil
23 from nmutil
.iocontrol
import PrevControl
, NextControl
25 from soc
.fu
.base_input_record
import CompOpSubsetBase
27 from nmutil
.gtkw
import write_gtkw
28 from nmutil
.sim_tmp_alternative
import (Simulator
, is_engine_pysim
)
31 class CompFSMOpSubset(CompOpSubsetBase
):
32 def __init__(self
, name
=None):
33 layout
= (('sdir', 1),
36 super().__init
__(layout
, name
=name
)
43 class Shifter(Elaboratable
):
44 """Simple sequential shifter
47 * p.data_i.data: value to be shifted
48 * p.data_i.shift: shift amount
49 * When zero, no shift occurs.
50 * On POWER, range is 0 to 63 for 32-bit,
51 * and 0 to 127 for 64-bit.
52 * Other values wrap around.
55 * op.sdir: shift direction (0 = left, 1 = right)
58 * n.data_o.data: shifted value
61 def __init__(self
, width
):
62 self
.data
= Signal(width
, name
="p_data_i")
63 self
.shift
= Signal(width
, name
="p_shift_i")
64 self
.ctx
= Dummy() # comply with CompALU API
67 return [self
.data
, self
.shift
]
70 def __init__(self
, width
):
71 self
.data
= Signal(width
, name
="n_data_o")
76 def __init__(self
, width
):
78 self
.p
= PrevControl()
79 self
.n
= NextControl()
80 self
.p
.data_i
= Shifter
.PrevData(width
)
81 self
.n
.data_o
= Shifter
.NextData(width
)
83 # more pieces to make this example class comply with the CompALU API
84 self
.op
= CompFSMOpSubset(name
="op")
85 self
.p
.data_i
.ctx
.op
= self
.op
86 self
.i
= self
.p
.data_i
._get
_data
()
87 self
.out
= self
.n
.data_o
._get
_data
()
89 def elaborate(self
, platform
):
92 m
.submodules
.p
= self
.p
93 m
.submodules
.n
= self
.n
96 # It is good practice to design a sequential circuit as
97 # a data path and a control path.
101 # The idea is to have a register that can be
102 # loaded or shifted (left and right).
104 # the control signals
109 shift_in
= Signal(self
.width
)
110 shift_left_by_1
= Signal(self
.width
)
111 shift_right_by_1
= Signal(self
.width
)
112 next_shift
= Signal(self
.width
)
114 shift_reg
= Signal(self
.width
, reset_less
=True)
115 # build the data flow
117 # connect input and output
118 shift_in
.eq(self
.p
.data_i
.data
),
119 self
.n
.data_o
.data
.eq(shift_reg
),
120 # generate shifted views of the register
121 shift_left_by_1
.eq(Cat(0, shift_reg
[:-1])),
122 shift_right_by_1
.eq(Cat(shift_reg
[1:], 0)),
124 # choose the next value of the register according to the
126 # default is no change
127 m
.d
.comb
+= next_shift
.eq(shift_reg
)
129 m
.d
.comb
+= next_shift
.eq(shift_in
)
131 with m
.If(direction
):
132 m
.d
.comb
+= next_shift
.eq(shift_right_by_1
)
134 m
.d
.comb
+= next_shift
.eq(shift_left_by_1
)
136 # register the next value
137 m
.d
.sync
+= shift_reg
.eq(next_shift
)
141 # The idea is to have a SHIFT state where the shift register
142 # is shifted every cycle, while a counter decrements.
143 # This counter is loaded with shift amount in the initial state.
144 # The SHIFT state is left when the counter goes to zero.
147 shift_width
= int(log2(self
.width
)) + 1
148 next_count
= Signal(shift_width
)
149 count
= Signal(shift_width
, reset_less
=True)
150 m
.d
.sync
+= count
.eq(next_count
)
153 with m
.State("IDLE"):
155 # keep p.o_ready active on IDLE
156 self
.p
.o_ready
.eq(1),
157 # keep loading the shift register and shift count
159 next_count
.eq(self
.p
.data_i
.shift
),
161 # capture the direction bit as well
162 m
.d
.sync
+= direction
.eq(self
.op
.sdir
)
163 with m
.If(self
.p
.i_valid
):
164 # Leave IDLE when data arrives
165 with m
.If(next_count
== 0):
166 # short-circuit for zero shift
170 with m
.State("SHIFT"):
172 # keep shifting, while counter is not zero
174 # decrement the shift counter
175 next_count
.eq(count
- 1),
177 with m
.If(next_count
== 0):
178 # exit when shift counter goes to zero
180 with m
.State("DONE"):
181 # keep n.o_valid active while the data is not accepted
182 m
.d
.comb
+= self
.n
.o_valid
.eq(1)
183 with m
.If(self
.n
.i_ready
):
184 # go back to IDLE when the data is accepted
191 yield self
.p
.data_i
.data
192 yield self
.p
.data_i
.shift
197 yield self
.n
.data_o
.data
205 m
.submodules
.shf
= dut
= Shifter(8)
206 print("Shifter port names:")
208 print("-", port
.name
)
210 # try "proc; show" in yosys to check the data path
211 il
= rtlil
.convert(dut
, ports
=dut
.ports())
212 with
open("test_shifter.il", "w") as f
:
216 'in': {'color': 'orange'},
217 'out': {'color': 'yellow'},
222 {'comment': 'Shifter Demonstration'},
225 ('p_data_i[7:0]', 'in'),
226 ('p_shift_i[7:0]', 'in'),
227 ({'submodule': 'p'}, [
229 ('p_o_ready', 'out')])]),
231 'fsm_state' if is_engine_pysim() else 'fsm_state[1:0]',
235 ('n_data_o[7:0]', 'out'),
236 ({'submodule': 'n'}, [
237 ('n_o_valid', 'out'),
238 ('n_i_ready', 'in')])])]
240 write_gtkw("test_shifter.gtkw", "test_shifter.vcd",
241 gtkwave_desc
, gtkwave_style
,
242 module
='top.shf', loc
=__file__
, base
='dec')
247 def send(data
, shift
, direction
):
248 # present input data and assert i_valid
249 yield dut
.p
.data_i
.data
.eq(data
)
250 yield dut
.p
.data_i
.shift
.eq(shift
)
251 yield dut
.op
.sdir
.eq(direction
)
252 yield dut
.p
.i_valid
.eq(1)
254 # wait for p.o_ready to be asserted
255 while not (yield dut
.p
.o_ready
):
257 # clear input data and negate p.i_valid
258 yield dut
.p
.i_valid
.eq(0)
259 yield dut
.p
.data_i
.data
.eq(0)
260 yield dut
.p
.data_i
.shift
.eq(0)
261 yield dut
.op
.sdir
.eq(0)
263 def receive(expected
):
264 # signal readiness to receive data
265 yield dut
.n
.i_ready
.eq(1)
267 # wait for n.o_valid to be asserted
268 while not (yield dut
.n
.o_valid
):
271 result
= yield dut
.n
.data_o
.data
273 yield dut
.n
.i_ready
.eq(0)
275 assert result
== expected
279 yield from send(13, 2, 1)
281 yield from send(3, 4, 0)
283 yield from send(21, 0, 0)
286 # the consumer is not in step with the producer, but the
287 # order of the results are preserved
289 yield from receive(3)
291 yield from receive(48)
293 yield from receive(21)
295 sim
.add_sync_process(producer
)
296 sim
.add_sync_process(consumer
)
297 sim_writer
= sim
.write_vcd("test_shifter.vcd")
302 if __name__
== "__main__":