from migen.bank.description import *
from migen.bank.eventmanager import *
from migen.genlib.record import Record
-from migen.sim.generic import Simulator, TopLevel
-from migen.sim import icarus
+from migen.flow.actor import Sink, Source
-class UART(Module, AutoCSR):
- def __init__(self, pads, clk_freq, baud=115200):
- self._r_rxtx = CSR(8)
-
- self.submodules.ev = EventManager()
- self.ev.tx = EventSourceProcess()
- self.ev.rx = EventSourcePulse()
- self.ev.finalize()
-
- # Tuning word value
- tuning_word=int((baud/clk_freq)*2**32)
- self._r_tuning_word = CSRStorage(32, reset=tuning_word)
+class UARTRX(Module):
+ def __init__(self, pads, tuning_word):
+ self.source = Source([("d", 8)])
###
uart_clk_rxen = Signal()
- uart_clk_txen = Signal()
phase_accumulator_rx = Signal(32)
- phase_accumulator_tx = Signal(32)
- pads.tx.reset = 1
-
- # TX
- tx_reg = Signal(8)
- tx_bitcount = Signal(4)
- tx_busy = self.ev.tx.trigger
- self.sync += [
- If(self._r_rxtx.re,
- tx_reg.eq(self._r_rxtx.r),
- tx_bitcount.eq(0),
- tx_busy.eq(1),
- pads.tx.eq(0)
- ).Elif(uart_clk_txen & tx_busy,
- tx_bitcount.eq(tx_bitcount + 1),
- If(tx_bitcount == 8,
- pads.tx.eq(1)
- ).Elif(tx_bitcount == 9,
- pads.tx.eq(1),
- tx_busy.eq(0)
- ).Else(
- pads.tx.eq(tx_reg[0]),
- tx_reg.eq(Cat(tx_reg[1:], 0))
- )
- )
- ]
-
- # RX
rx = Signal()
self.specials += MultiReg(pads.rx, rx)
rx_r = Signal()
rx_reg = Signal(8)
rx_bitcount = Signal(4)
rx_busy = Signal()
- rx_done = self.ev.rx.trigger
- rx_data = self._r_rxtx.w
+ rx_done = self.source.stb
+ rx_data = self.source.payload.d
self.sync += [
rx_done.eq(0),
rx_r.eq(rx),
)
)
]
-
-
- self.sync += [
+ self.sync += \
If(rx_busy,
- Cat(phase_accumulator_rx, uart_clk_rxen).eq(phase_accumulator_rx + self._r_tuning_word.storage)
+ Cat(phase_accumulator_rx, uart_clk_rxen).eq(phase_accumulator_rx + tuning_word)
).Else(
Cat(phase_accumulator_rx, uart_clk_rxen).eq(2**31)
- ),
+ )
+
+class UARTTX(Module):
+ def __init__(self, pads, tuning_word):
+ self.sink = Sink([("d", 8)])
+
+ ###
+
+ uart_clk_txen = Signal()
+ phase_accumulator_tx = Signal(32)
+
+ pads.tx.reset = 1
+ tx_reg = Signal(8)
+ tx_bitcount = Signal(4)
+ tx_busy = Signal()
+ self.sync += [
+ self.sink.ack.eq(0),
+ If(self.sink.stb & ~tx_busy & ~self.sink.ack,
+ tx_reg.eq(self.sink.payload.d),
+ tx_bitcount.eq(0),
+ tx_busy.eq(1),
+ pads.tx.eq(0)
+ ).Elif(uart_clk_txen & tx_busy,
+ tx_bitcount.eq(tx_bitcount + 1),
+ If(tx_bitcount == 8,
+ pads.tx.eq(1)
+ ).Elif(tx_bitcount == 9,
+ pads.tx.eq(1),
+ tx_busy.eq(0),
+ self.sink.ack.eq(1),
+ ).Else(
+ pads.tx.eq(tx_reg[0]),
+ tx_reg.eq(Cat(tx_reg[1:], 0))
+ )
+ )
+ ]
+ self.sync += [
If(tx_busy,
- Cat(phase_accumulator_tx, uart_clk_txen).eq(phase_accumulator_tx + self._r_tuning_word.storage)
+ Cat(phase_accumulator_tx, uart_clk_txen).eq(phase_accumulator_tx + tuning_word)
).Else(
Cat(phase_accumulator_tx, uart_clk_txen).eq(0)
)
- ]
+ ]
+
+class UART(Module, AutoCSR):
+ def __init__(self, pads, clk_freq, baud=115200):
+ self._r_rxtx = CSR(8)
+
+ self.submodules.ev = EventManager()
+ self.ev.tx = EventSourcePulse()
+ self.ev.rx = EventSourcePulse()
+ self.ev.finalize()
+
+ # Tuning word value
+ self._tuning_word = CSRStorage(32, reset=int((baud/clk_freq)*2**32))
+ tuning_word = self._tuning_word.storage
+
+ ###
+
+ self.submodules.rx = UARTRX(pads, tuning_word)
+ self.submodules.tx = UARTTX(pads, tuning_word)
+
+ self.sync += [
+ If(self._r_rxtx.re,
+ self.tx.sink.stb.eq(1),
+ self.tx.sink.payload.d.eq(self._r_rxtx.r),
+ ).Elif(self.tx.sink.ack,
+ self.tx.sink.stb.eq(0)
+ ),
+ If(self.rx.source.stb,
+ self._r_rxtx.w.eq(self.rx.source.d)
+ )
+ ]
+ self.comb += [
+ self.ev.tx.trigger.eq(self.tx.sink.stb & self.tx.sink.ack),
+ self.ev.rx.trigger.eq(self.rx.source.stb) #self.rx.source.ack supposed to be always 1
+ ]
class UARTTB(Module):
def __init__(self):
tx_string = "01234"
print("Sending string: " + tx_string)
for c in tx_string:
- while selfp.slave.ev.tx.trigger == 1:
- yield
selfp.slave._r_rxtx.r = ord(c)
selfp.slave._r_rxtx.re = 1
yield
print("FAILURE: sent decimal value "+str(val)+" (char "+chr(val)+") instead of "+c)
else:
print("SUCCESS: sent "+c)
+ while selfp.slave.ev.tx.trigger != 1:
+ yield
# Then receive a character
while True:
yield
-
if __name__ == "__main__":
+ from migen.sim.generic import Simulator, TopLevel
+ from migen.sim import icarus
with Simulator(UARTTB(), TopLevel("top.vcd", clk_period=int(1/0.08333333)), icarus.Runner(keep_files=False)) as s:
s.run(20000)