# Create loopback on UDP port 6000
loopback_port = self.core.udp.crossbar.get_port(6000)
- loopback_fifo = SyncFIFO(eth_udp_user_description(8), 2048, buffered=True)
+ loopback_fifo = SyncFIFO(eth_udp_user_description(8), 8192, buffered=True)
self.submodules += loopback_fifo
self.comb += [
Record.connect(loopback_port.source, loopback_fifo.sink),
seed += 1
return r, seed
-FPGA_IP = "192.168.1.40"
-HOST_IP = "192.168.1.12"
-UDP_PORT = 6000
-MESSAGE = bytes("LiteEth UDP Loopback test", "utf-8")
+fpga_ip = "192.168.1.40"
+udp_port = 6000
+test_size = 1*MB
tx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
rx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-rx_sock.bind(("", UDP_PORT))
+rx_sock.bind(("", udp_port))
def receive():
rx_seed = 0
- while rx_seed < 1*MB:
+ while rx_seed < test_size:
data, addr = rx_sock.recvfrom(1024)
rx_packet = []
for byte in data:
rx_packet.append(int(byte))
- rx_reference_packet, rx_seed = generate_packet(rx_seed, 512)
+ rx_reference_packet, rx_seed = generate_packet(rx_seed, 1024)
s, l, e = check(rx_reference_packet, rx_packet)
print("shift "+ str(s) + " / length " + str(l) + " / errors " + str(e))
- done = True
def send():
tx_seed = 0
- while tx_seed < 1*MB:
- tx_packet, tx_seed = generate_packet(tx_seed, 512)
- tx_sock.sendto(bytes(tx_packet), (FPGA_IP, UDP_PORT))
- time.sleep(0.01)
+ while tx_seed < test_size:
+ tx_packet, tx_seed = generate_packet(tx_seed, 1024)
+ tx_sock.sendto(bytes(tx_packet), (fpga_ip, udp_port))
+ time.sleep(0.001) # XXX: FIXME
-receive_thread = threading.Thread(target=receive, daemon=True)
+receive_thread = threading.Thread(target=receive)
receive_thread.start()
-send_thread = threading.Thread(target=send, daemon=True)
+send_thread = threading.Thread(target=send)
send_thread.start()
-while True:
- time.sleep(1)
+try:
+ send_thread.join()
+ receive_thread.join()
+except KeyboardInterrupt:
+ pass