# Commit: 813691d738f7d524df8ca3a57094a8c302e9fbe5
# Path: [litex.git] / misoclib / com / liteusb / test / core_tb.py
1 import binascii
2
3 from migen.fhdl.std import *
4 from migen.flow.actor import *
5 from migen.fhdl.specials import *
6
7 from migen.sim.generic import run_simulation
8
9 from misoclib.com.liteusb.common import *
10 from misoclib.com.liteusb.core import LiteUSBCore
11 from misoclib.com.liteusb.test.common import *
12
# XXX: for now, reuse the test helpers from misoclib.com.liteeth.test.common to avoid duplicating them here
14 from misoclib.com.liteeth.test.common import *
15
def crc32(l):
    """Return the CRC-32 of ``l`` as a list of four ints.

    ``l`` is converted with ``bytes()`` and checksummed with
    ``binascii.crc32``; the result is split into 4 bytes by ``split_bytes``
    called with the "little" endianness argument.
    """
    checksum = binascii.crc32(bytes(l))
    return [int(b) for b in split_bytes(checksum, 4, "little")]
22
23
class USBPacket(Packet):
    """Packet with helpers to add/remove the LiteUSB header and CRC-32.

    ``encode()`` appends a 4-byte CRC computed over the current content and
    then prepends the header built from ``packet_header.fields``; the value
    of each field is read from the attribute of the same name, so those
    attributes must be set before encoding. ``decode()`` does the reverse:
    it strips the header (storing each field as an attribute), then checks
    and strips the trailing CRC.
    """

    def __init__(self, init=None):
        """Create a packet from the items of ``init`` (empty when omitted)."""
        # A fresh list per call: a ``[]`` default would be one object shared
        # by every packet constructed without arguments.
        Packet.__init__(self, [] if init is None else init)
        self.crc_error = False

    def check_remove_crc(self):
        """Check the trailing 4-byte CRC and remove it when it matches.

        Returns:
            False when the CRC matches (the 4 CRC bytes are removed),
            True when there is a CRC error (the packet is left untouched).
        """
        # A packet shorter than a CRC cannot be valid; report an error
        # instead of failing with IndexError while popping below.
        if len(self) < 4:
            return True
        if comp(self[-4:], crc32(self[:-4])):
            for _ in range(4):
                self.pop()
            return False
        return True

    def decode_remove_header(self):
        """Pop the header from the front and set one attribute per field."""
        # The slice is taken once up front, so at most
        # ``packet_header.length`` items are popped even though the packet
        # shrinks while we iterate.
        header = [self.pop(0) for _ in self[:packet_header.length]]
        for k, v in sorted(packet_header.fields.items()):
            setattr(self, k, get_field_data(v, header))

    def decode(self):
        """Strip header and CRC in place.

        Raises:
            ValueError: if the CRC check fails (``crc_error`` is set to True).
        """
        # XXX Header should be protected by CRC
        self.decode_remove_header()
        self.crc_error = self.check_remove_crc()
        if self.crc_error:
            # XXX handle this properly
            raise ValueError("USB packet CRC check failed")

    def encode_header(self):
        """Build the header from the field attributes and prepend it."""
        # Imported here because the file has no explicit ``import math``;
        # the name would otherwise only be available if a star import
        # happens to provide it.
        import math

        header = 0
        for k, v in sorted(packet_header.fields.items()):
            n_bytes = math.ceil(v.width/8)
            value = merge_bytes(split_bytes(getattr(self, k), n_bytes),
                                "little")
            # Shift by the field's bit position: bit offset plus 8 bits per
            # byte index (parentheses only make the precedence explicit).
            header += value << (v.offset + (v.byte*8))
        # Each byte is inserted at index 0, so the bytes returned by
        # split_bytes end up in reverse order at the front of the packet.
        for d in split_bytes(header, packet_header.length):
            self.insert(0, d)

    def insert_crc(self):
        """Append the CRC-32 of the current content."""
        # crc32(self) is fully computed before anything is appended.
        for d in crc32(self):
            self.append(d)

    def encode(self):
        """Append the CRC, then prepend the header (header is not CRC'd)."""
        # XXX Header should be protected by CRC
        self.insert_crc()
        self.encode_header()

    def __repr__(self):
        """Return the header fields (hex) followed by the payload as hex."""
        parts = ["--------\n"]
        for k in sorted(packet_header.fields.keys()):
            parts.append(k + " : 0x{:0x}\n".format(getattr(self, k)))
        parts.append("payload: ")
        parts.extend("{:02x}".format(d) for d in self)
        return "".join(parts)
