Add example code for headless SoC
[gram.git] / examples / uartbridge.py
1 from nmigen import *
2 from nmigen.lib.io import pin_layout
3 from nmigen_soc import wishbone
4 from nmigen_stdio.serial import AsyncSerial, AsyncSerialTX
5 from nmigen.back.pysim import *
6
7 from lambdasoc.periph import Peripheral
8
9 import unittest
10
11 __ALL__ = ["UARTBridge"]
12
13 class UARTBridge(Elaboratable):
14 def __init__(self, divisor, pins):
15 self.bus = wishbone.Interface(addr_width=30,
16 data_width=32, granularity=32)
17 self._pins = pins
18 self._divisor = divisor
19
20 def elaborate(self, platform):
21 m = Module()
22
23 m.submodules.serial = serial = AsyncSerial(divisor=self._divisor, pins=self._pins)
24
25 address_width = 32
26 data_width = 32
27
28 cmd = Signal(8)
29 length = Signal(8)
30 address = Signal(address_width)
31 data = Signal(data_width)
32 bytes_count = Signal(range(data_width//8))
33 words_count = Signal(8)
34
35 m.d.comb += [
36 self.bus.dat_w.eq(data),
37 self.bus.adr.eq(address),
38 ]
39
40 with m.FSM():
41 with m.State("Receive-Cmd"):
42 m.d.comb += serial.rx.ack.eq(1)
43
44 # Reset registers
45 m.d.sync += [
46 bytes_count.eq(data_width//8-1),
47 words_count.eq(0),
48 ]
49
50 with m.If(serial.rx.rdy):
51 m.d.sync += cmd.eq(serial.rx.data)
52 m.next = "Receive-Length"
53
54 with m.State("Receive-Length"):
55 m.d.comb += serial.rx.ack.eq(1)
56
57 with m.If(serial.rx.rdy):
58 m.d.sync += length.eq(serial.rx.data)
59 m.next = "Receive-Address"
60
61 with m.State("Receive-Address"):
62 m.d.comb += serial.rx.ack.eq(1)
63
64 with m.If(serial.rx.rdy):
65 m.d.sync += [
66 address.eq(Cat(serial.rx.data, address)),
67 bytes_count.eq(bytes_count-1),
68 ]
69
70 with m.If(bytes_count == 0):
71 with m.Switch(cmd):
72 with m.Case(0x01):
73 m.next = "Handle-Write"
74 with m.Case(0x02):
75 m.next = "Handle-Read"
76 with m.Case():
77 m.next = "Receive-Cmd"
78
79 with m.State("Handle-Write"):
80 m.d.comb += serial.rx.ack.eq(1)
81
82 with m.If(serial.rx.rdy):
83 m.d.sync += [
84 data.eq(Cat(serial.rx.data, data)),
85 bytes_count.eq(bytes_count-1),
86 ]
87 with m.If(bytes_count == 0):
88 m.next = "Write-Data"
89
90 with m.State("Write-Data"):
91 m.d.comb += [
92 self.bus.stb.eq(1),
93 self.bus.we.eq(1),
94 self.bus.cyc.eq(1),
95 self.bus.sel.eq(1),
96 ]
97
98 with m.If(self.bus.ack):
99 m.next = "Receive-Cmd"
100
101
102 with m.State("Handle-Read"):
103 m.d.comb += [
104 self.bus.stb.eq(1),
105 self.bus.we.eq(0),
106 self.bus.cyc.eq(1),
107 ]
108
109 with m.If(self.bus.ack):
110 m.d.sync += [
111 bytes_count.eq(data_width//8-1),
112 data.eq(self.bus.dat_r),
113 ]
114 m.next = "Send-Data"
115
116 with m.State("Send-Data"):
117 m.d.comb += serial.tx.ack.eq(1)
118
119 with m.Switch(bytes_count):
120 for i in range(data_width//8):
121 with m.Case(i):
122 m.d.comb += serial.tx.data.eq(data[i*8:(i+1)*8])
123
124 with m.If(serial.tx.rdy):
125 m.next = "Send-Data-Wait"
126
127 with m.State("Send-Data-Wait"):
128 with m.If(serial.tx.rdy):
129 m.d.sync += [
130 bytes_count.eq(bytes_count-1),
131 ]
132
133 with m.If(bytes_count == 0):
134 m.next = "Receive-Cmd"
135 with m.Else():
136 m.next = "Send-Data"
137
138 return m
139
140 def serial_write(serial, val):
141 while not (yield serial.tx.rdy):
142 yield
143
144 yield serial.tx.data.eq(val)
145 yield serial.tx.ack.eq(1)
146 yield
147
148 while (yield serial.tx.rdy):
149 yield
150
151 yield serial.tx.ack.eq(0)
152
153 while not (yield serial.tx.rdy):
154 yield
155
156 yield
157
158 def serial_read(serial):
159 yield serial.rx.ack.eq(1)
160
161 while not (yield serial.rx.rdy):
162 yield
163
164 data = (yield serial.rx.data)
165 yield serial.rx.ack.eq(0)
166
167 while (yield serial.rx.rdy):
168 yield
169
170 return data
171
172 class UARTBridgeTestCase(unittest.TestCase):
173 # Minimum 5, lowest makes the simulation faster
174 divisor = 5
175 timeout = 10000
176
177 def test_read(self):
178 pins = Record([("rx", pin_layout(1, dir="i")),
179 ("tx", pin_layout(1, dir="o"))])
180 dut = UARTBridge(divisor=self.divisor, pins=pins)
181 serial = AsyncSerial(divisor=self.divisor)
182 m = Module()
183 m.submodules.bridge = dut
184 m.submodules.serial = serial
185 m.d.comb += [
186 pins.rx.i.eq(serial.tx.o),
187 serial.rx.i.eq(pins.tx.o),
188 ]
189
190 def process():
191 # Send read command
192 yield from serial_write(serial, 0x02)
193 yield
194
195 # Length = 1
196 yield from serial_write(serial, 0x01)
197 yield
198
199 # Send 0x4000 as address
200 yield from serial_write(serial, 0x00)
201 yield
202 yield from serial_write(serial, 0x00)
203 yield
204 yield from serial_write(serial, 0x40)
205 yield
206 yield from serial_write(serial, 0x00)
207 yield
208
209 # Handle wishbone request
210 timeout = 0
211 while not (yield dut.bus.cyc):
212 yield
213 timeout += 1
214 if timeout > self.timeout:
215 raise RuntimeError("Simulation timed out")
216
217 # Ensure Wishbone address is the one we asked for
218 self.assertEqual((yield dut.bus.adr), 0x00004000)
219 self.assertFalse((yield dut.bus.we))
220
221 # Answer
222 yield dut.bus.dat_r.eq(0x0DEFACED)
223 yield dut.bus.ack.eq(1)
224 yield
225
226 # Check response on UART
227 rx = yield from serial_read(serial)
228 self.assertEqual(rx, 0x0D)
229 rx = yield from serial_read(serial)
230 self.assertEqual(rx, 0xEF)
231 rx = yield from serial_read(serial)
232 self.assertEqual(rx, 0xAC)
233 rx = yield from serial_read(serial)
234 self.assertEqual(rx, 0xED)
235
236 yield
237
238 sim = Simulator(m)
239 with sim.write_vcd("test_uartbridge.vcd"):
240 sim.add_clock(1e-6)
241 sim.add_sync_process(process)
242 sim.run()
243
244 def test_write(self):
245 pins = Record([("rx", pin_layout(1, dir="i")),
246 ("tx", pin_layout(1, dir="o"))])
247 dut = UARTBridge(divisor=self.divisor, pins=pins)
248 serial = AsyncSerial(divisor=self.divisor)
249 m = Module()
250 m.submodules.bridge = dut
251 m.submodules.serial = serial
252 m.d.comb += [
253 pins.rx.i.eq(serial.tx.o),
254 serial.rx.i.eq(pins.tx.o),
255 ]
256
257 def process():
258 # Send write command
259 yield from serial_write(serial, 0x01)
260 yield
261
262 # Length = 1
263 yield from serial_write(serial, 0x01)
264 yield
265
266 # Send 0x4000 as address
267 yield from serial_write(serial, 0x00)
268 yield
269 yield from serial_write(serial, 0x00)
270 yield
271 yield from serial_write(serial, 0x40)
272 yield
273 yield from serial_write(serial, 0x00)
274 yield
275
276 # Send 0xFEEDFACE as value
277 yield from serial_write(serial, 0xFE)
278 yield
279 yield from serial_write(serial, 0xED)
280 yield
281 yield from serial_write(serial, 0xFA)
282 yield
283 yield from serial_write(serial, 0xCE)
284
285 # Handle wishbone request
286 timeout = 0
287 while not (yield dut.bus.cyc):
288 yield
289 timeout += 1
290 if timeout > self.timeout:
291 raise RuntimeError("Simulation timed out")
292
293 # Ensure Wishbone address is the one we asked for
294 self.assertEqual((yield dut.bus.adr), 0x00004000)
295 self.assertEqual((yield dut.bus.dat_w), 0xFEEDFACE)
296 self.assertTrue((yield dut.bus.we))
297
298 # Answer
299 yield dut.bus.ack.eq(1)
300 yield
301
302 sim = Simulator(m)
303 with sim.write_vcd("test_uartbridge.vcd"):
304 sim.add_clock(1e-6)
305 sim.add_sync_process(process)
306 sim.run()