Implement control path and unit test.
[soc.git] / src / soc / experiment / alu_fsm.py
1 """Simple example of a FSM-based ALU
2
3 This demonstrates a design that follows the valid/ready protocol of the
4 ALU, but with a FSM implementation, instead of a pipeline. It is also
5 intended to comply with both the CompALU API and the nmutil Pipeline API
6 (Liskov Substitution Principle)
7
8 The basic rules are:
9
10 1) p.ready_o is asserted in the initial ("Idle") state; otherwise it stays low.
11 2) n.valid_o is asserted in the final ("Done") state; otherwise it stays low.
12 3) The FSM stays in the Idle state while p.valid_i is low, otherwise
13 it accepts the input data and moves on.
14 4) The FSM stays in the Done state while n.ready_i is low, otherwise
15 it releases the output data and goes back to the Idle state.
16
17 """
18
19 from nmigen import Elaboratable, Signal, Module, Cat
20 from nmigen.back.pysim import Simulator
21 from nmigen.cli import rtlil
22 from soc.fu.cr.cr_input_record import CompCROpSubset
23 from math import log2
24
25
class Dummy:
    """Empty attribute holder.

    Instances carry no behaviour of their own; code elsewhere in this file
    attaches attributes to them dynamically (the shifter stores one as its
    ``ctx`` and then sets ``ctx.op`` on it).
    """
28
29
class Shifter(Elaboratable):
    """Simple sequential shifter

    The operand is loaded into a register and then shifted by one bit per
    clock cycle until the requested shift amount has been consumed.

    Prev port data:
    * p.data_i.data: value to be shifted
    * p.data_i.shift: shift amount
        * When zero, no shift occurs.
        * On POWER, range is 0 to 63 for 32-bit,
        * and 0 to 127 for 64-bit.
        * Other values wrap around.
    * p.data_i.dir: shift direction (0 = left, 1 = right)

    Next port data:
    * n.data_o.data: shifted value
    """
    class PrevData:
        """Input operands: value, shift amount and direction."""

        def __init__(self, width):
            self.data = Signal(width, name="p_data_i")
            self.shift = Signal(width, name="p_shift_i")
            self.dir = Signal(name="p_dir_i")
            self.ctx = Dummy()  # comply with CompALU API

        def _get_data(self):
            """Return the operand signals as a list (direction excluded)."""
            return [self.data, self.shift]

    class NextData:
        """Output data: the shifted value."""

        def __init__(self, width):
            self.data = Signal(width, name="n_data_o")

        def _get_data(self):
            """Return the result signals as a list."""
            return [self.data]

    class PrevPort:
        """Input side of the valid/ready handshake."""

        def __init__(self, width):
            self.data_i = Shifter.PrevData(width)
            self.valid_i = Signal(name="p_valid_i")
            self.ready_o = Signal(name="p_ready_o")

    class NextPort:
        """Output side of the valid/ready handshake."""

        def __init__(self, width):
            self.data_o = Shifter.NextData(width)
            self.valid_o = Signal(name="n_valid_o")
            self.ready_i = Signal(name="n_ready_i")

    def __init__(self, width):
        """Create a shifter for operands of ``width`` bits.

        Raises ValueError if ``width`` is smaller than 1.
        """
        # Reject a degenerate width here, with a clear message, instead of
        # letting it surface later while the design is being elaborated.
        if width < 1:
            raise ValueError(
                "Shifter width must be at least 1, got %r" % (width,))
        self.width = width
        self.p = self.PrevPort(width)
        self.n = self.NextPort(width)

        # more pieces to make this example class comply with the CompALU API
        self.op = CompCROpSubset()
        self.p.data_i.ctx.op = self.op
        self.i = self.p.data_i._get_data()
        self.out = self.n.data_o._get_data()

    def elaborate(self, platform):
        """Build the data path and the IDLE/SHIFT/DONE control FSM."""
        m = Module()

        # Note:
        # It is good practice to design a sequential circuit as
        # a data path and a control path.
        #
        # The local variable names below are kept stable on purpose:
        # unnamed nmigen Signals take their name from the variable they
        # are assigned to, so renaming them renames the generated signals.

        # Data path
        # ---------
        # The idea is to have a register that can be
        # loaded or shifted (left and right).

        # the control signals
        load = Signal()
        shift = Signal()
        # unlike load and shift, direction is a register: it is assigned
        # in the sync domain while the FSM sits in IDLE
        direction = Signal()
        # the data flow
        shift_in = Signal(self.width)
        shift_left_by_1 = Signal(self.width)
        shift_right_by_1 = Signal(self.width)
        next_shift = Signal(self.width)
        # the register
        shift_reg = Signal(self.width, reset_less=True)
        # build the data flow
        m.d.comb += [
            # connect input and output
            shift_in.eq(self.p.data_i.data),
            self.n.data_o.data.eq(shift_reg),
            # generate shifted views of the register
            # (Cat is LSB-first, so the vacated bit is filled with zero)
            shift_left_by_1.eq(Cat(0, shift_reg[:-1])),
            shift_right_by_1.eq(Cat(shift_reg[1:], 0)),
        ]
        # choose the next value of the register according to the
        # control signals
        # default is no change
        m.d.comb += next_shift.eq(shift_reg)
        with m.If(load):
            m.d.comb += next_shift.eq(shift_in)
        with m.Elif(shift):
            with m.If(direction):
                m.d.comb += next_shift.eq(shift_right_by_1)
            with m.Else():
                m.d.comb += next_shift.eq(shift_left_by_1)

        # register the next value
        m.d.sync += shift_reg.eq(next_shift)

        # Control path
        # ------------
        # The idea is to have a SHIFT state where the shift register
        # is shifted every cycle, while a counter decrements.
        # This counter is loaded with shift amount in the initial state.
        # The SHIFT state is left when the counter goes to zero.

        # Shift counter
        # bit_length() is the exact integer form of floor(log2(width)) + 1
        # and avoids going through floating point.
        shift_width = self.width.bit_length()
        next_count = Signal(shift_width)
        count = Signal(shift_width, reset_less=True)
        m.d.sync += count.eq(next_count)

        with m.FSM():
            with m.State("IDLE"):
                m.d.comb += [
                    # keep p.ready_o active on IDLE
                    self.p.ready_o.eq(1),
                    # keep loading the shift register and shift count
                    load.eq(1),
                    # any bits of the shift amount above the counter
                    # width are dropped by this assignment
                    next_count.eq(self.p.data_i.shift),
                ]
                # capture the direction bit as well
                m.d.sync += direction.eq(self.p.data_i.dir)
                with m.If(self.p.valid_i):
                    # Leave IDLE when data arrives
                    with m.If(next_count == 0):
                        # short-circuit for zero shift
                        m.next = "DONE"
                    with m.Else():
                        m.next = "SHIFT"
            with m.State("SHIFT"):
                m.d.comb += [
                    # keep shifting, while counter is not zero
                    shift.eq(1),
                    # decrement the shift counter
                    next_count.eq(count - 1),
                ]
                with m.If(next_count == 0):
                    # exit when shift counter goes to zero
                    m.next = "DONE"
            with m.State("DONE"):
                # keep n.valid_o active while the data is not accepted
                m.d.comb += self.n.valid_o.eq(1)
                with m.If(self.n.ready_i):
                    # go back to IDLE when the data is accepted
                    m.next = "IDLE"

        return m

    def __iter__(self):
        """Yield every externally visible port signal."""
        yield self.p.data_i.data
        yield self.p.data_i.shift
        yield self.p.data_i.dir
        yield self.p.valid_i
        yield self.p.ready_o
        yield self.n.ready_i
        yield self.n.valid_o
        yield self.n.data_o.data

    def ports(self):
        """Return the port signals as a list."""
        return list(self)
