79ac98d0a036df1d0d6dd11ce91107237dec31ee
3 from liteeth
.common
import *
4 from liteeth
.mac
.common
import *
5 from liteeth
.test
.common
import *
7 from liteeth
.test
.model
import mac
10 print_with_prefix(s
, "[ARP]")
12 preamble
= split_bytes(eth_preamble
, 8)
15 class ARPPacket(Packet
):
16 def __init__(self
, init
=[]):
17 Packet
.__init
__(self
, init
)
21 for byte
in self
[:arp_header_len
]:
22 header
.append(self
.pop(0))
23 for k
, v
in sorted(arp_header
.items()):
24 setattr(self
, k
, get_field_data(v
, header
))
28 for k
, v
in sorted(arp_header
.items()):
29 value
= merge_bytes(split_bytes(getattr(self
, k
), math
.ceil(v
.width
/8)), "little")
30 header
+= (value
<< v
.offset
+(v
.byte
*8))
31 for d
in split_bytes(header
, arp_header_len
):
36 for k
in sorted(arp_header
.keys()):
37 r
+= k
+ " : 0x%x" %getattr
(self
,k
) + "\n"
44 def __init__(self
, mac
, mac_address
, ip_address
, debug
=False):
46 self
.mac_address
= mac_address
47 self
.ip_address
= ip_address
50 self
.tx_packet
= ARPPacket()
51 self
.rx_packet
= ARPPacket()
53 self
.request_pending
= False
55 self
.mac
.set_arp_callback(self
.callback
)
57 def send(self
, packet
):
62 mac_packet
= mac
.MACPacket(packet
)
63 mac_packet
.destination_mac_address
= packet
.destination_mac_address
64 mac_packet
.source_mac_address
= packet
.source_mac_address
65 mac_packet
.ethernet_type
= ethernet_type_arp
66 self
.mac
.send(mac_packet
)
68 def callback(self
, packet
):
69 packet
= ARPPacket(packet
)
76 def process(self
, packet
):
77 if len(packet
) != arp_packet_length
-arp_header_len
:
79 if packet
.hardware_type
!= arp_hwtype_ethernet
:
81 if packet
.protocol_type
!= arp_proto_ip
:
83 if packet
.hardware_address_length
!= 6:
85 if packet
.protocol_address_length
!= 4:
87 if packet
.operation
== arp_opcode_request
:
88 self
.process_request(packet
)
89 elif packet
.operation
== arp_opcode_reply
:
90 self
.process_reply(packet
)
92 def process_request(self
, request
):
93 if request
.destination_ip_address
== self
.ip_address
:
94 reply
= ARPPacket([0]*(arp_packet_length
-arp_header_len
))
95 reply
.hardware_type
= arp_hwtype_ethernet
96 reply
.protocol_type
= arp_proto_ip
97 reply
.operation
= arp_opcode_reply
98 reply
.hardware_address_length
= 6
99 reply
.protocol_address_length
= 4
100 reply
.source_mac_address
= self
.mac_address
101 reply
.source_ip_address
= self
.ip_address
102 reply
.destination_mac_address
= request
.source_mac_address
103 reply
.destination_ip_address
= request
.source_ip_address
106 def process_reply(self
, reply
):
107 self
.table
[reply
.source_ip_address
] = reply
.source_mac_address
109 def request(self
, ip_address
):
110 request
= ARPPacket([0]*(arp_packet_length
-arp_header_len
))
111 request
.hardware_type
= arp_hwtype_ethernet
112 request
.protocol_type
= arp_proto_ip
113 request
.operation
= arp_opcode_request
114 request
.hardware_address_length
= 6
115 request
.protocol_address_length
= 4
116 request
.source_mac_address
= self
.mac_address
117 request
.source_ip_address
= self
.ip_address
118 request
.destination_mac_address
= 0xffffffffffff
119 request
.destination_ip_address
= ip_address
121 if __name__
== "__main__":
122 from liteeth
.test
.model
.dumps
import *
123 from liteeth
.test
.model
.mac
import *
126 packet
= MACPacket(arp_request
)
127 packet
.decode_remove_header()
128 packet
= ARPPacket(packet
)
132 errors
+= verify_packet(packet
, arp_request_infos
)
137 errors
+= verify_packet(packet
, arp_request_infos
)
140 packet
= MACPacket(arp_reply
)
141 packet
.decode_remove_header()
142 packet
= ARPPacket(packet
)
146 errors
+= verify_packet(packet
, arp_reply_infos
)
151 errors
+= verify_packet(packet
, arp_reply_infos
)
153 print("arp errors " + str(errors
))