79ac98d0a036df1d0d6dd11ce91107237dec31ee
[litex.git] / liteeth / test / model / arp.py
1 import math, binascii
2
3 from liteeth.common import *
4 from liteeth.mac.common import *
5 from liteeth.test.common import *
6
7 from liteeth.test.model import mac
8
def print_arp(s):
    """Print `s` through `print_with_prefix`, tagged with the "[ARP]" prefix."""
    prefix = "[ARP]"
    print_with_prefix(s, prefix)
11
# Ethernet preamble run through split_bytes with a size of 8
# (presumably one list entry per preamble byte -- confirm against split_bytes).
preamble = split_bytes(eth_preamble, 8)
13
14 # ARP model
class ARPPacket(Packet):
    """ARP packet model: a `Packet` whose ARP header fields are attributes.

    The header layout is taken from `arp_header` (field name -> field
    descriptor) and `arp_header_len` (number of header elements). `decode`
    moves the header off the front of the packet into attributes; `encode`
    serialises the attributes back onto the front of the packet.
    """

    def __init__(self, init=None):
        """Create the packet from `init`; an empty packet when omitted."""
        # A fresh list per call instead of a shared mutable default argument.
        Packet.__init__(self, [] if init is None else init)

    def decode(self):
        """Pop the header off the front and store each field as an attribute.

        At most `arp_header_len` elements are removed (fewer when the packet
        is shorter). Every entry of `arp_header` is then extracted from the
        removed elements with `get_field_data` and set under the field name.
        """
        header_size = min(len(self), arp_header_len)
        header = [self.pop(0) for _ in range(header_size)]
        for name, field in sorted(arp_header.items()):
            setattr(self, name, get_field_data(field, header))

    def encode(self):
        """Pack the field attributes into a header and prepend it.

        Each field value is re-ordered through `split_bytes`/`merge_bytes`
        ("little") and accumulated into one integer at bit position
        `offset + byte*8`. The integer is then split into `arp_header_len`
        pieces which are inserted one by one at index 0.
        """
        header = 0
        for name, field in sorted(arp_header.items()):
            n_bytes = math.ceil(field.width/8)
            value = merge_bytes(split_bytes(getattr(self, name), n_bytes), "little")
            # `+` binds tighter than `<<`; the parentheses only spell that out.
            header += value << (field.offset + field.byte*8)
        # Inserting every piece at index 0 leaves them at the front of the
        # packet in the reverse of the order split_bytes returned them.
        for piece in split_bytes(header, arp_header_len):
            self.insert(0, piece)

    def __repr__(self):
        """Return the header fields (sorted by name) and the payload in hex.

        The field attributes must already exist, i.e. the packet has been
        decoded or the fields have been assigned by hand.
        """
        parts = ["--------\n"]
        parts.extend(
            name + " : 0x%x" % getattr(self, name) + "\n"
            for name in sorted(arp_header)
        )
        parts.append("payload: ")
        parts.extend("%02x" % d for d in self)
        return "".join(parts)
