test_udp: test loopback on port 6000 (dw=8) and port 8000 (dw=32) OK on board!
authorFlorent Kermarrec <florent@enjoy-digital.fr>
Tue, 10 Feb 2015 15:30:34 +0000 (16:30 +0100)
committerFlorent Kermarrec <florent@enjoy-digital.fr>
Tue, 10 Feb 2015 15:30:34 +0000 (16:30 +0100)
test/test_udp.py

index 666ab35c3d6fafdba24b73ec5a3aad5eb3d6b5ba..2405684c8abc7a9aec2cafdb0d9911755e93a414 100644 (file)
@@ -41,39 +41,40 @@ def generate_packet(seed, length):
                seed += 1
        return r, seed
 
-fpga_ip = "192.168.1.40"
-udp_port = 6000
-test_size = 1*MB
-tx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-rx_sock  = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-rx_sock.bind(("", udp_port))
+def test(fpga_ip, udp_port, test_size):
+       tx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+       rx_sock  = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+       rx_sock.bind(("", udp_port))
 
-def receive():
-       rx_seed = 0
-       while rx_seed < test_size:
-               data, addr = rx_sock.recvfrom(8192)
-               rx_packet = []
-               for byte in data:
-                       rx_packet.append(int(byte))
-               rx_reference_packet, rx_seed = generate_packet(rx_seed, 1024)
-               s, l, e = check(rx_reference_packet, rx_packet)
-               print("shift "+ str(s) + " / length " + str(l) + " / errors " + str(e))
+       def receive():
+               rx_seed = 0
+               while rx_seed < test_size:
+                       data, addr = rx_sock.recvfrom(8192)
+                       rx_packet = []
+                       for byte in data:
+                               rx_packet.append(int(byte))
+                       rx_reference_packet, rx_seed = generate_packet(rx_seed, 1024)
+                       s, l, e = check(rx_reference_packet, rx_packet)
+                       print("shift "+ str(s) + " / length " + str(l) + " / errors " + str(e))
 
-def send():
-       tx_seed = 0
-       while tx_seed < test_size:
-               tx_packet, tx_seed = generate_packet(tx_seed, 1024)
-               tx_sock.sendto(bytes(tx_packet), (fpga_ip, udp_port))
-               time.sleep(0.001) # XXX: FIXME, Python limitation?
+       def send():
+               tx_seed = 0
+               while tx_seed < test_size:
+                       tx_packet, tx_seed = generate_packet(tx_seed, 1024)
+                       tx_sock.sendto(bytes(tx_packet), (fpga_ip, udp_port))
+                       time.sleep(0.001) # XXX: FIXME, Python limitation?
 
-receive_thread = threading.Thread(target=receive)
-receive_thread.start()
+       receive_thread = threading.Thread(target=receive)
+       receive_thread.start()
 
-send_thread = threading.Thread(target=send)
-send_thread.start()
+       send_thread = threading.Thread(target=send)
+       send_thread.start()
 
-try:
-       send_thread.join(10)
-       receive_thread.join(0.1)
-except KeyboardInterrupt:
-       pass
+       try:
+               send_thread.join(10)
+               receive_thread.join(0.1)
+       except KeyboardInterrupt:
+               pass
+
+test("192.168.1.40", 6000, 128*KB)
+test("192.168.1.40", 8000, 128*KB)