arp_request_infos = {
"sender_mac" : 0x00123f979201,
- "target_mac" : 0x00221922549e,
- "ethernet_type" : 0x806,
- "hwtype" : 0x1,
- "opcode" : 0x1,
- "protosize" : 0x4,
+ "target_mac" : 0x00221922549e,
+ "ethernet_type" : 0x806,
+ "hwtype" : 0x1,
+ "opcode" : 0x1,
+ "protosize" : 0x4,
"proto" : 0x800,
"sender_ip" : 0xa9feff42,
- "target_ip" : 0xa9fe6462
+ "target_ip" : 0xa9fe6462
}
arp_reply_infos = {
"sender_mac" : 0x00221922549e,
- "target_mac" : 0x00123f979201,
- "ethernet_type" : 0x806,
- "hwtype" : 0x1,
- "opcode" : 0x2,
- "protosize" : 0x4,
+ "target_mac" : 0x00123f979201,
+ "ethernet_type" : 0x806,
+ "hwtype" : 0x1,
+ "opcode" : 0x2,
+ "protosize" : 0x4,
"proto" : 0x800,
"sender_ip" : 0xa9fe6462,
- "target_ip" : 0xa9feff42
+ "target_ip" : 0xa9feff42
}
udp = format_dump("""
udp_infos = {
"sender_mac" : 0x00140b333327,
- "target_mac" : 0xd07ab596cd0a,
- "protocol" : 0x11,
+ "target_mac" : 0xd07ab596cd0a,
+ "protocol" : 0x11,
"sender_ip" : 0xc0a80165,
- "target_ip" : 0xb27b0d78,
- "src_port" : 0xa63f,
+ "target_ip" : 0xb27b0d78,
+ "src_port" : 0xa63f,
"dst_port" : 0x690f
}
+
+ping_request = format_dump("""
+00 50 56 e0 14 49 00 0c 29 34 0b de 08 00 45 00
+00 3c d7 43 00 00 80 01 2b 73 c0 a8 9e 8b ae 89
+2a 4d 08 00 2a 5c 02 00 21 00 61 62 63 64 65 66
+67 68 69 6a 6b 6c 6d 6e 6f 70 71 72 73 74 75 76
+77 61 62 63 64 65 66 67 68 69""")
+
+ping_request_infos = {
+ "code" : 0x0,
+ "msgtype" : 0x8,
+ "quench" : 0x2002100
+}
+
+ping_reply = format_dump("""
+00 0c 29 34 0b de 00 50 56 e0 14 49 08 00 45 00
+00 3c 76 e1 00 00 80 01 8b d5 ae 89 2a 4d c0 a8
+9e 8b 00 00 32 5c 02 00 21 00 61 62 63 64 65 66
+67 68 69 6a 6b 6c 6d 6e 6f 70 71 72 73 74 75 76
+77 61 62 63 64 65 66 67 68 69""")
+
+ping_reply_infos = {}
--- /dev/null
+import math
+
+from liteeth.common import *
+from liteeth.test.common import *
+
+from liteeth.test.model import ip
+
+def print_icmp(s):
+ print_with_prefix(s, "[ICMP]")
+
+# ICMP model
+class ICMPPacket(Packet):
+ def __init__(self, init=[]):
+ Packet.__init__(self, init)
+
+ def decode(self):
+ header = []
+ for byte in self[:icmp_header_len]:
+ header.append(self.pop(0))
+ for k, v in sorted(icmp_header.items()):
+ setattr(self, k, get_field_data(v, header))
+
+ def encode(self):
+ header = 0
+ for k, v in sorted(icmp_header.items()):
+ value = merge_bytes(split_bytes(getattr(self, k), math.ceil(v.width/8)), "little")
+ header += (value << v.offset+(v.byte*8))
+ for d in split_bytes(header, icmp_header_len):
+ self.insert(0, d)
+
+ def __repr__(self):
+ r = "--------\n"
+ for k in sorted(icmp_header.keys()):
+ r += k + " : 0x%x" %getattr(self,k) + "\n"
+ r += "payload: "
+ for d in self:
+ r += "%02x" %d
+ return r
+
+class ICMP(Module):
+ def __init__(self, ip, ip_address, debug=False):
+ self.ip = ip
+ self.ip_address = ip_address
+ self.debug = debug
+ self.loopback = loopback
+ self.tx_packets = []
+ self.tx_packet = ICMPPacket()
+ self.rx_packet = ICMPPacket()
+
+ self.ip.set_icmp_callback(self.callback)
+
+ def send(self, packet):
+ packet.encode()
+ if self.debug:
+ print_icmp(">>>>>>>>")
+ print_icmp(packet)
+ ip_packet = ip.IPPacket(packet)
+ ip_packet.version = 0x4
+ ip_packet.ihl = 0x5
+ ip_packet.total_length = len(packet) + ip_packet.ihl
+ ip_packet.identification = 0
+ ip_packet.flags = 0
+ ip_packet.fragment_offset = 0
+ ip_packet.ttl = 0x80
+ ip_packet.sender_ip = self.ip_address
+ ip_packet.target_ip = 0x12345678 # XXX
+ ip_packet.checksum = 0
+ ip_packet.protocol = icmp_protocol
+ self.ip.send(ip_packet)
+
+ def callback(self, packet):
+ packet = ICMPPacket(packet)
+ packet.decode()
+ if self.debug:
+ print_icmp("<<<<<<<<")
+ print_icmp(packet)
+ if self.loopback:
+ self.send(packet)
+ else:
+ self.process(packet)
+
+ def process(self, packet):
+ pass
+
+if __name__ == "__main__":
+ from liteeth.test.model.dumps import *
+ from liteeth.test.model.mac import *
+ from liteeth.test.model.ip import *
+ errors = 0
+ # ICMP packet
+ packet = MACPacket(ping_request)
+ packet.decode_remove_header()
+ #print(packet)
+ packet = IPPacket(packet)
+ packet.decode()
+ #print(packet)
+ packet = ICMPPacket(packet)
+ packet.decode()
+ #print(packet)
+ errors += verify_packet(packet, ping_request_infos)
+ packet.encode()
+ packet.decode()
+ #print(packet)
+ errors += verify_packet(packet, ping_request_infos)
+
+ print("icmp errors " + str(errors))
\ No newline at end of file
self.table = {}
self.request_pending = False
self.udp_callback = None
+ self.icmp_callback = None
self.mac.set_ip_callback(self.callback)
def set_udp_callback(self, callback):
self.udp_callback = callback
+ def set_icmp_callback(self, callback):
+ self.icmp_callback = callback
+
def send(self, packet):
packet.encode()
packet.insert_checksum()
if packet.protocol == udp_protocol:
if self.udp_callback is not None:
self.udp_callback(packet)
+ elif packet.protocol == icmp_protocol:
+ if self.icmp_callback is not None:
+ self.icmp_callback(packet)
if __name__ == "__main__":
from liteeth.test.model.dumps import *