From: Florent Kermarrec Date: Fri, 6 Feb 2015 19:49:30 +0000 (+0100) Subject: improve test_udp (add random data and check) X-Git-Tag: 24jan2021_ls180~2604^2~65 X-Git-Url: https://git.libre-soc.org/?a=commitdiff_plain;h=a8300a2e933554204e27585c465a50b608e912a1;p=litex.git improve test_udp (add random data and check) --- diff --git a/test/test_udp.py b/test/test_udp.py index 5f05249e..7c5c625f 100644 --- a/test/test_udp.py +++ b/test/test_udp.py @@ -1,24 +1,72 @@ import socket import time import threading +import copy + +KB = 1024 +MB = 1024*KB +GB = 1024*MB + +def seed_to_data(seed, random=True): + if random: + return (seed * 0x31415979 + 1) & 0xffffffff + else: + return seed + +def check(p1, p2): + p1 = copy.deepcopy(p1) + p2 = copy.deepcopy(p2) + if isinstance(p1, int): + return 0, 1, int(p1 != p2) + else: + if len(p1) >= len(p2): + ref, res = p1, p2 + else: + ref, res = p2, p1 + shift = 0 + while((ref[0] != res[0]) and (len(res)>1)): + res.pop(0) + shift += 1 + length = min(len(ref), len(res)) + errors = 0 + for i in range(length): + if ref.pop(0) != res.pop(0): + errors += 1 + return shift, length, errors + +def generate_packet(seed, length): + r = [] + for i in range(length): + r.append(seed_to_data(seed, True)%0xff) # XXX FIXME + seed += 1 + return r, seed FPGA_IP = "192.168.1.40" HOST_IP = "192.168.1.12" -UDP_PORT = 5010 +UDP_PORT = 6000 MESSAGE = bytes("LiteEth UDP Loopback test", "utf-8") tx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) rx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) rx_sock.bind(("", UDP_PORT)) def receive(): - while True: - data, addr = rx_sock.recvfrom(1024) - print(data) + rx_seed = 0 + while rx_seed < 1*MB: + data, addr = rx_sock.recvfrom(1024) + rx_packet = [] + for byte in data: + rx_packet.append(int(byte)) + rx_reference_packet, rx_seed = generate_packet(rx_seed, 512) + s, l, e = check(rx_reference_packet, rx_packet) + print("shift "+ str(s) + " / length " + str(l) + " / errors " + str(e)) + done = True def send(): - while True: - tx_sock.sendto(MESSAGE, (FPGA_IP, UDP_PORT)) - time.sleep(0.01) + tx_seed = 0 + while tx_seed < 1*MB: + tx_packet, tx_seed = generate_packet(tx_seed, 512) + tx_sock.sendto(bytes(tx_packet), (FPGA_IP, UDP_PORT)) + time.sleep(0.01) receive_thread = threading.Thread(target=receive, daemon=True) receive_thread.start()