3 from liteeth
.common
import *
4 from liteeth
.test
.common
import *
7 print_with_prefix(s
, "[MAC]")
9 preamble
= split_bytes(eth_preamble
, 8, "little")
13 crc_bytes
= split_bytes(binascii
.crc32(bytes(l
)), 4, "little")
14 for byte
in crc_bytes
:
19 class MACPacket(Packet
):
20 def __init__(self
, init
=[]):
21 Packet
.__init
__(self
, init
)
22 self
.preamble_error
= False
23 self
.crc_error
= False
25 def check_remove_preamble(self
):
26 if comp(self
[0:8], preamble
):
33 def check_remove_crc(self
):
34 if comp(self
[-4:], crc32(self
[:-4])):
41 def decode_remove_header(self
):
43 for byte
in self
[:mac_header_len
]:
44 header
.append(self
.pop(0))
45 for k
, v
in sorted(mac_header
.items()):
46 setattr(self
, k
, get_field_data(v
, header
))
49 self
.preamble_error
= self
.check_remove_preamble()
50 self
.crc_error
= self
.check_remove_crc()
51 if self
.crc_error
or self
.preamble_error
:
52 raise ValueError # XXX handle this properly
54 self
.decode_remove_header()
56 def encode_header(self
):
58 for k
, v
in sorted(mac_header
.items()):
59 value
= merge_bytes(split_bytes(getattr(self
, k
), math
.ceil(v
.width
/8)), "little")
60 header
+= (value
<< v
.offset
+(v
.byte
*8))
61 for d
in split_bytes(header
, mac_header_len
):
68 def insert_preamble(self
):
69 for d
in reversed(preamble
):
75 self
.insert_preamble()
79 for k
in sorted(mac_header
.keys()):
80 r
+= k
+ " : 0x%x" %getattr
(self
,k
) + "\n"
87 def __init__(self
, phy
, debug
=False, loopback
=False):
90 self
.loopback
= loopback
92 self
.tx_packet
= MACPacket()
93 self
.rx_packet
= MACPacket()
95 self
.ip_callback
= None
96 self
.arp_callback
= None
98 def set_ip_callback(self
, callback
):
99 self
.ip_callback
= callback
101 def set_arp_callback(self
, callback
):
102 self
.arp_callback
= callback
104 def send(self
, packet
):
106 print_mac(">>>>>>>>")
109 self
.tx_packets
.append(packet
)
111 def callback(self
, datas
):
112 packet
= MACPacket(datas
)
115 print_mac("<<<<<<<<")
120 if packet
.ethernet_type
== ethernet_type_ip
:
121 if self
.ip_callback
is not None:
122 self
.ip_callback(packet
)
123 elif packet
.ethernet_type
== ethernet_type_arp
:
124 if self
.arp_callback
is not None:
125 self
.arp_callback(packet
)
127 raise ValueError # XXX handle this properly
129 def gen_simulation(self
, selfp
):
130 self
.tx_packet
.done
= True
132 yield from self
.phy
.receive()
133 self
.callback(self
.phy
.packet
)
134 # XXX add full duplex
135 if len(self
.tx_packets
) != 0:
136 tx_packet
= self
.tx_packets
.pop(0)
137 yield from self
.phy
.send(tx_packet
)
139 if __name__
== "__main__":
140 from liteeth
.test
.model
.dumps
import *
142 packet
= MACPacket(arp_request
)
143 packet
.decode_remove_header()
145 errors
+= verify_packet(packet
, arp_request_infos
)
146 packet
.encode_header()
147 packet
.decode_remove_header()
149 errors
+= verify_packet(packet
, arp_request_infos
)
152 packet
= MACPacket(arp_reply
)
153 packet
.decode_remove_header()
154 errors
+= verify_packet(packet
, arp_reply_infos
)
155 packet
.encode_header()
156 packet
.decode_remove_header()
158 errors
+= verify_packet(packet
, arp_reply_infos
)
160 print("mac errors " + str(errors
))