194
195
def test_shifter():
    """Simulate an 8-bit Shifter through three valid/ready transactions.

    Side effects: prints the port names, writes the RTLIL dump to
    "test_shifter.il", and records the simulation in "test_shifter.vcd"
    and "test_shifter.gtkw".
    """
    top = Module()
    top.submodules.shf = dut = Shifter(8)
    prev, nxt = dut.p, dut.n

    print("Shifter port names:")
    for port in dut:
        print("-", port.name)

    # generate RTLIL
    # try "proc; show" in yosys to check the data path
    rtlil_text = rtlil.convert(dut, ports=dut.ports())
    with open("test_shifter.il", "w") as il_file:
        il_file.write(rtlil_text)

    sim = Simulator(top)
    sim.add_clock(1e-6)

    def send(data, shift, direction):
        # present the operands together with valid_i
        yield prev.data_i.data.eq(data)
        yield prev.data_i.shift.eq(shift)
        yield prev.data_i.dir.eq(direction)
        yield prev.valid_i.eq(1)
        yield
        # keep the request up until p.ready_o is seen asserted
        while True:
            accepted = yield prev.ready_o
            if accepted:
                break
            yield
        # negate p.valid_i first, then clear the operands
        idle_signals = (
            prev.valid_i,
            prev.data_i.data,
            prev.data_i.shift,
            prev.data_i.dir,
        )
        for sig in idle_signals:
            yield sig.eq(0)

    def receive(expected):
        # announce that a result can be taken
        yield nxt.ready_i.eq(1)
        yield
        # poll until n.valid_o is seen asserted
        while True:
            available = yield nxt.valid_o
            if available:
                break
            yield
        # sample the result, then withdraw n.ready_i
        result = yield nxt.data_o.data
        yield nxt.ready_i.eq(0)
        # check result
        assert result == expected

    def producer():
        requests = (
            (13, 2, 1),  # 13 >> 2
            (3, 4, 0),   # 3 << 4
            (21, 0, 0),  # 21 << 0
        )
        for data, shift, direction in requests:
            yield from send(data, shift, direction)

    def consumer():
        # the consumer is not in step with the producer, but the
        # order of the results is preserved
        expected_results = (
            3,   # 13 >> 2
            48,  # 3 << 4
            21,  # 21 << 0
        )
        for expected in expected_results:
            yield from receive(expected)

    sim.add_sync_process(producer)
    sim.add_sync_process(consumer)
    with sim.write_vcd(
        "test_shifter.vcd",
        "test_shifter.gtkw",
        traces=dut.ports(),
    ):
        sim.run()
267
268
# Run the shifter simulation when this file is executed as a script.
if __name__ == "__main__":
    test_shifter()