2 from nmigen
.lib
.io
import pin_layout
3 from nmigen_soc
import wishbone
4 from nmigen_stdio
.serial
import AsyncSerial
, AsyncSerialTX
5 from nmigen
.back
.pysim
import *
# NOTE(review): Python's star-import machinery looks up the lowercase name
# `__all__`. A list bound to `__ALL__` is an ordinary module attribute and
# does not restrict what `from <module> import *` exports. Presumably
# `__all__` was intended -- confirm before renaming, because that changes
# what star-importers of this module receive.
9 __ALL__
= ["UARTBridge"]
# UARTBridge: an Elaboratable that owns a Wishbone interface (`self.bus`)
# and, in `elaborate`, exchanges bytes with an AsyncSerial submodule through
# a state machine whose states are named by the string labels below.
#
# NOTE(review): the integer at the start of many lines looks like a line
# number fused in by an extraction step, and indentation has been lost.
# Gaps in that numbering suggest statements are missing from this excerpt,
# so the nesting described in these comments is inferred from statement
# order only -- confirm against the original file.
11 class UARTBridge(Elaboratable
):
# Constructor. `divisor` is stored in `self._divisor` and later passed to
# AsyncSerial. `pins` is accepted here and `elaborate` reads `self._pins`,
# but the assignment to `self._pins` is not visible in this excerpt --
# TODO confirm it exists.
12 def __init__(self
, divisor
, pins
):
# Wishbone interface created with addr_width=30, data_width=32 and
# granularity=8.
13 self
.bus
= wishbone
.Interface(addr_width
=30,
14 data_width
=32, granularity
=8)
16 self
._divisor
= divisor
# Builds the hardware description. `platform` is not referenced in any of
# the visible lines.
18 def elaborate(self
, platform
):
# Instantiate the UART from the stored divisor and pins, and register it as
# the `serial` submodule of `m`.
# NOTE(review): the creation of `m` is not visible in this excerpt.
21 m
.submodules
.serial
= serial
= AsyncSerial(divisor
=self
._divisor
, pins
=self
._pins
)
# Working registers.
# NOTE(review): `address_width` and `data_width` are not defined in the
# visible lines (presumably local constants); `cmd` and `length`, written
# further down, are not declared in the visible lines either -- verify.
28 address
= Signal(address_width
)
29 data
= Signal(data_width
)
# Sized from range(data_width//8), i.e. wide enough to index one byte of
# `data`.
30 bytes_count
= Signal(range(data_width
//8))
# 8-bit signal; it is not read or written in any other visible line.
31 words_count
= Signal(8)
# The bus write-data and address are driven from the `data` and `address`
# registers. Both assignments end in a comma, so they look like elements of
# a list whose opening statement is not visible here.
# NOTE(review): the bus was created with addr_width=30 while `address` has
# `address_width` bits -- confirm the intended width relationship.
34 self
.bus
.dat_w
.eq(data
),
35 self
.bus
.adr
.eq(address
),
# --- State "Receive-Cmd": wait for the first byte of a request. ---
39 with m
.State("Receive-Cmd"):
# Keep the receiver acknowledged.
40 m
.d
.comb
+= serial
.rx
.ack
.eq(1)
# Re-arm the byte counter to data_width//8 - 1. This is a list element
# (trailing comma); the enclosing statement is not visible.
44 bytes_count
.eq(data_width
//8-1),
# When a byte is available, latch it into `cmd` and move on to the length
# byte.
48 with m
.If(serial
.rx
.rdy
):
49 m
.d
.sync
+= cmd
.eq(serial
.rx
.data
)
50 m
.next
= "Receive-Length"
# --- State "Receive-Length": latch the second byte into `length`. ---
# `length` is not read in any visible line.
52 with m
.State("Receive-Length"):
53 m
.d
.comb
+= serial
.rx
.ack
.eq(1)
55 with m
.If(serial
.rx
.rdy
):
56 m
.d
.sync
+= length
.eq(serial
.rx
.data
)
57 m
.next
= "Receive-Address"
# --- State "Receive-Address": shift the address in, one byte at a time. ---
59 with m
.State("Receive-Address"):
60 m
.d
.comb
+= serial
.rx
.ack
.eq(1)
62 with m
.If(serial
.rx
.rdy
):
# Cat() places its first operand in the low bits, so each new byte enters
# at the bottom and the previous contents of `address` move up by 8 bits:
# the first byte received ends up most significant.
64 address
.eq(Cat(serial
.rx
.data
, address
)),
65 bytes_count
.eq(bytes_count
-1),
# Once the byte counter has reached 0, a next state is chosen.
# NOTE(review): three different targets are assigned below, but the
# conditions that select among them (presumably a decode of `cmd`) are not
# visible in this excerpt.
68 with m
.If(bytes_count
== 0):
71 m
.next
= "Handle-Write"
73 m
.next
= "Handle-Read"
75 m
.next
= "Receive-Cmd"
# --- State "Handle-Write": shift the write data in, same scheme as the ---
# --- address above.                                                    ---
77 with m
.State("Handle-Write"):
78 m
.d
.comb
+= serial
.rx
.ack
.eq(1)
80 with m
.If(serial
.rx
.rdy
):
82 data
.eq(Cat(serial
.rx
.data
, data
)),
83 bytes_count
.eq(bytes_count
-1),
# NOTE(review): the body of this branch is not visible; presumably it
# advances to "Write-Data" -- verify.
85 with m
.If(bytes_count
== 0):
# --- State "Write-Data": return to "Receive-Cmd" once the bus acks. ---
# NOTE(review): the statements that drive the bus request signals in this
# state are not visible in this excerpt.
88 with m
.State("Write-Data"):
96 with m
.If(self
.bus
.ack
):
97 m
.next
= "Receive-Cmd"
# --- State "Handle-Read": issue the bus read and capture the result. ---
100 with m
.State("Handle-Read"):
# Select all four byte lanes. This is a list element; the other elements of
# that list are not visible.
105 self
.bus
.sel
.eq(0xF),
# On bus acknowledge, re-arm the byte counter and capture the read data.
# NOTE(review): the transition out of this state is not visible; presumably
# it goes to "Send-Data".
108 with m
.If(self
.bus
.ack
):
110 bytes_count
.eq(data_width
//8-1),
111 data
.eq(self
.bus
.dat_r
),
# --- State "Send-Data": present one byte of `data` to the transmitter. ---
115 with m
.State("Send-Data"):
116 m
.d
.comb
+= serial
.tx
.ack
.eq(1)
# Byte selection keyed on `bytes_count`: for each i, bits [i*8, (i+1)*8) of
# `data` are routed to tx.data.
# NOTE(review): the per-i Case statement is not visible; presumably
# `m.Case(i)`, which would send the most significant byte first because
# `bytes_count` counts down.
118 with m
.Switch(bytes_count
):
119 for i
in range(data_width
//8):
121 m
.d
.comb
+= serial
.tx
.data
.eq(data
[i
*8:(i
+1)*8])
123 with m
.If(serial
.tx
.rdy
):
124 m
.next
= "Send-Data-Wait"
# --- State "Send-Data-Wait": step to the next byte or finish. ---
126 with m
.State("Send-Data-Wait"):
127 with m
.If(serial
.tx
.rdy
):
129 bytes_count
.eq(bytes_count
-1),
# When the counter is at 0, return to "Receive-Cmd".
# NOTE(review): the branch that loops back to "Send-Data" for the remaining
# bytes is not visible.
132 with m
.If(bytes_count
== 0):
133 m
.next
= "Receive-Cmd"
# Simulation helper (generator): hand one value `val` to the transmitter of
# `serial`.
# Visible sequence: wait until tx.rdy is high, drive tx.data with `val` and
# raise tx.ack, wait until tx.rdy goes low, drop tx.ack, then wait until
# tx.rdy is high again.
# NOTE(review): the bodies of the three while loops are not visible in this
# excerpt (presumably a bare `yield` that advances the simulation) --
# confirm against the original file.
139 def serial_write(serial
, val
):
# Wait for the transmitter to report ready.
140 while not (yield serial
.tx
.rdy
):
# Present the value and request transmission.
143 yield serial
.tx
.data
.eq(val
)
144 yield serial
.tx
.ack
.eq(1)
# Wait for tx.rdy to drop.
147 while (yield serial
.tx
.rdy
):
150 yield serial
.tx
.ack
.eq(0)
# Wait for tx.rdy to rise again before returning to the caller.
152 while not (yield serial
.tx
.rdy
):
# Simulation helper (generator): take one value from the receiver of
# `serial`.
# Visible sequence: raise rx.ack, wait until rx.rdy is high, sample rx.data
# into the local `data`, drop rx.ack, then wait until rx.rdy goes low.
# NOTE(review): the while-loop bodies and the return statement are not
# visible in this excerpt. Callers use `rx = yield from serial_read(serial)`
# and compare `rx` with byte values, so presumably the function ends with
# `return data` -- confirm.
157 def serial_read(serial
):
158 yield serial
.rx
.ack
.eq(1)
# Wait for a received value to become available.
160 while not (yield serial
.rx
.rdy
):
163 data
= (yield serial
.rx
.data
)
164 yield serial
.rx
.ack
.eq(0)
# Wait for rx.rdy to drop.
166 while (yield serial
.rx
.rdy
):
# Simulation tests for UARTBridge. Each test wires a UARTBridge back-to-back
# with a second AsyncSerial, sends a request over the serial link with
# serial_write(), and checks the resulting Wishbone bus activity.
#
# NOTE(review): indentation has been lost and the integer at the start of
# many lines looks like a fused-in line number. Gaps in that numbering
# suggest statements are missing from this excerpt. Not visible here: the
# class attributes read as `self.divisor` and `self.timeout`, the header of
# the first test method, the creation of `m` and `sim`, the `process`
# generators, and the bookkeeping for the local `timeout` counter.
# NOTE(review): both tests write the same VCD file name, so whichever test
# runs later overwrites the earlier trace.
171 class UARTBridgeTestCase(unittest
.TestCase
):
172 # Minimum 5, lowest makes the simulation faster
# ---- first test (method header not visible): read request ----
# Pin record handed to the bridge: an "rx" field built with
# pin_layout(1, dir="i") and a "tx" field built with pin_layout(1, dir="o").
177 pins
= Record([("rx", pin_layout(1, dir="i")),
178 ("tx", pin_layout(1, dir="o"))])
179 dut
= UARTBridge(divisor
=self
.divisor
, pins
=pins
)
# Second UART used by the test to talk to the bridge; it uses the same
# divisor and is created without a pins argument.
180 serial
= AsyncSerial(divisor
=self
.divisor
)
182 m
.submodules
.bridge
= dut
183 m
.submodules
.serial
= serial
# Cross-connect the two UARTs: the test UART's tx feeds the bridge's rx pin
# and the bridge's tx pin feeds the test UART's rx. These are list elements
# (trailing commas); the enclosing statement is not visible.
185 pins
.rx
.i
.eq(serial
.tx
.o
),
186 serial
.rx
.i
.eq(pins
.tx
.o
),
# First byte of the request: the bridge latches it into `cmd`. With 0x02
# this test later expects bus.we to be low.
191 yield from serial_write(serial
, 0x02)
# Second byte of the request: the bridge latches it into `length`.
195 yield from serial_write(serial
, 0x01)
198 # Send 0x4000 as address
# Four address bytes, most significant byte first.
199 yield from serial_write(serial
, 0x00)
201 yield from serial_write(serial
, 0x00)
203 yield from serial_write(serial
, 0x40)
205 yield from serial_write(serial
, 0x00)
208 # Handle wishbone request
# Poll until the bridge raises bus.cyc, failing if the local `timeout`
# counter exceeds `self.timeout`.
# NOTE(review): the initialisation and increment of `timeout` are not
# visible in this excerpt.
210 while not (yield dut
.bus
.cyc
):
213 if timeout
> self
.timeout
:
214 raise RuntimeError("Simulation timed out")
216 # Ensure Wishbone address is the one we asked for
217 self
.assertEqual((yield dut
.bus
.adr
), 0x00004000)
218 self
.assertFalse((yield dut
.bus
.we
))
# Answer the bus request: supply the read data and acknowledge.
221 yield dut
.bus
.dat_r
.eq(0x0DEFACED)
222 yield dut
.bus
.ack
.eq(1)
225 # Check response on UART
# The four bytes of 0x0DEFACED are expected most significant byte first.
226 rx
= yield from serial_read(serial
)
227 self
.assertEqual(rx
, 0x0D)
228 rx
= yield from serial_read(serial
)
229 self
.assertEqual(rx
, 0xEF)
230 rx
= yield from serial_read(serial
)
231 self
.assertEqual(rx
, 0xAC)
232 rx
= yield from serial_read(serial
)
233 self
.assertEqual(rx
, 0xED)
# Dump waveforms to a VCD file and register `process` as a sync process.
# NOTE(review): clock setup and the call that runs the simulation are not
# visible here.
238 with sim
.write_vcd("test_uartbridge.vcd"):
240 sim
.add_sync_process(process
)
# ---- write request ----
# Sends command, length, address and four data bytes, then checks that the
# bridge presents the expected address and data on the bus with we high.
243 def test_write(self
):
244 pins
= Record([("rx", pin_layout(1, dir="i")),
245 ("tx", pin_layout(1, dir="o"))])
246 dut
= UARTBridge(divisor
=self
.divisor
, pins
=pins
)
247 serial
= AsyncSerial(divisor
=self
.divisor
)
249 m
.submodules
.bridge
= dut
250 m
.submodules
.serial
= serial
# Same cross-connection as in the read test above.
252 pins
.rx
.i
.eq(serial
.tx
.o
),
253 serial
.rx
.i
.eq(pins
.tx
.o
),
# First byte of the request (latched into `cmd`). With 0x01 this test later
# expects bus.we to be high.
258 yield from serial_write(serial
, 0x01)
# Second byte of the request (latched into `length`).
262 yield from serial_write(serial
, 0x01)
265 # Send 0x4000 as address
266 yield from serial_write(serial
, 0x00)
268 yield from serial_write(serial
, 0x00)
270 yield from serial_write(serial
, 0x40)
272 yield from serial_write(serial
, 0x00)
275 # Send 0xFEEDFACE as value
# Four data bytes, most significant byte first.
276 yield from serial_write(serial
, 0xFE)
278 yield from serial_write(serial
, 0xED)
280 yield from serial_write(serial
, 0xFA)
282 yield from serial_write(serial
, 0xCE)
284 # Handle wishbone request
# Poll until the bridge raises bus.cyc, with the same timeout guard as in
# the read test.
286 while not (yield dut
.bus
.cyc
):
289 if timeout
> self
.timeout
:
290 raise RuntimeError("Simulation timed out")
292 # Ensure Wishbone address is the one we asked for
293 self
.assertEqual((yield dut
.bus
.adr
), 0x00004000)
294 self
.assertEqual((yield dut
.bus
.dat_w
), 0xFEEDFACE)
295 self
.assertTrue((yield dut
.bus
.we
))
# Acknowledge the write on the bus.
298 yield dut
.bus
.ack
.eq(1)
302 with sim
.write_vcd("test_uartbridge.vcd"):
304 sim
.add_sync_process(process
)