Remove useless signal
[gram.git] / examples / uartbridge.py
1 from nmigen import *
2 from nmigen.lib.io import pin_layout
3 from nmigen_soc import wishbone
4 from nmigen_stdio.serial import AsyncSerial, AsyncSerialTX
5 from nmigen.back.pysim import *
6
7 import unittest
8
9 __ALL__ = ["UARTBridge"]
10
11 class UARTBridge(Elaboratable):
12 def __init__(self, divisor, pins):
13 self.bus = wishbone.Interface(addr_width=30,
14 data_width=32, granularity=32)
15 self._pins = pins
16 self._divisor = divisor
17
18 def elaborate(self, platform):
19 m = Module()
20
21 m.submodules.serial = serial = AsyncSerial(divisor=self._divisor, pins=self._pins)
22
23 address_width = 32
24 data_width = 32
25
26 cmd = Signal(8)
27 length = Signal(8)
28 address = Signal(address_width)
29 data = Signal(data_width)
30 bytes_count = Signal(range(data_width//8))
31 words_count = Signal(8)
32
33 m.d.comb += [
34 self.bus.dat_w.eq(data),
35 self.bus.adr.eq(address),
36 ]
37
38 with m.FSM():
39 with m.State("Receive-Cmd"):
40 m.d.comb += serial.rx.ack.eq(1)
41
42 # Reset registers
43 m.d.sync += [
44 bytes_count.eq(data_width//8-1),
45 words_count.eq(0),
46 ]
47
48 with m.If(serial.rx.rdy):
49 m.d.sync += cmd.eq(serial.rx.data)
50 m.next = "Receive-Length"
51
52 with m.State("Receive-Length"):
53 m.d.comb += serial.rx.ack.eq(1)
54
55 with m.If(serial.rx.rdy):
56 m.d.sync += length.eq(serial.rx.data)
57 m.next = "Receive-Address"
58
59 with m.State("Receive-Address"):
60 m.d.comb += serial.rx.ack.eq(1)
61
62 with m.If(serial.rx.rdy):
63 m.d.sync += [
64 address.eq(Cat(serial.rx.data, address)),
65 bytes_count.eq(bytes_count-1),
66 ]
67
68 with m.If(bytes_count == 0):
69 with m.Switch(cmd):
70 with m.Case(0x01):
71 m.next = "Handle-Write"
72 with m.Case(0x02):
73 m.next = "Handle-Read"
74 with m.Case():
75 m.next = "Receive-Cmd"
76
77 with m.State("Handle-Write"):
78 m.d.comb += serial.rx.ack.eq(1)
79
80 with m.If(serial.rx.rdy):
81 m.d.sync += [
82 data.eq(Cat(serial.rx.data, data)),
83 bytes_count.eq(bytes_count-1),
84 ]
85 with m.If(bytes_count == 0):
86 m.next = "Write-Data"
87
88 with m.State("Write-Data"):
89 m.d.comb += [
90 self.bus.stb.eq(1),
91 self.bus.we.eq(1),
92 self.bus.cyc.eq(1),
93 self.bus.sel.eq(0xF),
94 ]
95
96 with m.If(self.bus.ack):
97 m.next = "Receive-Cmd"
98
99
100 with m.State("Handle-Read"):
101 m.d.comb += [
102 self.bus.stb.eq(1),
103 self.bus.we.eq(0),
104 self.bus.cyc.eq(1),
105 ]
106
107 with m.If(self.bus.ack):
108 m.d.sync += [
109 bytes_count.eq(data_width//8-1),
110 data.eq(self.bus.dat_r),
111 ]
112 m.next = "Send-Data"
113
114 with m.State("Send-Data"):
115 m.d.comb += serial.tx.ack.eq(1)
116
117 with m.Switch(bytes_count):
118 for i in range(data_width//8):
119 with m.Case(i):
120 m.d.comb += serial.tx.data.eq(data[i*8:(i+1)*8])
121
122 with m.If(serial.tx.rdy):
123 m.next = "Send-Data-Wait"
124
125 with m.State("Send-Data-Wait"):
126 with m.If(serial.tx.rdy):
127 m.d.sync += [
128 bytes_count.eq(bytes_count-1),
129 ]
130
131 with m.If(bytes_count == 0):
132 m.next = "Receive-Cmd"
133 with m.Else():
134 m.next = "Send-Data"
135
136 return m
137
138 def serial_write(serial, val):
139 while not (yield serial.tx.rdy):
140 yield
141
142 yield serial.tx.data.eq(val)
143 yield serial.tx.ack.eq(1)
144 yield
145
146 while (yield serial.tx.rdy):
147 yield
148
149 yield serial.tx.ack.eq(0)
150
151 while not (yield serial.tx.rdy):
152 yield
153
154 yield
155
156 def serial_read(serial):
157 yield serial.rx.ack.eq(1)
158
159 while not (yield serial.rx.rdy):
160 yield
161
162 data = (yield serial.rx.data)
163 yield serial.rx.ack.eq(0)
164
165 while (yield serial.rx.rdy):
166 yield
167
168 return data
169
170 class UARTBridgeTestCase(unittest.TestCase):
171 # Minimum 5, lowest makes the simulation faster
172 divisor = 5
173 timeout = 10000
174
175 def test_read(self):
176 pins = Record([("rx", pin_layout(1, dir="i")),
177 ("tx", pin_layout(1, dir="o"))])
178 dut = UARTBridge(divisor=self.divisor, pins=pins)
179 serial = AsyncSerial(divisor=self.divisor)
180 m = Module()
181 m.submodules.bridge = dut
182 m.submodules.serial = serial
183 m.d.comb += [
184 pins.rx.i.eq(serial.tx.o),
185 serial.rx.i.eq(pins.tx.o),
186 ]
187
188 def process():
189 # Send read command
190 yield from serial_write(serial, 0x02)
191 yield
192
193 # Length = 1
194 yield from serial_write(serial, 0x01)
195 yield
196
197 # Send 0x4000 as address
198 yield from serial_write(serial, 0x00)
199 yield
200 yield from serial_write(serial, 0x00)
201 yield
202 yield from serial_write(serial, 0x40)
203 yield
204 yield from serial_write(serial, 0x00)
205 yield
206
207 # Handle wishbone request
208 timeout = 0
209 while not (yield dut.bus.cyc):
210 yield
211 timeout += 1
212 if timeout > self.timeout:
213 raise RuntimeError("Simulation timed out")
214
215 # Ensure Wishbone address is the one we asked for
216 self.assertEqual((yield dut.bus.adr), 0x00004000)
217 self.assertFalse((yield dut.bus.we))
218
219 # Answer
220 yield dut.bus.dat_r.eq(0x0DEFACED)
221 yield dut.bus.ack.eq(1)
222 yield
223
224 # Check response on UART
225 rx = yield from serial_read(serial)
226 self.assertEqual(rx, 0x0D)
227 rx = yield from serial_read(serial)
228 self.assertEqual(rx, 0xEF)
229 rx = yield from serial_read(serial)
230 self.assertEqual(rx, 0xAC)
231 rx = yield from serial_read(serial)
232 self.assertEqual(rx, 0xED)
233
234 yield
235
236 sim = Simulator(m)
237 with sim.write_vcd("test_uartbridge.vcd"):
238 sim.add_clock(1e-6)
239 sim.add_sync_process(process)
240 sim.run()
241
242 def test_write(self):
243 pins = Record([("rx", pin_layout(1, dir="i")),
244 ("tx", pin_layout(1, dir="o"))])
245 dut = UARTBridge(divisor=self.divisor, pins=pins)
246 serial = AsyncSerial(divisor=self.divisor)
247 m = Module()
248 m.submodules.bridge = dut
249 m.submodules.serial = serial
250 m.d.comb += [
251 pins.rx.i.eq(serial.tx.o),
252 serial.rx.i.eq(pins.tx.o),
253 ]
254
255 def process():
256 # Send write command
257 yield from serial_write(serial, 0x01)
258 yield
259
260 # Length = 1
261 yield from serial_write(serial, 0x01)
262 yield
263
264 # Send 0x4000 as address
265 yield from serial_write(serial, 0x00)
266 yield
267 yield from serial_write(serial, 0x00)
268 yield
269 yield from serial_write(serial, 0x40)
270 yield
271 yield from serial_write(serial, 0x00)
272 yield
273
274 # Send 0xFEEDFACE as value
275 yield from serial_write(serial, 0xFE)
276 yield
277 yield from serial_write(serial, 0xED)
278 yield
279 yield from serial_write(serial, 0xFA)
280 yield
281 yield from serial_write(serial, 0xCE)
282
283 # Handle wishbone request
284 timeout = 0
285 while not (yield dut.bus.cyc):
286 yield
287 timeout += 1
288 if timeout > self.timeout:
289 raise RuntimeError("Simulation timed out")
290
291 # Ensure Wishbone address is the one we asked for
292 self.assertEqual((yield dut.bus.adr), 0x00004000)
293 self.assertEqual((yield dut.bus.dat_w), 0xFEEDFACE)
294 self.assertTrue((yield dut.bus.we))
295
296 # Answer
297 yield dut.bus.ack.eq(1)
298 yield
299
300 sim = Simulator(m)
301 with sim.write_vcd("test_uartbridge.vcd"):
302 sim.add_clock(1e-6)
303 sim.add_sync_process(process)
304 sim.run()