change name to LiteEth (LiteEthernet is too long...)
[litex.git] / liteeth / mac / test / common.py
1 import random, copy
2
3 from migen.fhdl.std import *
4 from migen.flow.actor import Sink, Source
5 from migen.genlib.record import *
6
7 from misoclib.ethmac.common import *
8
def seed_to_data(seed, random=True):
    """Derive a 32-bit test word from ``seed``.

    When ``random`` is truthy the seed is scrambled with a fixed
    multiply-and-add and truncated to 32 bits; otherwise the seed is
    returned unchanged.

    Note: inside this function the ``random`` parameter shadows the
    ``random`` module imported at the top of the file.
    """
    if not random:
        return seed
    scrambled = seed * 0x31415979 + 1
    # Keep only the low 32 bits.
    return scrambled & 0xffffffff
14
def check(p1, p2):
    """Compare two packets (or two integers) and summarise the mismatch.

    Returns a ``(shift, length, errors)`` tuple:

    * For integers: ``(0, 1, 0)`` when equal, ``(0, 1, 1)`` otherwise.
    * For sequences: the longer one (``p1`` on a tie) is the reference.
      Leading items of the other sequence are skipped until one equals
      the reference's first item (always keeping at least one item);
      ``shift`` is the number skipped, ``length`` the number of item
      pairs then compared position by position, and ``errors`` how many
      of those pairs differ.

    Empty sequences yield ``(0, 0, 0)`` instead of raising ``IndexError``.
    The arguments are never modified.
    """
    if isinstance(p1, int):
        return 0, 1, int(p1 != p2)

    # The longer sequence is the reference; ties keep p1 as the reference.
    ref, res = (p1, p2) if len(p1) >= len(p2) else (p2, p1)
    if not res:
        # Nothing to align or compare (res is the shorter of the two).
        return 0, 0, 0

    # Skip leading items of res until it lines up with the reference's
    # first item, but never skip the last remaining item.
    head = ref[0]
    last = len(res) - 1
    shift = 0
    while shift < last and head != res[shift]:
        shift += 1

    length = min(len(ref), len(res) - shift)
    # zip() stops after exactly `length` pairs.
    errors = sum(1 for a, b in zip(ref, res[shift:]) if a != b)
    return shift, length, errors
35
def randn(max_n):
    """Return a random integer ``n`` with ``0 <= n < max_n``."""
    return random.randrange(max_n)
38
class Packet(list):
    """A list of data words plus transfer bookkeeping flags.

    ``ongoing`` and ``done`` both start out ``False``; the streamer and
    logger helpers in this file update them while a packet is being
    sent or received.

    ``init`` is any iterable whose items become the packet's initial
    contents (empty by default).
    """

    def __init__(self, init=()):
        # An immutable default avoids sharing one list object between calls.
        self.ongoing = False
        self.done = False
        self.extend(init)
45
class PacketStreamer(Module):
    """Simulation helper that plays queued packets out of ``self.source``.

    Packets are queued with :meth:`send` and emitted one word at a time
    by :meth:`do_simulation`: the first word is presented with ``sop``
    set, and each later word is presented once the previous one has been
    acknowledged (``stb`` and ``ack`` both 1).
    """

    def __init__(self, description):
        self.source = Source(description)

        ###

        # Packets waiting to be sent, oldest first.
        self.packets = []
        # Placeholder that is already "done", so the first queued packet
        # is picked up on the next simulation step.
        self.packet = Packet()
        self.packet.done = 1

    def send(self, packet):
        """Queue a deep copy of ``packet`` for transmission."""
        self.packets.append(copy.deepcopy(packet))

    def do_simulation(self, selfp):
        """Advance the streamer by one simulation step."""
        # Fetch the next packet once the current one has finished.
        if self.packet.done and self.packets:
            self.packet = self.packets.pop(0)

        not_started = not (self.packet.ongoing or self.packet.done)
        if not_started:
            # First word of a fresh packet.
            # NOTE(review): an empty packet raises IndexError on this pop.
            # NOTE(review): eop is not written here, so a one-word packet
            # never has eop asserted - confirm whether that is intended.
            selfp.source.stb = 1
            selfp.source.sop = 1
            selfp.source.d = self.packet.pop(0)
            self.packet.ongoing = True
        elif selfp.source.stb == 1 and selfp.source.ack == 1:
            # The previous word was accepted.
            selfp.source.sop = 0
            # eop accompanies the word popped below when it is the last one.
            selfp.source.eop = (len(self.packet) == 1)
            if self.packet:
                selfp.source.stb = 1
                selfp.source.d = self.packet.pop(0)
            else:
                # Nothing left: mark the packet finished and go idle.
                self.packet.done = 1
                selfp.source.stb = 0
75
class PacketLogger(Module):
    """Simulation helper that records words arriving on ``self.sink``.

    The sink is always acknowledged. A word arriving with ``sop`` starts
    a fresh :class:`Packet`; later valid words are appended to it, and a
    valid word carrying ``eop`` marks the packet as done.
    """

    def __init__(self, description):
        self.sink = Sink(description)

        ###

        # Packet currently being assembled (or the last one completed).
        self.packet = Packet()

    def receive(self):
        """Generator that yields until the current packet is marked done.

        The ``done`` flag of the current packet is cleared first. Since
        ``do_simulation`` replaces ``self.packet`` on ``sop``, the flag
        is re-read from whichever packet is current on each iteration.
        """
        self.packet.done = 0
        while self.packet.done == 0:
            yield

    def do_simulation(self, selfp):
        """Capture at most one word from the sink for this step."""
        selfp.sink.ack = 1
        stb = selfp.sink.stb

        if stb == 1 and selfp.sink.sop == 1:
            # Start of packet: begin a new capture with this word.
            self.packet = Packet([selfp.sink.d])
        elif stb:
            self.packet.append(selfp.sink.d)

        if stb == 1 and selfp.sink.eop == 1:
            self.packet.done = True
96
class AckRandomizer(Module):
    """Pass-through stage that randomly stalls the sink-to-source link.

    While ``run`` is asserted the sink is connected to the source with
    ``Record.connect``; otherwise ``source.stb`` and ``sink.ack`` are
    both forced to 0. On every simulation step ``run`` is cleared when a
    random draw from ``0..99`` is below ``level``, so ``level`` acts as
    an approximate stall percentage (0 never stalls).
    """

    def __init__(self, description, level=0):
        self.level = level

        self.sink = Sink(description)
        self.source = Source(description)

        self.run = Signal()

        stalled = [
            self.source.stb.eq(0),
            self.sink.ack.eq(0),
        ]
        self.comb += If(self.run,
            Record.connect(self.sink, self.source)
        ).Else(*stalled)

    def do_simulation(self, selfp):
        """Randomly pick whether the link runs or stalls next."""
        selfp.run = 0 if randn(100) < self.level else 1
120