import unittest

from nmigen import Cat, Elaboratable, Module, Record, Signal
from nmigen.lib.io import pin_layout
from nmigen_soc import wishbone
from nmigen_stdio.serial import AsyncSerial, AsyncSerialTX
from nmigen.back.pysim import *

from lambdasoc.periph import Peripheral
# Public API of this module.  Python only honours the lowercase ``__all__``
# for ``from module import *``; the upper-case spelling has no effect.
__all__ = ["UARTBridge"]

# Legacy alias so that any code reading the old, misspelled name keeps working.
__ALL__ = __all__
class UARTBridge(Elaboratable):
    """Bridge that turns frames received on a UART into Wishbone accesses.

    A frame is a sequence of bytes received through ``AsyncSerial``:

    1. one command byte,
    2. one length byte (latched, but not used by any state below),
    3. ``data_width // 8`` address bytes, most-significant byte first,
    4. for a write only: ``data_width // 8`` data bytes, most-significant
       byte first.

    A read answers with ``data_width // 8`` bytes of ``bus.dat_r``,
    most-significant byte first.

    Parameters
    ----------
    divisor
        Forwarded unchanged to ``AsyncSerial(divisor=...)``.
    pins
        Forwarded unchanged to ``AsyncSerial(pins=...)``.

    Attributes
    ----------
    bus : wishbone.Interface
        Bus on which the requested accesses are issued
        (``addr_width=30``, ``data_width=32``, ``granularity=32``).
    """

    def __init__(self, divisor, pins):
        self.bus = wishbone.Interface(addr_width=30,
                                      data_width=32, granularity=32)

        # ``elaborate`` reads both attributes, so both must be stored here.
        self._pins = pins
        self._divisor = divisor

    def elaborate(self, platform):
        """Build the serial transceiver and the frame-handling FSM."""
        m = Module()

        m.submodules.serial = serial = AsyncSerial(divisor=self._divisor,
                                                   pins=self._pins)

        # NOTE(review): widths and command codes are inferred from the
        # simulation tests (4 address bytes, 4 data bytes, 0x01 => write,
        # 0x02 => read) -- confirm against the protocol definition.
        address_width = 32
        data_width = 32
        cmd_write = 0x01
        cmd_read = 0x02

        cmd = Signal(8)
        length = Signal(8)
        address = Signal(address_width)
        data = Signal(data_width)
        bytes_count = Signal(range(data_width//8))
        # Not referenced by any state below; kept because it was declared.
        words_count = Signal(8)

        # The bus always sees the shift registers.  ``bus.adr`` is narrower
        # than ``address``, so the assignment drops the top address bits.
        m.d.comb += [
            self.bus.dat_w.eq(data),
            self.bus.adr.eq(address),
        ]

        with m.FSM():
            with m.State("Receive-Cmd"):
                m.d.comb += serial.rx.ack.eq(1)

                # Re-arm the byte counter while idle.
                m.d.sync += [
                    bytes_count.eq(data_width//8-1),
                ]

                with m.If(serial.rx.rdy):
                    m.d.sync += cmd.eq(serial.rx.data)
                    m.next = "Receive-Length"

            with m.State("Receive-Length"):
                m.d.comb += serial.rx.ack.eq(1)

                with m.If(serial.rx.rdy):
                    m.d.sync += length.eq(serial.rx.data)
                    m.next = "Receive-Address"

            with m.State("Receive-Address"):
                m.d.comb += serial.rx.ack.eq(1)

                with m.If(serial.rx.rdy):
                    # Shift the new byte in at the bottom.  The counter wraps
                    # from 0 back to its maximum, which re-arms it for the
                    # data phase.
                    m.d.sync += [
                        address.eq(Cat(serial.rx.data, address)),
                        bytes_count.eq(bytes_count-1),
                    ]

                    with m.If(bytes_count == 0):
                        with m.Switch(cmd):
                            with m.Case(cmd_write):
                                m.next = "Handle-Write"
                            with m.Case(cmd_read):
                                m.next = "Handle-Read"
                            with m.Case():
                                # Unknown command: drop the frame.
                                m.next = "Receive-Cmd"

            with m.State("Handle-Write"):
                m.d.comb += serial.rx.ack.eq(1)

                with m.If(serial.rx.rdy):
                    m.d.sync += [
                        data.eq(Cat(serial.rx.data, data)),
                        bytes_count.eq(bytes_count-1),
                    ]
                    with m.If(bytes_count == 0):
                        m.next = "Write-Data"

            with m.State("Write-Data"):
                # NOTE(review): strobe set reconstructed (the tests observe
                # ``cyc`` and ``we``; ``stb``/``sel`` are assumed) -- confirm.
                m.d.comb += [
                    self.bus.cyc.eq(1),
                    self.bus.stb.eq(1),
                    self.bus.we.eq(1),
                    self.bus.sel.eq(1),
                ]

                with m.If(self.bus.ack):
                    m.next = "Receive-Cmd"

            with m.State("Handle-Read"):
                # NOTE(review): strobe set reconstructed, see "Write-Data".
                m.d.comb += [
                    self.bus.cyc.eq(1),
                    self.bus.stb.eq(1),
                    self.bus.sel.eq(1),
                ]

                with m.If(self.bus.ack):
                    m.d.sync += [
                        bytes_count.eq(data_width//8-1),
                        data.eq(self.bus.dat_r),
                    ]
                    m.next = "Send-Data"

            with m.State("Send-Data"):
                m.d.comb += serial.tx.ack.eq(1)

                # Present the byte selected by the counter; the counter runs
                # downwards, so the most-significant byte goes out first.
                with m.Switch(bytes_count):
                    for i in range(data_width//8):
                        with m.Case(i):
                            m.d.comb += serial.tx.data.eq(data[i*8:(i+1)*8])

                with m.If(serial.tx.rdy):
                    m.next = "Send-Data-Wait"

            with m.State("Send-Data-Wait"):
                with m.If(serial.tx.rdy):
                    m.d.sync += [
                        bytes_count.eq(bytes_count-1),
                    ]

                    with m.If(bytes_count == 0):
                        m.next = "Receive-Cmd"
                    with m.Else():
                        m.next = "Send-Data"

        return m
def serial_write(serial, val):
    """Simulation helper: push one value through ``serial.tx``.

    Written as a generator for use with ``yield from`` inside a simulator
    process.  It waits for ``tx.rdy``, presents ``val`` on ``tx.data`` with
    ``tx.ack`` high, waits for ``tx.rdy`` to drop, releases ``tx.ack`` and
    finally waits for ``tx.rdy`` to come back.

    NOTE(review): the bare ``yield`` statements (one per wait-loop iteration
    and one after raising ``ack``) are reconstructed -- confirm the exact
    cycle spacing against the simulator in use.
    """
    # Wait until the transmitter can accept a value.
    while not (yield serial.tx.rdy):
        yield

    yield serial.tx.data.eq(val)
    yield serial.tx.ack.eq(1)
    yield

    # Wait until the transmitter has taken the value ...
    while (yield serial.tx.rdy):
        yield

    yield serial.tx.ack.eq(0)

    # ... and until it reports ready again.
    while not (yield serial.tx.rdy):
        yield
def serial_read(serial):
    """Simulation helper: fetch one value from ``serial.rx``.

    Written as a generator for use as ``value = yield from serial_read(s)``
    inside a simulator process.  It raises ``rx.ack``, waits for ``rx.rdy``,
    samples ``rx.data``, releases ``rx.ack`` and waits for ``rx.rdy`` to drop.

    Returns the sampled ``rx.data`` value.

    NOTE(review): the bare ``yield`` inside each wait loop is reconstructed
    -- confirm the exact cycle spacing against the simulator in use.
    """
    yield serial.rx.ack.eq(1)

    # Wait until the receiver holds a value.
    while not (yield serial.rx.rdy):
        yield

    data = (yield serial.rx.data)
    yield serial.rx.ack.eq(0)

    # Wait until the receiver has released the value.
    while (yield serial.rx.rdy):
        yield

    return data
class UARTBridgeTestCase(unittest.TestCase):
    """Simulate ``UARTBridge`` against a loop-back ``AsyncSerial`` peer."""

    # Minimum 5, lowest makes the simulation faster
    # NOTE(review): value reconstructed from the comment above -- confirm.
    divisor = 5
    # Maximum number of clock cycles to wait for the bridge to raise ``cyc``.
    # NOTE(review): value reconstructed -- confirm.
    timeout = 10000

    def _make_testbench(self):
        """Return ``(module, dut, serial)`` with the two UARTs cross-wired."""
        pins = Record([("rx", pin_layout(1, dir="i")),
                       ("tx", pin_layout(1, dir="o"))])
        dut = UARTBridge(divisor=self.divisor, pins=pins)
        serial = AsyncSerial(divisor=self.divisor)

        m = Module()
        m.submodules.bridge = dut
        m.submodules.serial = serial
        m.d.comb += [
            pins.rx.i.eq(serial.tx.o),
            serial.rx.i.eq(pins.tx.o),
        ]
        return m, dut, serial

    @staticmethod
    def _send_bytes(serial, values):
        """Generator: write each value in turn, one idle cycle after each."""
        for value in values:
            yield from serial_write(serial, value)
            yield

    def _wait_for_bus_cycle(self, dut):
        """Generator: wait for ``dut.bus.cyc``; fail after ``self.timeout``."""
        timeout = 0
        while not (yield dut.bus.cyc):
            yield
            timeout += 1
            if timeout > self.timeout:
                raise RuntimeError("Simulation timed out")

    @staticmethod
    def _run(m, process):
        """Run ``process`` as a sync process and dump the VCD trace."""
        sim = Simulator(m)
        with sim.write_vcd("test_uartbridge.vcd"):
            # NOTE(review): clock period reconstructed -- confirm.
            sim.add_clock(1e-6)
            sim.add_sync_process(process)
            sim.run()

    def test_read(self):
        """A read frame issues a bus read and returns ``dat_r`` on the UART."""
        m, dut, serial = self._make_testbench()

        def process():
            # Command 0x02, then length 0x01
            yield from self._send_bytes(serial, [0x02, 0x01])

            # Send 0x4000 as address
            yield from self._send_bytes(serial, [0x00, 0x00, 0x40, 0x00])

            # Handle wishbone request
            yield from self._wait_for_bus_cycle(dut)

            # Ensure Wishbone address is the one we asked for
            self.assertEqual((yield dut.bus.adr), 0x00004000)
            self.assertFalse((yield dut.bus.we))

            # Answer the request
            yield dut.bus.dat_r.eq(0x0DEFACED)
            yield dut.bus.ack.eq(1)
            yield

            # Check response on UART
            rx = yield from serial_read(serial)
            self.assertEqual(rx, 0x0D)
            rx = yield from serial_read(serial)
            self.assertEqual(rx, 0xEF)
            rx = yield from serial_read(serial)
            self.assertEqual(rx, 0xAC)
            rx = yield from serial_read(serial)
            self.assertEqual(rx, 0xED)

        self._run(m, process)

    def test_write(self):
        """A write frame issues a bus write with the received address/data."""
        m, dut, serial = self._make_testbench()

        def process():
            # Command 0x01, then length 0x01
            yield from self._send_bytes(serial, [0x01, 0x01])

            # Send 0x4000 as address
            yield from self._send_bytes(serial, [0x00, 0x00, 0x40, 0x00])

            # Send 0xFEEDFACE as value
            yield from self._send_bytes(serial, [0xFE, 0xED, 0xFA, 0xCE])

            # Handle wishbone request
            yield from self._wait_for_bus_cycle(dut)

            # Ensure Wishbone address is the one we asked for
            self.assertEqual((yield dut.bus.adr), 0x00004000)
            self.assertEqual((yield dut.bus.dat_w), 0xFEEDFACE)
            self.assertTrue((yield dut.bus.we))

            # Answer the request
            yield dut.bus.ack.eq(1)
            yield

        self._run(m, process)