74273cb95df9d0531b3a707f57a11b66bdc26d32
[litex.git] / liteeth / test / model / mac.py
1 import math, binascii
2
3 from liteeth.common import *
4 from liteeth.mac.common import *
5 from liteeth.test.common import *
6
7 def print_mac(s):
8 print_with_prefix(s, "[MAC]")
9
10 preamble = split_bytes(eth_preamble, 8, "little")
11
12 def crc32(l):
13 crc = []
14 crc_bytes = split_bytes(binascii.crc32(bytes(l)), 4, "little")
15 for byte in crc_bytes:
16 crc.append(int(byte))
17 return crc
18
19 # MAC model
20 class MACPacket(Packet):
21 def __init__(self, init=[]):
22 Packet.__init__(self, init)
23 self.preamble_error = False
24 self.crc_error = False
25
26 def check_remove_preamble(self):
27 if comp(self[0:8], preamble):
28 for i in range(8):
29 self.pop(0)
30 return False
31 else:
32 return True
33
34 def check_remove_crc(self):
35 if comp(self[-4:], crc32(self[:-4])):
36 for i in range(4):
37 self.pop()
38 return False
39 else:
40 return True
41
42 def decode_remove_header(self):
43 header = []
44 for byte in self[:mac_header_len]:
45 header.append(self.pop(0))
46 for k, v in sorted(mac_header.items()):
47 setattr(self, k, get_field_data(v, header))
48
49 def decode(self):
50 self.preamble_error = self.check_remove_preamble()
51 self.crc_error = self.check_remove_crc()
52 if self.crc_error or self.preamble_error:
53 raise ValueError # XXX handle this properly
54 else:
55 self.decode_remove_header()
56
57 def encode_header(self):
58 header = 0
59 for k, v in sorted(mac_header.items()):
60 value = merge_bytes(split_bytes(getattr(self, k), math.ceil(v.width/8)), "little")
61 header += (value << v.offset+(v.byte*8))
62 for d in split_bytes(header, mac_header_len):
63 self.insert(0, d)
64
65 def insert_crc(self):
66 for d in crc32(self):
67 self.append(d)
68
69 def insert_preamble(self):
70 for d in reversed(preamble):
71 self.insert(0, d)
72
73 def encode(self):
74 self.encode_header()
75 self.insert_crc()
76 self.insert_preamble()
77
78 def __repr__(self):
79 r = "--------\n"
80 for k in sorted(mac_header.keys()):
81 r += k + " : 0x%x" %getattr(self,k) + "\n"
82 r += "payload: "
83 for d in self:
84 r += "%02x" %d
85 return r
86
87 class MAC(Module):
88 def __init__(self, phy, debug=False, loopback=False):
89 self.phy = phy
90 self.debug = debug
91 self.loopback = loopback
92 self.tx_packets = []
93 self.tx_packet = MACPacket()
94 self.rx_packet = MACPacket()
95
96 self.ip_callback = None
97 self.arp_callback = None
98
99 def set_ip_callback(self, callback):
100 self.ip_callback = callback
101
102 def set_arp_callback(self, callback):
103 self.arp_callback = callback
104
105 def send(self, packet):
106 if self.debug:
107 print_mac(">>>>>>>>")
108 print_mac(packet)
109 packet.encode()
110 self.tx_packets.append(packet)
111
112 def callback(self, datas):
113 packet = MACPacket(datas)
114 packet.decode()
115 if self.debug:
116 print_mac("<<<<<<<<")
117 print_mac(packet)
118 if self.loopback:
119 self.send(packet)
120 else:
121 if packet.ethernet_type == ethernet_type_ip:
122 if self.ip_callback is not None:
123 self.ip_callback(packet)
124 elif packet.ethernet_type == ethernet_type_arp:
125 if self.arp_callback is not None:
126 self.arp_callback(packet)
127 else:
128 raise ValueError # XXX handle this properly
129
130 def gen_simulation(self, selfp):
131 self.tx_packet.done = True
132 while True:
133 yield from self.phy.receive()
134 self.callback(self.phy.packet)
135 # XXX add full duplex
136 if len(self.tx_packets) != 0:
137 tx_packet = self.tx_packets.pop(0)
138 yield from self.phy.send(tx_packet)
139
140 if __name__ == "__main__":
141 from liteeth.test.model.dumps import *
142 errors = 0
143 packet = MACPacket(arp_request)
144 packet.decode_remove_header()
145 #print(packet)
146 errors += verify_packet(packet, arp_request_infos)
147 packet.encode_header()
148 packet.decode_remove_header()
149 #print(packet)
150 errors += verify_packet(packet, arp_request_infos)
151
152 #print(packet)
153 packet = MACPacket(arp_reply)
154 packet.decode_remove_header()
155 errors += verify_packet(packet, arp_reply_infos)
156 packet.encode_header()
157 packet.decode_remove_header()
158 #print(packet)
159 errors += verify_packet(packet, arp_reply_infos)
160
161 print("mac errors " + str(errors))