examples: Load stock calibration profile if calibration failed
[gram.git] / examples / uartbridge.py
1 from nmigen import *
2 from nmigen.lib.io import pin_layout
3 from nmigen_soc import wishbone
4 from nmigen_stdio.serial import AsyncSerial, AsyncSerialTX
5 from nmigen.back.pysim import *
6
7 import unittest
8
9 __ALL__ = ["UARTBridge"]
10
11 class UARTBridge(Elaboratable):
12 def __init__(self, divisor, pins):
13 self.bus = wishbone.Interface(addr_width=30,
14 data_width=32, granularity=8)
15 self._pins = pins
16 self._divisor = divisor
17
18 def elaborate(self, platform):
19 m = Module()
20
21 m.submodules.serial = serial = AsyncSerial(divisor=self._divisor, pins=self._pins)
22
23 address_width = 32
24 data_width = 32
25
26 cmd = Signal(8)
27 length = Signal(8)
28 address = Signal(address_width)
29 data = Signal(data_width)
30 bytes_count = Signal(range(data_width//8))
31 words_count = Signal(8)
32
33 m.d.comb += [
34 self.bus.dat_w.eq(data),
35 self.bus.adr.eq(address),
36 ]
37
38 with m.FSM():
39 with m.State("Receive-Cmd"):
40 m.d.comb += serial.rx.ack.eq(1)
41
42 # Reset registers
43 m.d.sync += [
44 bytes_count.eq(data_width//8-1),
45 words_count.eq(0),
46 ]
47
48 with m.If(serial.rx.rdy):
49 m.d.sync += cmd.eq(serial.rx.data)
50 m.next = "Receive-Length"
51
52 with m.State("Receive-Length"):
53 m.d.comb += serial.rx.ack.eq(1)
54
55 with m.If(serial.rx.rdy):
56 m.d.sync += length.eq(serial.rx.data)
57 m.next = "Receive-Address"
58
59 with m.State("Receive-Address"):
60 m.d.comb += serial.rx.ack.eq(1)
61
62 with m.If(serial.rx.rdy):
63 m.d.sync += [
64 address.eq(Cat(serial.rx.data, address)),
65 bytes_count.eq(bytes_count-1),
66 ]
67
68 with m.If(bytes_count == 0):
69 with m.Switch(cmd):
70 with m.Case(0x01):
71 m.next = "Handle-Write"
72 with m.Case(0x02):
73 m.next = "Handle-Read"
74 with m.Case():
75 m.next = "Receive-Cmd"
76
77 with m.State("Handle-Write"):
78 m.d.comb += serial.rx.ack.eq(1)
79
80 with m.If(serial.rx.rdy):
81 m.d.sync += [
82 data.eq(Cat(serial.rx.data, data)),
83 bytes_count.eq(bytes_count-1),
84 ]
85 with m.If(bytes_count == 0):
86 m.next = "Write-Data"
87
88 with m.State("Write-Data"):
89 m.d.comb += [
90 self.bus.stb.eq(1),
91 self.bus.we.eq(1),
92 self.bus.cyc.eq(1),
93 self.bus.sel.eq(0xF),
94 ]
95
96 with m.If(self.bus.ack):
97 m.next = "Receive-Cmd"
98
99
100 with m.State("Handle-Read"):
101 m.d.comb += [
102 self.bus.stb.eq(1),
103 self.bus.we.eq(0),
104 self.bus.cyc.eq(1),
105 self.bus.sel.eq(0xF),
106 ]
107
108 with m.If(self.bus.ack):
109 m.d.sync += [
110 bytes_count.eq(data_width//8-1),
111 data.eq(self.bus.dat_r),
112 ]
113 m.next = "Send-Data"
114
115 with m.State("Send-Data"):
116 m.d.comb += serial.tx.ack.eq(1)
117
118 with m.Switch(bytes_count):
119 for i in range(data_width//8):
120 with m.Case(i):
121 m.d.comb += serial.tx.data.eq(data[i*8:(i+1)*8])
122
123 with m.If(serial.tx.rdy):
124 m.next = "Send-Data-Wait"
125
126 with m.State("Send-Data-Wait"):
127 with m.If(serial.tx.rdy):
128 m.d.sync += [
129 bytes_count.eq(bytes_count-1),
130 ]
131
132 with m.If(bytes_count == 0):
133 m.next = "Receive-Cmd"
134 with m.Else():
135 m.next = "Send-Data"
136
137 return m
138
139 def serial_write(serial, val):
140 while not (yield serial.tx.rdy):
141 yield
142
143 yield serial.tx.data.eq(val)
144 yield serial.tx.ack.eq(1)
145 yield
146
147 while (yield serial.tx.rdy):
148 yield
149
150 yield serial.tx.ack.eq(0)
151
152 while not (yield serial.tx.rdy):
153 yield
154
155 yield
156
157 def serial_read(serial):
158 yield serial.rx.ack.eq(1)
159
160 while not (yield serial.rx.rdy):
161 yield
162
163 data = (yield serial.rx.data)
164 yield serial.rx.ack.eq(0)
165
166 while (yield serial.rx.rdy):
167 yield
168
169 return data
170
171 class UARTBridgeTestCase(unittest.TestCase):
172 # Minimum 5, lowest makes the simulation faster
173 divisor = 5
174 timeout = 10000
175
176 def test_read(self):
177 pins = Record([("rx", pin_layout(1, dir="i")),
178 ("tx", pin_layout(1, dir="o"))])
179 dut = UARTBridge(divisor=self.divisor, pins=pins)
180 serial = AsyncSerial(divisor=self.divisor)
181 m = Module()
182 m.submodules.bridge = dut
183 m.submodules.serial = serial
184 m.d.comb += [
185 pins.rx.i.eq(serial.tx.o),
186 serial.rx.i.eq(pins.tx.o),
187 ]
188
189 def process():
190 # Send read command
191 yield from serial_write(serial, 0x02)
192 yield
193
194 # Length = 1
195 yield from serial_write(serial, 0x01)
196 yield
197
198 # Send 0x4000 as address
199 yield from serial_write(serial, 0x00)
200 yield
201 yield from serial_write(serial, 0x00)
202 yield
203 yield from serial_write(serial, 0x40)
204 yield
205 yield from serial_write(serial, 0x00)
206 yield
207
208 # Handle wishbone request
209 timeout = 0
210 while not (yield dut.bus.cyc):
211 yield
212 timeout += 1
213 if timeout > self.timeout:
214 raise RuntimeError("Simulation timed out")
215
216 # Ensure Wishbone address is the one we asked for
217 self.assertEqual((yield dut.bus.adr), 0x00004000)
218 self.assertFalse((yield dut.bus.we))
219
220 # Answer
221 yield dut.bus.dat_r.eq(0x0DEFACED)
222 yield dut.bus.ack.eq(1)
223 yield
224
225 # Check response on UART
226 rx = yield from serial_read(serial)
227 self.assertEqual(rx, 0x0D)
228 rx = yield from serial_read(serial)
229 self.assertEqual(rx, 0xEF)
230 rx = yield from serial_read(serial)
231 self.assertEqual(rx, 0xAC)
232 rx = yield from serial_read(serial)
233 self.assertEqual(rx, 0xED)
234
235 yield
236
237 sim = Simulator(m)
238 with sim.write_vcd("test_uartbridge.vcd"):
239 sim.add_clock(1e-6)
240 sim.add_sync_process(process)
241 sim.run()
242
243 def test_write(self):
244 pins = Record([("rx", pin_layout(1, dir="i")),
245 ("tx", pin_layout(1, dir="o"))])
246 dut = UARTBridge(divisor=self.divisor, pins=pins)
247 serial = AsyncSerial(divisor=self.divisor)
248 m = Module()
249 m.submodules.bridge = dut
250 m.submodules.serial = serial
251 m.d.comb += [
252 pins.rx.i.eq(serial.tx.o),
253 serial.rx.i.eq(pins.tx.o),
254 ]
255
256 def process():
257 # Send write command
258 yield from serial_write(serial, 0x01)
259 yield
260
261 # Length = 1
262 yield from serial_write(serial, 0x01)
263 yield
264
265 # Send 0x4000 as address
266 yield from serial_write(serial, 0x00)
267 yield
268 yield from serial_write(serial, 0x00)
269 yield
270 yield from serial_write(serial, 0x40)
271 yield
272 yield from serial_write(serial, 0x00)
273 yield
274
275 # Send 0xFEEDFACE as value
276 yield from serial_write(serial, 0xFE)
277 yield
278 yield from serial_write(serial, 0xED)
279 yield
280 yield from serial_write(serial, 0xFA)
281 yield
282 yield from serial_write(serial, 0xCE)
283
284 # Handle wishbone request
285 timeout = 0
286 while not (yield dut.bus.cyc):
287 yield
288 timeout += 1
289 if timeout > self.timeout:
290 raise RuntimeError("Simulation timed out")
291
292 # Ensure Wishbone address is the one we asked for
293 self.assertEqual((yield dut.bus.adr), 0x00004000)
294 self.assertEqual((yield dut.bus.dat_w), 0xFEEDFACE)
295 self.assertTrue((yield dut.bus.we))
296
297 # Answer
298 yield dut.bus.ack.eq(1)
299 yield
300
301 sim = Simulator(m)
302 with sim.write_vcd("test_uartbridge.vcd"):
303 sim.add_clock(1e-6)
304 sim.add_sync_process(process)
305 sim.run()