clean up
[litex.git] / liteeth / test / model / mac.py
1 import math, binascii
2
3 from liteeth.common import *
4 from liteeth.test.common import *
5
def print_mac(s):
    """Print *s* through ``print_with_prefix`` using the ``[MAC]`` tag."""
    tag = "[MAC]"
    print_with_prefix(s, tag)
8
# Ethernet preamble, produced by asking split_bytes for 8 items in "little"
# byte order. MACPacket prepends this sequence when encoding and
# checks/strips it from the front of a frame when decoding.
preamble = split_bytes(eth_preamble, 8, "little")
10
def crc32(l):
    """Return the CRC-32 of *l* as a list of ints.

    *l* is converted with ``bytes(l)`` and fed to ``binascii.crc32``; the
    resulting checksum is split by ``split_bytes`` (4 items, "little" byte
    order) and each item is converted to a plain ``int``.
    """
    checksum = binascii.crc32(bytes(l))
    return [int(b) for b in split_bytes(checksum, 4, "little")]
17
18 # MAC model
class MACPacket(Packet):
    """Model of a MAC frame: preamble, header, payload and CRC.

    The packet is manipulated in place through list-style operations
    (``pop``, ``insert``, ``append``, slicing).  ``encode`` wraps the current
    content with header, CRC and preamble; ``decode`` strips them again and
    exposes every ``mac_header`` field as an attribute of the packet.
    """

    def __init__(self, init=None):
        """Create a packet from the iterable *init* (empty when omitted)."""
        # Use a fresh list per call instead of a shared mutable default.
        Packet.__init__(self, [] if init is None else init)
        self.preamble_error = False
        self.crc_error = False

    def check_remove_preamble(self):
        """Strip the leading preamble when it is present.

        Returns:
            False if the first 8 items matched ``preamble`` (according to
            ``comp``) and were removed, True (error) otherwise.
        """
        if not comp(self[0:8], preamble):
            return True
        for _ in range(8):
            self.pop(0)
        return False

    def check_remove_crc(self):
        """Strip the trailing CRC when it is valid.

        Returns:
            False if the last 4 items matched ``crc32`` of everything before
            them (according to ``comp``) and were removed, True (error)
            otherwise.
        """
        if not comp(self[-4:], crc32(self[:-4])):
            return True
        for _ in range(4):
            self.pop()
        return False

    def decode_remove_header(self):
        """Pop the header off the front and set one attribute per field.

        Up to ``mac_header_len`` items are removed from the start of the
        packet; each entry of ``mac_header`` is then extracted from them with
        ``get_field_data`` and stored on the packet under the field's name.
        """
        # Same count as iterating over self[:mac_header_len]: a short packet
        # yields a short header instead of raising.
        count = min(mac_header_len, len(self))
        header = [self.pop(0) for _ in range(count)]
        for name, field in sorted(mac_header.items()):
            setattr(self, name, get_field_data(field, header))

    def decode(self):
        """Strip preamble and CRC, then decode the header.

        Both checks are always run and their results stored in
        ``preamble_error`` / ``crc_error``.

        Raises:
            ValueError: if the preamble or the CRC check failed.  The header
                is not decoded in that case.
        """
        self.preamble_error = self.check_remove_preamble()
        self.crc_error = self.check_remove_crc()
        if self.crc_error or self.preamble_error:
            # XXX handle this properly
            raise ValueError(
                "invalid MAC frame (preamble_error=%s, crc_error=%s)"
                % (self.preamble_error, self.crc_error))
        self.decode_remove_header()

    def encode_header(self):
        """Build the header from the packet's attributes and prepend it.

        Every ``mac_header`` field is read from the attribute of the same
        name, placed at bit position ``offset + byte*8`` of a single integer,
        and the ``mac_header_len`` items returned by ``split_bytes`` are
        inserted one by one at index 0.
        """
        header = 0
        for name, field in sorted(mac_header.items()):
            n_bytes = math.ceil(field.width/8)
            value = merge_bytes(split_bytes(getattr(self, name), n_bytes),
                                "little")
            header += value << (field.offset + field.byte*8)
        # Inserting each item at index 0 leaves them in the reverse of the
        # order split_bytes produced.
        for d in split_bytes(header, mac_header_len):
            self.insert(0, d)

    def insert_crc(self):
        """Append ``crc32`` of the current content to the packet."""
        for d in crc32(self):
            self.append(d)

    def insert_preamble(self):
        """Prepend ``preamble`` to the packet, keeping its item order."""
        for d in reversed(preamble):
            self.insert(0, d)

    def encode(self):
        """Wrap the payload: header first, then CRC, then preamble."""
        self.encode_header()
        self.insert_crc()
        self.insert_preamble()

    def __repr__(self):
        """Return the header fields (sorted by name) and the content in hex.

        Every ``mac_header`` field must already exist as an attribute.
        """
        parts = ["--------\n"]
        for name in sorted(mac_header.keys()):
            parts.append(name + " : 0x%x" % getattr(self, name) + "\n")
        parts.append("payload: ")
        parts.extend("%02x" % d for d in self)
        return "".join(parts)
