improve test_udp (add random data and check)
authorFlorent Kermarrec <florent@enjoy-digital.fr>
Fri, 6 Feb 2015 19:49:30 +0000 (20:49 +0100)
committerFlorent Kermarrec <florent@enjoy-digital.fr>
Fri, 6 Feb 2015 19:49:30 +0000 (20:49 +0100)
test/test_udp.py

index 5f05249eb9421e1f79095f044f08e1500622b678..7c5c625fd3410306929eb8acbdaafb9e32e81e6c 100644 (file)
@@ -1,24 +1,72 @@
 import socket
 import time
 import threading
+import copy
+
+KB = 1024
+MB = 1024*KB
+GB = 1024*MB
+
+def seed_to_data(seed, random=True):
+       if random:
+               return (seed * 0x31415979 + 1) & 0xffffffff
+       else:
+               return seed
+
+def check(p1, p2):
+       p1 = copy.deepcopy(p1)
+       p2 = copy.deepcopy(p2)
+       if isinstance(p1, int):
+               return 0, 1, int(p1 != p2)
+       else:
+               if len(p1) >= len(p2):
+                       ref, res = p1, p2
+               else:
+                       ref, res = p2, p1
+               shift = 0
+               while((ref[0] != res[0]) and (len(res)>1)):
+                       res.pop(0)
+                       shift += 1
+               length = min(len(ref), len(res))
+               errors = 0
+               for i in range(length):
+                       if ref.pop(0) != res.pop(0):
+                               errors += 1
+               return shift, length, errors
+
+def generate_packet(seed, length):
+       r = []
+       for i in range(length):
+               r.append(seed_to_data(seed, True)%0xff) # XXX FIXME
+               seed += 1
+       return r, seed
 
 FPGA_IP = "192.168.1.40"
 HOST_IP = "192.168.1.12"
-UDP_PORT = 5010
+UDP_PORT = 6000
 MESSAGE = bytes("LiteEth UDP Loopback test", "utf-8")
 tx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 rx_sock  = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 rx_sock.bind(("", UDP_PORT))
 
 def receive():
-    while True:
-      data, addr = rx_sock.recvfrom(1024)
-      print(data)
+       rx_seed = 0
+       while rx_seed < 1*MB:
+               data, addr = rx_sock.recvfrom(1024)
+               rx_packet = []
+               for byte in data:
+                       rx_packet.append(int(byte))
+               rx_reference_packet, rx_seed = generate_packet(rx_seed, 512)
+               s, l, e = check(rx_reference_packet, rx_packet)
+               print("shift "+ str(s) + " / length " + str(l) + " / errors " + str(e))
+       done = True
 
 def send():
-  while True:
-    tx_sock.sendto(MESSAGE, (FPGA_IP, UDP_PORT))
-    time.sleep(0.01)
+       tx_seed = 0
+       while tx_seed < 1*MB:
+               tx_packet, tx_seed = generate_packet(tx_seed, 512)
+               tx_sock.sendto(bytes(tx_packet), (FPGA_IP, UDP_PORT))
+               time.sleep(0.01)
 
 receive_thread = threading.Thread(target=receive, daemon=True)
 receive_thread.start()