74273cb95df9d0531b3a707f57a11b66bdc26d32
3 from liteeth
.common
import *
4 from liteeth
.mac
.common
import *
5 from liteeth
.test
.common
import *
8 print_with_prefix(s
, "[MAC]")
10 preamble
= split_bytes(eth_preamble
, 8, "little")
14 crc_bytes
= split_bytes(binascii
.crc32(bytes(l
)), 4, "little")
15 for byte
in crc_bytes
:
20 class MACPacket(Packet
):
21 def __init__(self
, init
=[]):
22 Packet
.__init
__(self
, init
)
23 self
.preamble_error
= False
24 self
.crc_error
= False
26 def check_remove_preamble(self
):
27 if comp(self
[0:8], preamble
):
34 def check_remove_crc(self
):
35 if comp(self
[-4:], crc32(self
[:-4])):
42 def decode_remove_header(self
):
44 for byte
in self
[:mac_header_len
]:
45 header
.append(self
.pop(0))
46 for k
, v
in sorted(mac_header
.items()):
47 setattr(self
, k
, get_field_data(v
, header
))
50 self
.preamble_error
= self
.check_remove_preamble()
51 self
.crc_error
= self
.check_remove_crc()
52 if self
.crc_error
or self
.preamble_error
:
53 raise ValueError # XXX handle this properly
55 self
.decode_remove_header()
57 def encode_header(self
):
59 for k
, v
in sorted(mac_header
.items()):
60 value
= merge_bytes(split_bytes(getattr(self
, k
), math
.ceil(v
.width
/8)), "little")
61 header
+= (value
<< v
.offset
+(v
.byte
*8))
62 for d
in split_bytes(header
, mac_header_len
):
69 def insert_preamble(self
):
70 for d
in reversed(preamble
):
76 self
.insert_preamble()
80 for k
in sorted(mac_header
.keys()):
81 r
+= k
+ " : 0x%x" %getattr
(self
,k
) + "\n"
88 def __init__(self
, phy
, debug
=False, loopback
=False):
91 self
.loopback
= loopback
93 self
.tx_packet
= MACPacket()
94 self
.rx_packet
= MACPacket()
96 self
.ip_callback
= None
97 self
.arp_callback
= None
99 def set_ip_callback(self
, callback
):
100 self
.ip_callback
= callback
102 def set_arp_callback(self
, callback
):
103 self
.arp_callback
= callback
105 def send(self
, packet
):
107 print_mac(">>>>>>>>")
110 self
.tx_packets
.append(packet
)
112 def callback(self
, datas
):
113 packet
= MACPacket(datas
)
116 print_mac("<<<<<<<<")
121 if packet
.ethernet_type
== ethernet_type_ip
:
122 if self
.ip_callback
is not None:
123 self
.ip_callback(packet
)
124 elif packet
.ethernet_type
== ethernet_type_arp
:
125 if self
.arp_callback
is not None:
126 self
.arp_callback(packet
)
128 raise ValueError # XXX handle this properly
130 def gen_simulation(self
, selfp
):
131 self
.tx_packet
.done
= True
133 yield from self
.phy
.receive()
134 self
.callback(self
.phy
.packet
)
135 # XXX add full duplex
136 if len(self
.tx_packets
) != 0:
137 tx_packet
= self
.tx_packets
.pop(0)
138 yield from self
.phy
.send(tx_packet
)
140 if __name__
== "__main__":
141 from liteeth
.test
.model
.dumps
import *
143 packet
= MACPacket(arp_request
)
144 packet
.decode_remove_header()
146 errors
+= verify_packet(packet
, arp_request_infos
)
147 packet
.encode_header()
148 packet
.decode_remove_header()
150 errors
+= verify_packet(packet
, arp_request_infos
)
153 packet
= MACPacket(arp_reply
)
154 packet
.decode_remove_header()
155 errors
+= verify_packet(packet
, arp_reply_infos
)
156 packet
.encode_header()
157 packet
.decode_remove_header()
159 errors
+= verify_packet(packet
, arp_reply_infos
)
161 print("mac errors " + str(errors
))