--- /dev/null
+import math, binascii
+
+from liteeth.common import *
+from liteeth.mac.common import *
+from liteeth.test.common import *
+from liteeth.test.mac import *
+
+def print_arp(s):
+ print_with_prefix(s, "[ARP]")
+
+preamble = split_bytes(eth_preamble, 8)
+
+# ARP model
+class ARPPacket(Packet):
+ def __init__(self, init=[]):
+ Packet.__init__(self, init)
+
+ def decode(self):
+ header = []
+ for byte in self[:arp_header_len]:
+ header.append(self.pop(0))
+ for k, v in sorted(arp_header.items()):
+ setattr(self, k, get_field_data(v, header))
+
+ def encode(self):
+ header = 0
+ for k, v in sorted(arp_header.items()):
+ header |= (getattr(self, k) << (v.byte*8+v.offset))
+ for d in reversed(split_bytes(header, arp_header_len)):
+ self.insert(0, d)
+
+ def __repr__(self):
+ r = "--------\n"
+ for k in sorted(arp_header.keys()):
+ r += k + " : 0x%x" %getattr(self,k) + "\n"
+ r += "payload: "
+ for d in self:
+ r += "%02x" %d
+ return r
+
+class ARP(Module):
+ def __init__(self, mac, ip_address, mac_address, debug=False):
+ self.mac = mac
+ self.ip_address = ip_address
+ self.mac_address = mac_addres
+ self.debug = debug
+ self.tx_packets = []
+ self.tx_packet = ARPPacket()
+ self.rx_packet = ARPPacket()
+ self.table = {}
+ self.request_pending = False
+
+ self.mac.set_arp_callback(self.callback)
+
+ def send(self, packet):
+ if self.debug:
+ print_arp(">>>>>>>>")
+ print_arp(packet)
+ packet.encode()
+ self.mac.send(MACPacket(packet))
+
+ def callback(self, packet):
+ packet = ARPPacket(datas)
+ packet.decode()
+ if self.debug:
+ print_arp("<<<<<<<<")
+ print_arp(packet)
+ self.process_packet()
+
+ def process(self, packet):
+ if len(packet) != arp_packet_length-arp_header_length:
+ raise ValueError
+ if packet.hardware_type != arp_hwtype_ethernet:
+ raise ValueError
+ if packet.protocol_type != arp_proto_ip:
+ raise ValueError
+ if packet.hardware_address_length != 6:
+ raise ValueError
+ if packet.protocol_address_length != 4:
+ raise ValueError
+ if packet.operation == arp_opcode_request:
+ self.process_request(packet)
+ elif packet.operation == arp_opcode_reply:
+ self.process_reply(packet)
+
+ def process_request(self, request):
+ if request.destination_ip_address = self.ip_address:
+ reply = ARPPacket([0]*(arp_packet_length-arp_header_length))
+ reply.hardware_type = arp_hwtype_ethernet
+ reply.protocol_type = arp_proto_ip
+ reply.hardware_address_length = 6
+ reply.protocol_address_length = 4
+ reply.source_mac_address = self.mac_address
+ reply.source_ip_address = self.ip_address
+ reply.destination_mac_address = request.source_mac_address
+ reply.destination_ip_address = request.source_ip_address
+ self.send(reply)
+
+ def process_reply(self, reply):
+ self.table[reply.source_ip_address] = reply.source_mac_address
+
+ def request(self, ip_address):
+ request = ARPPacket([0]*(arp_packet_length-arp_header_length))
+ request.hardware_type = arp_hwtype_ethernet
+ request.protocol_type = arp_proto_ip
+ request.hardware_address_length = 6
+ request.protocol_address_length = 4
+ request.source_mac_address = self.mac_address
+ request.source_ip_address = self.ip_address
+ request.destination_mac_address = 0xffffffffffff
+ request.destination_ip_address = ip_address