"destination_ip_address": HField(24, 0, 32)
}
-ipv4_header_len = 24
+ipv4_header_len = 20
ipv4_header = {
"version": HField(0, 0, 4),
"ihl": HField(0, 4, 4),
"protocol": HField(9, 0, 8),
"header_checksum": HField(10, 0, 16),
"source_ip_address": HField(12, 0, 32),
- "destination_ip_address": HField(16, 0, 32),
- "options": HField(20, 0, 32)
+ "destination_ip_address": HField(16, 0, 32)
}
udp_header_len = 8
packetizer.sink.flags.eq(0),
packetizer.sink.fragment_offset.eq(0),
packetizer.sink.time_to_live.eq(0x80),
- packetizer.sink.source_ip_address.eq(ip_address),
- packetizer.sink.options.eq(0)
+ packetizer.sink.source_ip_address.eq(ip_address)
]
sink = packetizer.source
preamble = split_bytes(eth_preamble, 8)
+def carry_around_add(a, b):
+ c = a + b
+ return (c & 0xffff) + (c >> 16)
+
+def checksum(msg):
+ s = 0
+ for i in range(0, len(msg), 2):
+ w = msg[i] + (msg[i+1] << 8)
+ s = carry_around_add(s, w)
+ return ~s & 0xffff
+
# IP model
class IPPacket(Packet):
def __init__(self, init=[]):
Packet.__init__(self, init)
+ def get_checksum(self):
+ return self[10] | (self[11] << 8)
+
+ def check_checksum(self):
+ return checksum(self[:ipv4_header_len]) == 0
+
def decode(self):
header = []
for byte in self[:ipv4_header_len]:
for d in split_bytes(header, ipv4_header_len):
self.insert(0, d)
+ def insert_checksum(self):
+ self[10] = 0
+ self[11] = 0
+ c = checksum(self[:ipv4_header_len])
+ self[10] = c & 0xff
+ self[11] = (c >> 8) & 0xff
+
def __repr__(self):
r = "--------\n"
for k in sorted(ipv4_header.keys()):
def send(self, packet):
packet.encode()
+ packet.insert_checksum()
if self.debug:
print_ip(">>>>>>>>")
print_ip(packet)
def callback(self, packet):
packet = IPPacket(packet)
+ if not packet.check_checksum():
+ received = packet.get_checksum()
+ packet.insert_checksum()
+ expected = packet.get_checksum()
+ raise ValueError("Checksum error received %04x / expected %04x" %(received, expected)) # XXX maybe too restrictive
packet.decode()
if self.debug:
print_ip("<<<<<<<<")
def process(self, packet):
pass
-
+
if __name__ == "__main__":
from liteeth.test.model.dumps import *
from liteeth.test.model.mac import *
#print(packet)
packet = IPPacket(packet)
# check decoding
+ errors += not packet.check_checksum()
packet.decode()
#print(packet)
errors += verify_packet(packet, {})
# check encoding
packet.encode()
+ packet.insert_checksum()
+ errors += not packet.check_checksum()
packet.decode()
#print(packet)
errors += verify_packet(packet, {})