78
79
class PHYModel(Module):
    """PHY stand-in for the testbench: one sink and one source endpoint.

    Both endpoints are created from ``phy_description(8)``; the model adds
    no logic of its own.
    """

    def __init__(self):
        data_width = 8
        self.sink = Sink(phy_description(data_width))
        self.source = Source(phy_description(data_width))
84
class TB(Module):
    """Testbench connecting a LiteUSBCore to a PHY model and a user port.

    Packet streamers/loggers (each behind an AckRandomizer) are attached on
    both the PHY side and on the crossbar port obtained with tag 0x12.
    """

    def __init__(self):
        self.submodules.phy = PHYModel()
        self.submodules.core = LiteUSBCore(self.phy)

        # PHY side: streamer feeds phy.source, logger records phy.sink.
        self.submodules.phy_streamer = PacketStreamer(phy_description(8))
        self.submodules.phy_streamer_randomizer = AckRandomizer(
            phy_description(8), level=0)

        self.submodules.phy_logger_randomizer = AckRandomizer(
            phy_description(8), level=0)
        self.submodules.phy_logger = PacketLogger(phy_description(8))

        # User side: streamer feeds the user port, logger records its output.
        self.submodules.core_streamer = PacketStreamer(user_description(8))
        self.submodules.core_streamer_randomizer = AckRandomizer(
            user_description(8), level=10)

        self.submodules.core_logger = PacketLogger(user_description(8))
        self.submodules.core_logger_randomizer = AckRandomizer(
            user_description(8), level=10)

        user_port = self.core.crossbar.get_port(0x12)

        # (source, sink) pairs, connected in this order.
        links = [
            (self.phy_streamer.source, self.phy_streamer_randomizer.sink),
            (self.phy_streamer_randomizer.source, self.phy.source),

            (self.core_streamer.source, self.core_streamer_randomizer.sink),
            (self.core_streamer_randomizer.source, user_port.sink),

            (user_port.source, self.core_logger_randomizer.sink),
            (self.core_logger_randomizer.source, self.core_logger.sink),

            (self.phy.sink, self.phy_logger_randomizer.sink),
            (self.phy_logger_randomizer.source, self.phy_logger.sink),
        ]
        self.comb += [Record.connect(src, dst) for src, dst in links]

    def gen_simulation(self, selfp):
        """Simulation generator: one packet in each direction.

        First an encoded USBPacket is sent through the PHY streamer and the
        packet seen by the core logger is printed; then a raw Packet is sent
        through the core streamer and the packet seen by the PHY logger is
        printed as hex, decoded and printed again.
        """
        # PHY -> user direction.
        rx_packet = USBPacket(list(range(128)))
        rx_packet.preamble = 0x5AA55AA5
        rx_packet.dst = 0x12
        # presumably 128 payload bytes plus the 4 CRC bytes -- TODO confirm
        rx_packet.length = 128 + 4
        rx_packet.encode()
        yield from self.phy_streamer.send(rx_packet)
        for _ in range(32):
            yield
        print(self.core_logger.packet)

        # User -> PHY direction.
        selfp.core_streamer.source.dst = 0x12
        selfp.core_streamer.source.length = 128 + 4
        tx_packet = Packet(list(range(128)))
        yield from self.core_streamer.send(tx_packet)
        for _ in range(32):
            yield
        print("".join("%02x" % d for d in self.phy_logger.packet))
        decoded = USBPacket(self.phy_logger.packet)
        decoded.decode()
        print(decoded)
143
def main():
    """Simulate the TB testbench (ncycles=2000, vcd_name="my.vcd")."""
    testbench = TB()
    run_simulation(testbench, ncycles=2000, vcd_name="my.vcd",
                   keep_files=True)


if __name__ == "__main__":
    main()