udpip_tb: able to send valid UDP msg to model
[litex.git] / liteeth / test / common.py
1 import random, copy
2
3 from migen.fhdl.std import *
4 from migen.flow.actor import Sink, Source
5 from migen.genlib.record import *
6
7 from liteeth.common import *
8
9 def print_with_prefix(s, prefix=""):
10 if not isinstance(s, str):
11 s = s.__repr__()
12 s = s.split("\n")
13 for l in s:
14 print(prefix + l)
15
16 def seed_to_data(seed, random=True):
17 if random:
18 return (seed * 0x31415979 + 1) & 0xffffffff
19 else:
20 return seed
21
22 def split_bytes(v, n, endianness="big"):
23 r = []
24 r_bytes = v.to_bytes(n, byteorder=endianness)
25 for byte in r_bytes:
26 r.append(int(byte))
27 return r
28
29 def merge_bytes(b, endianness="big"):
30 return int.from_bytes(bytes(b), endianness)
31
32 def get_field_data(field, datas):
33 v = merge_bytes(datas[field.byte:field.byte+math.ceil(field.width/8)])
34 return (v >> field.offset) & (2**field.width-1)
35
36 def comp(p1, p2):
37 r = True
38 for x, y in zip(p1, p2):
39 if x != y:
40 r = False
41 return r
42
43 def check(p1, p2):
44 p1 = copy.deepcopy(p1)
45 p2 = copy.deepcopy(p2)
46 if isinstance(p1, int):
47 return 0, 1, int(p1 != p2)
48 else:
49 if len(p1) >= len(p2):
50 ref, res = p1, p2
51 else:
52 ref, res = p2, p1
53 shift = 0
54 while((ref[0] != res[0]) and (len(res)>1)):
55 res.pop(0)
56 shift += 1
57 length = min(len(ref), len(res))
58 errors = 0
59 for i in range(length):
60 if ref.pop(0) != res.pop(0):
61 errors += 1
62 return shift, length, errors
63
64 def randn(max_n):
65 return random.randint(0, max_n-1)
66
67 class Packet(list):
68 def __init__(self, init=[]):
69 self.ongoing = False
70 self.done = False
71 for data in init:
72 self.append(data)
73
74 class PacketStreamer(Module):
75 def __init__(self, description, last_be=None):
76 self.source = Source(description)
77 self.last_be = last_be
78 ###
79 self.packets = []
80 self.packet = Packet()
81 self.packet.done = True
82
83 def send(self, packet):
84 packet = copy.deepcopy(packet)
85 self.packets.append(packet)
86 while not packet.done:
87 yield
88
89 def do_simulation(self, selfp):
90 if len(self.packets) and self.packet.done:
91 self.packet = self.packets.pop(0)
92 if not self.packet.ongoing and not self.packet.done:
93 selfp.source.stb = 1
94 selfp.source.sop = 1
95 selfp.source.data = self.packet.pop(0)
96 self.packet.ongoing = True
97 elif selfp.source.stb == 1 and selfp.source.ack == 1:
98 selfp.source.sop = 0
99 if len(self.packet) == 1:
100 selfp.source.eop = 1
101 if self.last_be is not None:
102 selfp.source.last_be = self.last_be
103 else:
104 selfp.source.eop = 0
105 if self.last_be is not None:
106 selfp.source.last_be = 0
107 if len(self.packet) > 0:
108 selfp.source.stb = 1
109 selfp.source.data = self.packet.pop(0)
110 else:
111 self.packet.done = True
112 selfp.source.stb = 0
113
114 class PacketLogger(Module):
115 def __init__(self, description):
116 self.sink = Sink(description)
117 ###
118 self.packet = Packet()
119
120 def receive(self):
121 self.packet.done = False
122 while not self.packet.done:
123 yield
124
125 def do_simulation(self, selfp):
126 selfp.sink.ack = 1
127 if selfp.sink.stb == 1 and selfp.sink.sop == 1:
128 self.packet = Packet()
129 self.packet.append(selfp.sink.data)
130 elif selfp.sink.stb:
131 self.packet.append(selfp.sink.data)
132 if selfp.sink.stb == 1 and selfp.sink.eop == 1:
133 self.packet.done = True
134
135 class AckRandomizer(Module):
136 def __init__(self, description, level=0):
137 self.level = level
138
139 self.sink = Sink(description)
140 self.source = Source(description)
141
142 self.run = Signal()
143
144 self.comb += \
145 If(self.run,
146 Record.connect(self.sink, self.source)
147 ).Else(
148 self.source.stb.eq(0),
149 self.sink.ack.eq(0),
150 )
151
152 def do_simulation(self, selfp):
153 n = randn(100)
154 if n < self.level:
155 selfp.run = 0
156 else:
157 selfp.run = 1
158