813691d738f7d524df8ca3a57094a8c302e9fbe5
3 from migen
.fhdl
.std
import *
4 from migen
.flow
.actor
import *
5 from migen
.fhdl
.specials
import *
7 from migen
.sim
.generic
import run_simulation
9 from misoclib
.com
.liteusb
.common
import *
10 from misoclib
.com
.liteusb
.core
import LiteUSBCore
11 from misoclib
.com
.liteusb
.test
.common
import *
13 # XXX for now use it from liteeth to avoid duplication
14 from misoclib
.com
.liteeth
.test
.common
import *
18 crc_bytes
= split_bytes(binascii
.crc32(bytes(l
)), 4, "little")
19 for byte
in crc_bytes
:
24 class USBPacket(Packet
):
25 def __init__(self
, init
=[]):
26 Packet
.__init
__(self
, init
)
27 self
.crc_error
= False
29 def check_remove_crc(self
):
30 if comp(self
[-4:], crc32(self
[:-4])):
37 def decode_remove_header(self
):
39 for byte
in self
[:packet_header
.length
]:
40 header
.append(self
.pop(0))
41 for k
, v
in sorted(packet_header
.fields
.items()):
42 setattr(self
, k
, get_field_data(v
, header
))
45 # XXX Header should be protected by CRC
46 self
.decode_remove_header()
47 self
.crc_error
= self
.check_remove_crc()
49 raise ValueError # XXX handle this properly
51 def encode_header(self
):
53 for k
, v
in sorted(packet_header
.fields
.items()):
54 value
= merge_bytes(split_bytes(getattr(self
, k
),
55 math
.ceil(v
.width
/8)),
57 header
+= (value
<< v
.offset
+(v
.byte
*8))
58 for d
in split_bytes(header
, packet_header
.length
):
66 # XXX Header should be protected by CRC
72 for k
in sorted(packet_header
.fields
.keys()):
73 r
+= k
+ " : 0x{:0x}\n".format(getattr(self
, k
))
76 r
+= "{:02x}".format(d
)
80 class PHYModel(Module
):
82 self
.sink
= Sink(phy_description(8))
83 self
.source
= Source(phy_description(8))
87 self
.submodules
.phy
= PHYModel()
88 self
.submodules
.core
= LiteUSBCore(self
.phy
)
90 self
.submodules
.phy_streamer
= PacketStreamer(phy_description(8))
91 self
.submodules
.phy_streamer_randomizer
= AckRandomizer(phy_description(8), level
=0)
93 self
.submodules
.phy_logger_randomizer
= AckRandomizer(phy_description(8), level
=0)
94 self
.submodules
.phy_logger
= PacketLogger(phy_description(8))
96 self
.submodules
.core_streamer
= PacketStreamer(user_description(8))
97 self
.submodules
.core_streamer_randomizer
= AckRandomizer(user_description(8), level
=10)
99 self
.submodules
.core_logger
= PacketLogger(user_description(8))
100 self
.submodules
.core_logger_randomizer
= AckRandomizer(user_description(8), level
=10)
103 user_port
= self
.core
.crossbar
.get_port(0x12)
107 Record
.connect(self
.phy_streamer
.source
, self
.phy_streamer_randomizer
.sink
),
108 Record
.connect(self
.phy_streamer_randomizer
.source
, self
.phy
.source
),
110 Record
.connect(self
.core_streamer
.source
, self
.core_streamer_randomizer
.sink
),
111 Record
.connect(self
.core_streamer_randomizer
.source
, user_port
.sink
),
113 Record
.connect(user_port
.source
, self
.core_logger_randomizer
.sink
),
114 Record
.connect(self
.core_logger_randomizer
.source
, self
.core_logger
.sink
),
116 Record
.connect(self
.phy
.sink
, self
.phy_logger_randomizer
.sink
),
117 Record
.connect(self
.phy_logger_randomizer
.source
, self
.phy_logger
.sink
)
120 def gen_simulation(self
, selfp
):
121 packet
= USBPacket([i
for i
in range(128)])
122 packet
.preamble
= 0x5AA55AA5
124 packet
.length
= 128 + 4
126 yield from self
.phy_streamer
.send(packet
)
129 print(self
.core_logger
.packet
)
131 selfp
.core_streamer
.source
.dst
= 0x12
132 selfp
.core_streamer
.source
.length
= 128 + 4
133 packet
= Packet([i
for i
in range(128)])
134 yield from self
.core_streamer
.send(packet
)
137 for d
in self
.phy_logger
.packet
:
138 print("%02x" %d, end
="")
140 packet
= USBPacket(self
.phy_logger
.packet
)
145 run_simulation(TB(), ncycles
=2000, vcd_name
="my.vcd", keep_files
=True)
147 if __name__
== "__main__":