42
class ARP(Module):
    """ARP model attached to a MAC model.

    Registers `callback` with the MAC model through `set_arp_callback`,
    answers incoming requests that target `ip_address`, and records the
    sender of every incoming reply in `table` (IP address -> MAC address).
    """

    def __init__(self, mac, mac_address, ip_address, debug=False):
        """Store the configuration and hook into the MAC model.

        mac         -- MAC model; must provide `set_arp_callback` and `send`.
        mac_address -- MAC address used as source of emitted ARP packets.
        ip_address  -- IP address this model answers requests for.
        debug       -- when true, sent and received packets are printed.
        """
        self.mac = mac
        self.mac_address = mac_address
        self.ip_address = ip_address
        self.debug = debug
        # The next three attributes are not used by any method of this class.
        self.tx_packets = []
        self.tx_packet = ARPPacket()
        self.rx_packet = ARPPacket()
        # Resolved addresses, filled by process_reply().
        self.table = {}
        # Initialised here; no method of this class updates it.
        self.request_pending = False

        self.mac.set_arp_callback(self.callback)

    def _build_packet(self, operation, destination_mac_address, destination_ip_address):
        """Return a zero-filled Ethernet/IPv4 ARP packet sourced from this model."""
        packet = ARPPacket([0]*(arp_packet_length - arp_header_len))
        packet.hardware_type = arp_hwtype_ethernet
        packet.protocol_type = arp_proto_ip
        packet.operation = operation
        packet.hardware_address_length = 6
        packet.protocol_address_length = 4
        packet.source_mac_address = self.mac_address
        packet.source_ip_address = self.ip_address
        packet.destination_mac_address = destination_mac_address
        packet.destination_ip_address = destination_ip_address
        return packet

    def send(self, packet):
        """Encode `packet`, wrap it in a MAC packet and pass it to the MAC model."""
        packet.encode()
        if self.debug:
            print_arp(">>>>>>>>")
            print_arp(packet)
        # `mac` here is the module imported at file level, not the
        # constructor argument of the same name.
        mac_packet = mac.MACPacket(packet)
        mac_packet.destination_mac_address = packet.destination_mac_address
        mac_packet.source_mac_address = packet.source_mac_address
        mac_packet.ethernet_type = ethernet_type_arp
        self.mac.send(mac_packet)

    def callback(self, packet):
        """Entry point registered with the MAC model: decode, then process."""
        packet = ARPPacket(packet)
        packet.decode()
        if self.debug:
            print_arp("<<<<<<<<")
            print_arp(packet)
        self.process(packet)

    def process(self, packet):
        """Validate a decoded packet and dispatch on its operation.

        Raises ValueError when the payload length or any of the fixed
        Ethernet/IPv4 header fields does not have the expected value.
        Packets whose operation is neither request nor reply are ignored.
        """
        expected_len = arp_packet_length - arp_header_len
        if len(packet) != expected_len:
            raise ValueError("unexpected ARP payload length: %d (expected %d)"
                             % (len(packet), expected_len))
        expected_fields = (
            ("hardware_type", arp_hwtype_ethernet),
            ("protocol_type", arp_proto_ip),
            ("hardware_address_length", 6),
            ("protocol_address_length", 4),
        )
        for name, expected in expected_fields:
            actual = getattr(packet, name)
            if actual != expected:
                raise ValueError("unexpected ARP %s: %r (expected %r)"
                                 % (name, actual, expected))
        if packet.operation == arp_opcode_request:
            self.process_request(packet)
        elif packet.operation == arp_opcode_reply:
            self.process_reply(packet)

    def process_request(self, request):
        """Send a reply to the requester when the request targets our IP address."""
        if request.destination_ip_address != self.ip_address:
            return
        reply = self._build_packet(arp_opcode_reply,
                                   request.source_mac_address,
                                   request.source_ip_address)
        self.send(reply)

    def process_reply(self, reply):
        """Record the replying host's MAC address under its IP address."""
        self.table[reply.source_ip_address] = reply.source_mac_address

    def request(self, ip_address):
        """Send an ARP request for `ip_address` to the broadcast MAC address."""
        request = self._build_packet(arp_opcode_request,
                                     0xffffffffffff,
                                     ip_address)
        # The request used to be built and then dropped, so nothing was ever
        # transmitted; hand it to send() like process_request() does.
        self.send(request)
120
if __name__ == "__main__":
    from liteeth.test.model.dumps import *
    from liteeth.test.model.mac import *

    def _roundtrip_errors(dump, infos):
        """Return the verify_packet error count for one captured frame.

        The frame is checked once after decoding and once more after an
        encode/decode round trip.
        """
        frame = MACPacket(dump)
        frame.decode_remove_header()
        arp_packet = ARPPacket(frame)
        # check decoding
        arp_packet.decode()
        count = verify_packet(arp_packet, infos)
        # check encoding
        arp_packet.encode()
        arp_packet.decode()
        count += verify_packet(arp_packet, infos)
        return count

    errors = 0
    # ARP request
    errors += _roundtrip_errors(arp_request, arp_request_infos)
    # ARP Reply
    errors += _roundtrip_errors(arp_reply, arp_reply_infos)

    print("arp errors " + str(errors))