2 from nmigen
.lib
.io
import pin_layout
3 from nmigen_soc
import wishbone
4 from nmigen_stdio
.serial
import AsyncSerial
, AsyncSerialTX
5 from nmigen
.back
.pysim
import *
9 __ALL__
= ["UARTBridge"]
11 class UARTBridge(Elaboratable
):
12 def __init__(self
, divisor
, pins
):
13 self
.bus
= wishbone
.Interface(addr_width
=30,
14 data_width
=32, granularity
=32)
16 self
._divisor
= divisor
18 def elaborate(self
, platform
):
21 m
.submodules
.serial
= serial
= AsyncSerial(divisor
=self
._divisor
, pins
=self
._pins
)
28 address
= Signal(address_width
)
29 data
= Signal(data_width
)
30 bytes_count
= Signal(range(data_width
//8))
31 words_count
= Signal(8)
34 self
.bus
.dat_w
.eq(data
),
35 self
.bus
.adr
.eq(address
),
39 with m
.State("Receive-Cmd"):
40 m
.d
.comb
+= serial
.rx
.ack
.eq(1)
44 bytes_count
.eq(data_width
//8-1),
48 with m
.If(serial
.rx
.rdy
):
49 m
.d
.sync
+= cmd
.eq(serial
.rx
.data
)
50 m
.next
= "Receive-Length"
52 with m
.State("Receive-Length"):
53 m
.d
.comb
+= serial
.rx
.ack
.eq(1)
55 with m
.If(serial
.rx
.rdy
):
56 m
.d
.sync
+= length
.eq(serial
.rx
.data
)
57 m
.next
= "Receive-Address"
59 with m
.State("Receive-Address"):
60 m
.d
.comb
+= serial
.rx
.ack
.eq(1)
62 with m
.If(serial
.rx
.rdy
):
64 address
.eq(Cat(serial
.rx
.data
, address
)),
65 bytes_count
.eq(bytes_count
-1),
68 with m
.If(bytes_count
== 0):
71 m
.next
= "Handle-Write"
73 m
.next
= "Handle-Read"
75 m
.next
= "Receive-Cmd"
77 with m
.State("Handle-Write"):
78 m
.d
.comb
+= serial
.rx
.ack
.eq(1)
80 with m
.If(serial
.rx
.rdy
):
82 data
.eq(Cat(serial
.rx
.data
, data
)),
83 bytes_count
.eq(bytes_count
-1),
85 with m
.If(bytes_count
== 0):
88 with m
.State("Write-Data"):
96 with m
.If(self
.bus
.ack
):
97 m
.next
= "Receive-Cmd"
100 with m
.State("Handle-Read"):
107 with m
.If(self
.bus
.ack
):
109 bytes_count
.eq(data_width
//8-1),
110 data
.eq(self
.bus
.dat_r
),
114 with m
.State("Send-Data"):
115 m
.d
.comb
+= serial
.tx
.ack
.eq(1)
117 with m
.Switch(bytes_count
):
118 for i
in range(data_width
//8):
120 m
.d
.comb
+= serial
.tx
.data
.eq(data
[i
*8:(i
+1)*8])
122 with m
.If(serial
.tx
.rdy
):
123 m
.next
= "Send-Data-Wait"
125 with m
.State("Send-Data-Wait"):
126 with m
.If(serial
.tx
.rdy
):
128 bytes_count
.eq(bytes_count
-1),
131 with m
.If(bytes_count
== 0):
132 m
.next
= "Receive-Cmd"
138 def serial_write(serial
, val
):
139 while not (yield serial
.tx
.rdy
):
142 yield serial
.tx
.data
.eq(val
)
143 yield serial
.tx
.ack
.eq(1)
146 while (yield serial
.tx
.rdy
):
149 yield serial
.tx
.ack
.eq(0)
151 while not (yield serial
.tx
.rdy
):
156 def serial_read(serial
):
157 yield serial
.rx
.ack
.eq(1)
159 while not (yield serial
.rx
.rdy
):
162 data
= (yield serial
.rx
.data
)
163 yield serial
.rx
.ack
.eq(0)
165 while (yield serial
.rx
.rdy
):
170 class UARTBridgeTestCase(unittest
.TestCase
):
171 # Minimum 5, lowest makes the simulation faster
176 pins
= Record([("rx", pin_layout(1, dir="i")),
177 ("tx", pin_layout(1, dir="o"))])
178 dut
= UARTBridge(divisor
=self
.divisor
, pins
=pins
)
179 serial
= AsyncSerial(divisor
=self
.divisor
)
181 m
.submodules
.bridge
= dut
182 m
.submodules
.serial
= serial
184 pins
.rx
.i
.eq(serial
.tx
.o
),
185 serial
.rx
.i
.eq(pins
.tx
.o
),
190 yield from serial_write(serial
, 0x02)
194 yield from serial_write(serial
, 0x01)
197 # Send 0x4000 as address
198 yield from serial_write(serial
, 0x00)
200 yield from serial_write(serial
, 0x00)
202 yield from serial_write(serial
, 0x40)
204 yield from serial_write(serial
, 0x00)
207 # Handle wishbone request
209 while not (yield dut
.bus
.cyc
):
212 if timeout
> self
.timeout
:
213 raise RuntimeError("Simulation timed out")
215 # Ensure Wishbone address is the one we asked for
216 self
.assertEqual((yield dut
.bus
.adr
), 0x00004000)
217 self
.assertFalse((yield dut
.bus
.we
))
220 yield dut
.bus
.dat_r
.eq(0x0DEFACED)
221 yield dut
.bus
.ack
.eq(1)
224 # Check response on UART
225 rx
= yield from serial_read(serial
)
226 self
.assertEqual(rx
, 0x0D)
227 rx
= yield from serial_read(serial
)
228 self
.assertEqual(rx
, 0xEF)
229 rx
= yield from serial_read(serial
)
230 self
.assertEqual(rx
, 0xAC)
231 rx
= yield from serial_read(serial
)
232 self
.assertEqual(rx
, 0xED)
237 with sim
.write_vcd("test_uartbridge.vcd"):
239 sim
.add_sync_process(process
)
242 def test_write(self
):
243 pins
= Record([("rx", pin_layout(1, dir="i")),
244 ("tx", pin_layout(1, dir="o"))])
245 dut
= UARTBridge(divisor
=self
.divisor
, pins
=pins
)
246 serial
= AsyncSerial(divisor
=self
.divisor
)
248 m
.submodules
.bridge
= dut
249 m
.submodules
.serial
= serial
251 pins
.rx
.i
.eq(serial
.tx
.o
),
252 serial
.rx
.i
.eq(pins
.tx
.o
),
257 yield from serial_write(serial
, 0x01)
261 yield from serial_write(serial
, 0x01)
264 # Send 0x4000 as address
265 yield from serial_write(serial
, 0x00)
267 yield from serial_write(serial
, 0x00)
269 yield from serial_write(serial
, 0x40)
271 yield from serial_write(serial
, 0x00)
274 # Send 0xFEEDFACE as value
275 yield from serial_write(serial
, 0xFE)
277 yield from serial_write(serial
, 0xED)
279 yield from serial_write(serial
, 0xFA)
281 yield from serial_write(serial
, 0xCE)
283 # Handle wishbone request
285 while not (yield dut
.bus
.cyc
):
288 if timeout
> self
.timeout
:
289 raise RuntimeError("Simulation timed out")
291 # Ensure Wishbone address is the one we asked for
292 self
.assertEqual((yield dut
.bus
.adr
), 0x00004000)
293 self
.assertEqual((yield dut
.bus
.dat_w
), 0xFEEDFACE)
294 self
.assertTrue((yield dut
.bus
.we
))
297 yield dut
.bus
.ack
.eq(1)
301 with sim
.write_vcd("test_uartbridge.vcd"):
303 sim
.add_sync_process(process
)