2 from migen
.genlib
.record
import Record
3 from migen
.genlib
.cdc
import MultiReg
5 from litex
.soc
.interconnect
.csr
import *
6 from litex
.soc
.interconnect
.csr_eventmanager
import *
7 from litex
.soc
.interconnect
import stream
8 from litex
.soc
.interconnect
.wishbonebridge
import WishboneStreamingBridge
11 class RS232PHYInterface
:
13 self
.sink
= stream
.Endpoint([("data", 8)])
14 self
.source
= stream
.Endpoint([("data", 8)])
17 class RS232PHYRX(Module
):
18 def __init__(self
, pads
, tuning_word
):
19 self
.source
= stream
.Endpoint([("data", 8)])
23 uart_clk_rxen
= Signal()
24 phase_accumulator_rx
= Signal(32)
27 self
.specials
+= MultiReg(pads
.rx
, rx
)
30 rx_bitcount
= Signal(4)
32 rx_done
= self
.source
.valid
33 rx_data
= self
.source
.data
38 If(~rx
& rx_r
, # look for start bit
44 rx_bitcount
.eq(rx_bitcount
+ 1),
46 If(rx
, # verify start bit
49 ).Elif(rx_bitcount
== 9,
51 If(rx
, # verify stop bit
56 rx_reg
.eq(Cat(rx_reg
[1:], rx
))
63 Cat(phase_accumulator_rx
, uart_clk_rxen
).eq(phase_accumulator_rx
+ tuning_word
)
65 Cat(phase_accumulator_rx
, uart_clk_rxen
).eq(2**31)
69 class RS232PHYTX(Module
):
70 def __init__(self
, pads
, tuning_word
):
71 self
.sink
= stream
.Endpoint([("data", 8)])
75 uart_clk_txen
= Signal()
76 phase_accumulator_tx
= Signal(32)
81 tx_bitcount
= Signal(4)
84 self
.sink
.ready
.eq(0),
85 If(self
.sink
.valid
& ~tx_busy
& ~self
.sink
.ready
,
86 tx_reg
.eq(self
.sink
.data
),
90 ).Elif(uart_clk_txen
& tx_busy
,
91 tx_bitcount
.eq(tx_bitcount
+ 1),
94 ).Elif(tx_bitcount
== 9,
97 self
.sink
.ready
.eq(1),
99 pads
.tx
.eq(tx_reg
[0]),
100 tx_reg
.eq(Cat(tx_reg
[1:], 0))
106 Cat(phase_accumulator_tx
, uart_clk_txen
).eq(phase_accumulator_tx
+ tuning_word
)
108 Cat(phase_accumulator_tx
, uart_clk_txen
).eq(0)
113 class RS232PHY(Module
, AutoCSR
):
114 def __init__(self
, pads
, clk_freq
, baudrate
=115200):
115 self
._tuning
_word
= CSRStorage(32, reset
=int((baudrate
/clk_freq
)*2**32))
116 self
.submodules
.tx
= RS232PHYTX(pads
, self
._tuning
_word
.storage
)
117 self
.submodules
.rx
= RS232PHYRX(pads
, self
._tuning
_word
.storage
)
118 self
.sink
, self
.source
= self
.tx
.sink
, self
.rx
.source
121 class RS232PHYMultiplexer(Module
):
122 def __init__(self
, phys
, phy
):
123 self
.sel
= Signal(max=len(phys
))
128 for n
in range(len(phys
)):
129 # don't stall uarts when not selected
130 self
.comb
+= phys
[n
].sink
.ready
.eq(1)
131 # connect core to phy
133 phy
.source
.connect(phys
[n
].source
),
134 phys
[n
].sink
.connect(phy
.sink
)
136 self
.comb
+= Case(self
.sel
, cases
)
139 class RS232PHYModel(Module
):
140 def __init__(self
, pads
):
141 self
.sink
= stream
.Endpoint([("data", 8)])
142 self
.source
= stream
.Endpoint([("data", 8)])
145 pads
.source_valid
.eq(self
.sink
.valid
),
146 pads
.source_data
.eq(self
.sink
.data
),
147 self
.sink
.ready
.eq(pads
.source_ready
),
149 self
.source
.valid
.eq(pads
.sink_valid
),
150 self
.source
.data
.eq(pads
.sink_data
),
151 pads
.sink_ready
.eq(self
.source
.ready
)
155 def _get_uart_fifo(depth
, sink_cd
="sys", source_cd
="sys"):
156 if sink_cd
!= source_cd
:
157 fifo
= stream
.AsyncFIFO([("data", 8)], depth
)
158 return ClockDomainsRenamer({"write": sink_cd
, "read": source_cd
})(fifo
)
160 return stream
.SyncFIFO([("data", 8)], depth
)
163 class UART(Module
, AutoCSR
):
164 def __init__(self
, phy
,
169 self
._txfull
= CSRStatus()
170 self
._rxempty
= CSRStatus()
172 self
.submodules
.ev
= EventManager()
173 self
.ev
.tx
= EventSourceProcess()
174 self
.ev
.rx
= EventSourceProcess()
180 tx_fifo
= _get_uart_fifo(tx_fifo_depth
, source_cd
=phy_cd
)
181 self
.submodules
+= tx_fifo
184 tx_fifo
.sink
.valid
.eq(self
._rxtx
.re
),
185 tx_fifo
.sink
.data
.eq(self
._rxtx
.r
),
186 self
._txfull
.status
.eq(~tx_fifo
.sink
.ready
),
187 tx_fifo
.source
.connect(phy
.sink
),
188 # Generate TX IRQ when tx_fifo becomes non-full
189 self
.ev
.tx
.trigger
.eq(~tx_fifo
.sink
.ready
)
193 rx_fifo
= _get_uart_fifo(rx_fifo_depth
, sink_cd
=phy_cd
)
194 self
.submodules
+= rx_fifo
197 phy
.source
.connect(rx_fifo
.sink
),
198 self
._rxempty
.status
.eq(~rx_fifo
.source
.valid
),
199 self
._rxtx
.w
.eq(rx_fifo
.source
.data
),
200 rx_fifo
.source
.ready
.eq(self
.ev
.rx
.clear
),
201 # Generate RX IRQ when tx_fifo becomes non-empty
202 self
.ev
.rx
.trigger
.eq(~rx_fifo
.source
.valid
)
206 class UARTStub(Module
, AutoCSR
):
209 self
._txfull
= CSRStatus()
210 self
._rxempty
= CSRStatus()
212 self
.submodules
.ev
= EventManager()
213 self
.ev
.tx
= EventSourceProcess()
214 self
.ev
.rx
= EventSourceProcess()
220 self
._txfull
.status
.eq(0),
221 self
.ev
.tx
.trigger
.eq(~
(self
._rxtx
.re
& self
._rxtx
.r
)),
222 self
._rxempty
.status
.eq(1)
226 class UARTWishboneBridge(WishboneStreamingBridge
):
227 def __init__(self
, pads
, clk_freq
, baudrate
=115200):
228 self
.submodules
.phy
= RS232PHY(pads
, clk_freq
, baudrate
)
229 WishboneStreamingBridge
.__init
__(self
, self
.phy
, clk_freq
)
233 return Record([("tx", 1), ("rx", 1)])
236 class UARTMultiplexer(Module
):
237 def __init__(self
, uarts
, uart
):
238 self
.sel
= Signal(max=len(uarts
))
243 for n
in range(len(uarts
)):
245 uart
.tx
.eq(uarts
[n
].tx
),
246 uarts
[n
].rx
.eq(uart
.rx
)
248 self
.comb
+= Case(self
.sel
, cases
)