85
class MAC(Module):
    """MAC model sitting on top of a PHY model.

    Frames received from ``phy`` are decoded into ``MACPacket`` objects and
    either looped back or handed to the registered IP / ARP callback.
    Packets passed to ``send`` are encoded and queued for transmission by
    ``gen_simulation``.
    """

    def __init__(self, phy, debug=False, loopback=False):
        """Store the PHY model and options; start with an empty TX queue."""
        self.phy = phy
        self.debug = debug
        self.loopback = loopback
        self.tx_packets = []
        self.tx_packet = MACPacket()
        self.rx_packet = MACPacket()

        # Upper-layer handlers; None means "not registered".
        self.ip_callback = None
        self.arp_callback = None

    def set_ip_callback(self, callback):
        """Register the handler called with each received IP packet."""
        self.ip_callback = callback

    def set_arp_callback(self, callback):
        """Register the handler called with each received ARP packet."""
        self.arp_callback = callback

    def send(self, packet):
        """Encode *packet* in place and queue it for transmission.

        With ``debug`` set, the packet is printed before it is encoded.
        """
        if self.debug:
            print_mac(">>>>>>>>")
            print_mac(packet)
        packet.encode()
        self.tx_packets.append(packet)

    def callback(self, datas):
        """Decode the received frame *datas* and dispatch it.

        In loopback mode the decoded packet is sent straight back.
        Otherwise it goes to the IP or ARP handler according to its
        ``ethernet_type`` (silently dropped when no handler is registered).

        Raises:
            ValueError: from ``MACPacket.decode`` on a bad frame, or when
                the ``ethernet_type`` is neither IP nor ARP.
        """
        packet = MACPacket(datas)
        packet.decode()
        if self.debug:
            print_mac("<<<<<<<<")
            print_mac(packet)

        if self.loopback:
            self.send(packet)
            return

        if packet.ethernet_type == ethernet_type_ip:
            handler = self.ip_callback
        elif packet.ethernet_type == ethernet_type_arp:
            handler = self.arp_callback
        else:
            raise ValueError # XXX handle this properly
        if handler is not None:
            handler(packet)

    def gen_simulation(self, selfp):
        """Simulation generator: receive one frame, then send one if queued.

        *selfp* is not used by this method.
        """
        self.tx_packet.done = True
        while True:
            yield from self.phy.receive()
            self.callback(self.phy.packet)
            # XXX add full duplex
            if self.tx_packets:
                yield from self.phy.send(self.tx_packets.pop(0))
138
if __name__ == "__main__":
    # Self-test: decode the header of each captured ARP frame, check it,
    # then re-encode and decode again and check that the fields still match.
    from liteeth.test.model.dumps import *

    errors = 0
    for dump, infos in ((arp_request, arp_request_infos),
                        (arp_reply, arp_reply_infos)):
        packet = MACPacket(dump)
        packet.decode_remove_header()
        errors += verify_packet(packet, infos)

        # Round trip through encode_header / decode_remove_header.
        packet.encode_header()
        packet.decode_remove_header()
        errors += verify_packet(packet, infos)

    print("mac errors " + str(errors))