bbd69011f1ac106fca590ddedd140b313a6e47ba
[litex.git] / litesata / test / cont_tb.py
1 from litesata.common import *
2 from litesata.core.link.cont import LiteSATACONTInserter, LiteSATACONTRemover
3
4 from litesata.test.common import *
5
6 class ContPacket(list):
7 def __init__(self, data=[]):
8 self.ongoing = False
9 self.done = False
10 for d in data:
11 self.append(d)
12
13 class ContStreamer(PacketStreamer):
14 def __init__(self):
15 PacketStreamer.__init__(self, phy_description(32), ContPacket)
16
17 def do_simulation(self, selfp):
18 PacketStreamer.do_simulation(self, selfp)
19 selfp.source.charisk = 0
20 # Note: for simplicity we generate charisk by detecting
21 # primitives in data
22 for k, v in primitives.items():
23 try:
24 if self.source_data == v:
25 selfp.source.charisk = 0b0001
26 except:
27 pass
28
29 class ContLogger(PacketLogger):
30 def __init__(self):
31 PacketLogger.__init__(self, phy_description(32), ContPacket)
32
33 class TB(Module):
34 def __init__(self):
35 self.streamer = ContStreamer()
36 self.streamer_randomizer = Randomizer(phy_description(32), level=50)
37 self.inserter = LiteSATACONTInserter(phy_description(32))
38 self.remover = LiteSATACONTRemover(phy_description(32))
39 self.logger_randomizer = Randomizer(phy_description(32), level=50)
40 self.logger = ContLogger()
41
42 self.pipeline = Pipeline(
43 self.streamer,
44 self.streamer_randomizer,
45 self.inserter,
46 self.remover,
47 self.logger_randomizer,
48 self.logger
49 )
50
51 def gen_simulation(self, selfp):
52 test_packet = ContPacket([
53 primitives["SYNC"],
54 primitives["SYNC"],
55 primitives["SYNC"],
56 primitives["SYNC"],
57 primitives["SYNC"],
58 primitives["SYNC"],
59 primitives["ALIGN"],
60 primitives["ALIGN"],
61 primitives["SYNC"],
62 primitives["SYNC"],
63 #primitives["SYNC"],
64 0x00000000,
65 0x00000001,
66 0x00000002,
67 0x00000003,
68 0x00000004,
69 0x00000005,
70 0x00000006,
71 0x00000007,
72 primitives["SYNC"],
73 primitives["SYNC"],
74 primitives["SYNC"],
75 primitives["SYNC"],
76 primitives["ALIGN"],
77 primitives["ALIGN"],
78 primitives["SYNC"],
79 primitives["SYNC"],
80 primitives["SYNC"],
81 primitives["SYNC"]]*4
82 )
83 streamer_packet = ContPacket(test_packet)
84 yield from self.streamer.send(streamer_packet)
85 yield from self.logger.receive(len(test_packet))
86 #for d in self.logger.packet:
87 # print("%08x" %d)
88
89 # check results
90 s, l, e = check(streamer_packet, self.logger.packet)
91 print("shift "+ str(s) + " / length " + str(l) + " / errors " + str(e))
92
93
94 if __name__ == "__main__":
95 run_simulation(TB(), ncycles=1024, vcd_name="my.vcd", keep_files=True)