--- /dev/null
+from liteeth.common import *
+from liteeth.generic import *
+
+class LiteEthTTYTX(Module):
+ def __init__(self, ip_address, udp_port, fifo_depth=None):
+ self.sink = sink = Sink(eth_tty_description(8))
+ self.source = source = Source(eth_udp_user_description(8))
+ ###
+ if fifo_depth is None:
+ self.comb += [
+ source.stb.eq(sink.stb),
+ source.sop.eq(1),
+ source.eop.eq(1),
+ source.length.eq(1),
+ source.data.eq(sink.data),
+ sink.ack.eq(source.ack)
+ ]
+ else:
+ self.submodules.fifo = fifo = SyncFIFO([("data", 8)], fifo_depth)
+ self.comb += Record.connect(sink, fifo.sink)
+
+ self.submodules.level = level = FlipFlop(max=fifo_depth)
+ self.comb += level.d.eq(fifo.fifo.level)
+
+ self.submodules.counter = counter = Counter(max=fifo_depth)
+
+ self.submodules.fsm = fsm = FSM(reset_state="IDLE")
+ fsm.act("IDLE",
+ If(fifo.source.stb,
+ level.ce.eq(1),
+ counter.reset.eq(1),
+ NextState("SEND")
+ )
+ )
+ fsm.act("SEND",
+ source.stb.eq(fifo.source.stb),
+ source.sop.eq(counter.value == 0),
+ If(level.q == 0,
+ source.eop.eq(1),
+ ).Else(
+ source.eop.eq(counter.value == (level.q-1)),
+ ),
+ source.src_port.eq(udp_port),
+ source.dst_port.eq(udp_port),
+ source.ip_address.eq(ip_address),
+ If(level.q == 0,
+ source.length.eq(1),
+ ).Else(
+ source.length.eq(level.q),
+ ),
+ source.data.eq(fifo.source.data),
+ fifo.source.ack.eq(source.ack),
+ If(source.stb & source.ack,
+ counter.ce.eq(1),
+ If(source.eop,
+ NextState("IDLE")
+ )
+ )
+ )
+
+class LiteEthTTYRX(Module):
+ def __init__(self, ip_address, udp_port, fifo_depth=None):
+ self.sink = sink = Sink(eth_udp_user_description(8))
+ self.source = source = Source(eth_tty_description(8))
+ ###
+ valid = Signal()
+ self.comb += valid.eq(
+ (sink.ip_address == ip_address) &
+ (sink.dst_port == udp_port)
+ )
+ if fifo_depth is None:
+ self.comb += [
+ source.stb.eq(sink.stb & valid),
+ source.data.eq(sink.data),
+ sink.ack.eq(source.ack)
+ ]
+ else:
+ self.submodules.fifo = fifo = SyncFIFO([("data", 8)], fifo_depth)
+ self.comb += [
+ fifo.sink.stb.eq(sink.stb & valid),
+ fifo.sink.data.eq(sink.data),
+ sink.ack.eq(fifo.sink.ack),
+ Record.connect(fifo.source, source)
+ ]
+
+class LiteEthTTY(Module):
+ def __init__(self, udp, ip_address, udp_port,
+ rx_fifo_depth=64,
+ tx_fifo_depth=64):
+ self.submodules.tx = tx = LiteEthTTYTX(ip_address, udp_port, tx_fifo_depth)
+ self.submodules.rx = rx = LiteEthTTYRX(ip_address, udp_port, rx_fifo_depth)
+ udp_port = udp.crossbar.get_port(udp_port, dw=8)
+ self.comb += [
+ Record.connect(tx.source, udp_port.sink),
+ Record.connect(udp_port.source, rx.sink)
+ ]
+ self.sink, self.source = self.tx.sink, self.rx.source
--- /dev/null
+from litescope.common import *
+from litescope.frontend.la import LiteScopeLA
+from litescope.core.port import LiteScopeTerm
+
+from liteeth.common import *
+from liteeth.generic import *
+
+from targets.base import BaseSoC
+from liteeth.core.tty import LiteEthTTY
+
+class TTYSoC(BaseSoC):
+ default_platform = "kc705"
+ def __init__(self, platform):
+ BaseSoC.__init__(self, platform,
+ mac_address=0x10e2d5000000,
+ ip_address="192.168.1.40")
+ self.submodules.tty = LiteEthTTY(self.core.udp, convert_ip("192.168.1.12"), 10000)
+ self.comb += Record.connect(self.tty.source, self.tty.sink)
+
+class TTYSoCDevel(TTYSoC):
+ csr_map = {
+ "la": 20
+ }
+ csr_map.update(TTYSoC.csr_map)
+ def __init__(self, platform):
+ TTYSoC.__init__(self, platform)
+ debug = (
+ self.tty.sink.stb,
+ self.tty.sink.ack,
+ self.tty.sink.data,
+
+ self.tty.source.stb,
+ self.tty.source.ack,
+ self.tty.source.data
+ )
+ self.submodules.la = LiteScopeLA(debug, 4096)
+ self.la.trigger.add_port(LiteScopeTerm(self.la.dw))
+
+ def do_exit(self, vns):
+ self.la.export(vns, "test/la.csv")
+
+default_subtarget = TTYSoC
--- /dev/null
+import socket
+import threading
+
+test_message = "LiteEth virtual TTY Hello world"
+
+def test(fpga_ip, udp_port, test_message):
+ tx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ rx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ rx_sock.bind(("", udp_port))
+
+ def receive():
+ data, addr = rx_sock.recvfrom(8192)
+ rx_packet = []
+ for byte in data:
+ rx_packet.append(int(byte))
+ for e in rx_packet:
+ print(chr(e))
+
+ def send():
+ tx_sock.sendto(bytes(test_message, "utf-8"), (fpga_ip, udp_port))
+
+ receive_thread = threading.Thread(target=receive)
+ receive_thread.start()
+
+ send_thread = threading.Thread(target=send)
+ send_thread.start()
+
+ try:
+ send_thread.join(10)
+ receive_thread.join(0.1)
+ except KeyboardInterrupt:
+ pass
+
+test_message = "LiteEth virtual TTY Hello world\n"
+test("192.168.1.40", 10000, test_message)