import socket
import time
import threading
+import copy
+
+KB = 1024
+MB = 1024*KB
+GB = 1024*MB
+
+def seed_to_data(seed, random=True):
+ if random:
+ return (seed * 0x31415979 + 1) & 0xffffffff
+ else:
+ return seed
+
+def check(p1, p2):
+ p1 = copy.deepcopy(p1)
+ p2 = copy.deepcopy(p2)
+ if isinstance(p1, int):
+ return 0, 1, int(p1 != p2)
+ else:
+ if len(p1) >= len(p2):
+ ref, res = p1, p2
+ else:
+ ref, res = p2, p1
+ shift = 0
+ while((ref[0] != res[0]) and (len(res)>1)):
+ res.pop(0)
+ shift += 1
+ length = min(len(ref), len(res))
+ errors = 0
+ for i in range(length):
+ if ref.pop(0) != res.pop(0):
+ errors += 1
+ return shift, length, errors
+
+def generate_packet(seed, length):
+ r = []
+ for i in range(length):
+ r.append(seed_to_data(seed, True)%0xff) # XXX FIXME
+ seed += 1
+ return r, seed
FPGA_IP = "192.168.1.40"
HOST_IP = "192.168.1.12"
-UDP_PORT = 5010
+UDP_PORT = 6000
MESSAGE = bytes("LiteEth UDP Loopback test", "utf-8")
tx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
rx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
rx_sock.bind(("", UDP_PORT))
def receive():
- while True:
- data, addr = rx_sock.recvfrom(1024)
- print(data)
+ rx_seed = 0
+ while rx_seed < 1*MB:
+ data, addr = rx_sock.recvfrom(1024)
+ rx_packet = []
+ for byte in data:
+ rx_packet.append(int(byte))
+ rx_reference_packet, rx_seed = generate_packet(rx_seed, 512)
+ s, l, e = check(rx_reference_packet, rx_packet)
+ print("shift "+ str(s) + " / length " + str(l) + " / errors " + str(e))
+ done = True
def send():
- while True:
- tx_sock.sendto(MESSAGE, (FPGA_IP, UDP_PORT))
- time.sleep(0.01)
+ tx_seed = 0
+ while tx_seed < 1*MB:
+ tx_packet, tx_seed = generate_packet(tx_seed, 512)
+ tx_sock.sendto(bytes(tx_packet), (FPGA_IP, UDP_PORT))
+ time.sleep(0.01)
receive_thread = threading.Thread(target=receive, daemon=True)
receive_thread.start()