etherbone_tb: add autocheck
[litex.git] / liteeth / test / model / etherbone.py
1 import math, copy
2
3 from liteeth.common import *
4 from liteeth.test.common import *
5
6 from liteeth.test.model import udp
7
def print_etherbone(s):
    """Print ``s`` via ``print_with_prefix`` using the [ETHERBONE] tag."""
    prefix = "[ETHERBONE]"
    print_with_prefix(s, prefix)
10
11 # Etherbone model
class EtherboneWrite:
    """A single write data word, displayed as ``WR32``."""

    def __init__(self, data):
        # Integer value to write; __repr__ renders it as 8 hex digits.
        self.data = data

    def __repr__(self):
        """Return the word formatted as ``WR32 0x????????``."""
        return "WR32 0x{:08x}".format(self.data)
18
class EtherboneRead:
    """A single read address, displayed as ``RD32``."""

    def __init__(self, addr):
        # Integer address to read; __repr__ renders it as 8 hex digits.
        self.addr = addr

    def __repr__(self):
        """Return the address formatted as ``RD32 @ 0x????????``."""
        return "RD32 @ 0x{:08x}".format(self.addr)
25
class EtherboneWrites(Packet):
    """Write section of a record: a base address plus a list of writes.

    In encoded form the packet holds the 4 bytes produced by
    ``split_bytes(base_addr, 4)`` followed by 4 bytes per write produced by
    ``split_bytes(write.data, 4)``.  In decoded form the byte list is empty
    and the content lives in ``base_addr`` / ``writes``.
    """

    def __init__(self, init=None, base_addr=0):
        """Create the section.

        init      -- optional initial bytes; when non-empty the packet is
                     considered already encoded.
        base_addr -- base address used by encode() when building from writes.
        """
        # None stands for "no initial bytes" (avoids a shared mutable default).
        if init is None:
            init = []
        Packet.__init__(self, init)
        self.base_addr = base_addr
        self.writes = []
        self.encoded = init != []

    def _pop_word(self):
        """Remove the first four bytes of the packet and return them as a list."""
        return [self.pop(0) for _ in range(4)]

    def add(self, write):
        """Queue ``write`` (an object with a ``data`` attribute) for encode()."""
        self.writes.append(write)

    def encode(self):
        """Append the base address and every queued write as bytes.

        Raises ValueError if the packet is already encoded.
        """
        if self.encoded:
            raise ValueError("EtherboneWrites is already encoded")
        for byte in split_bytes(self.base_addr, 4):
            self.append(byte)
        for write in self.writes:
            for byte in split_bytes(write.data, 4):
                self.append(byte)
        self.encoded = True

    def decode(self):
        """Consume the bytes into ``base_addr`` and a fresh ``writes`` list.

        Raises ValueError if the packet is not encoded.
        """
        if not self.encoded:
            raise ValueError("EtherboneWrites is not encoded")
        self.base_addr = merge_bytes(self._pop_word())
        self.writes = []
        # Every remaining group of four bytes is one write data word.
        while len(self) != 0:
            self.writes.append(EtherboneWrite(merge_bytes(self._pop_word())))
        self.encoded = False

    def __repr__(self):
        """Return a multi-line dump: title, base address, one line per write."""
        lines = [
            "Writes",
            "--------",
            "BaseAddr @ 0x{:08x}".format(self.base_addr),
        ]
        lines.extend(repr(write) for write in self.writes)
        return "\n".join(lines) + "\n"
68
class EtherboneReads(Packet):
    """Read section of a record: a base return address plus a list of reads.

    In encoded form the packet holds the 4 bytes produced by
    ``split_bytes(base_ret_addr, 4)`` followed by 4 bytes per read produced
    by ``split_bytes(read.addr, 4)``.  In decoded form the byte list is empty
    and the content lives in ``base_ret_addr`` / ``reads``.
    """

    def __init__(self, init=None, base_ret_addr=0):
        """Create the section.

        init          -- optional initial bytes; when non-empty the packet
                         is considered already encoded.
        base_ret_addr -- base return address used by encode().
        """
        # None stands for "no initial bytes" (avoids a shared mutable default).
        if init is None:
            init = []
        Packet.__init__(self, init)
        self.base_ret_addr = base_ret_addr
        self.reads = []
        self.encoded = init != []

    def _pop_word(self):
        """Remove the first four bytes of the packet and return them as a list."""
        return [self.pop(0) for _ in range(4)]

    def add(self, read):
        """Queue ``read`` (an object with an ``addr`` attribute) for encode()."""
        self.reads.append(read)

    def encode(self):
        """Append the base return address and every queued read as bytes.

        Raises ValueError if the packet is already encoded.
        """
        if self.encoded:
            raise ValueError("EtherboneReads is already encoded")
        for byte in split_bytes(self.base_ret_addr, 4):
            self.append(byte)
        for read in self.reads:
            for byte in split_bytes(read.addr, 4):
                self.append(byte)
        self.encoded = True

    def decode(self):
        """Consume the bytes into ``base_ret_addr`` and a fresh ``reads`` list.

        Raises ValueError if the packet is not encoded.
        """
        if not self.encoded:
            raise ValueError("EtherboneReads is not encoded")
        self.base_ret_addr = merge_bytes(self._pop_word())
        self.reads = []
        # Every remaining group of four bytes is one read address.
        while len(self) != 0:
            self.reads.append(EtherboneRead(merge_bytes(self._pop_word())))
        self.encoded = False

    def __repr__(self):
        """Return a multi-line dump: title, base return address, one line per read."""
        lines = [
            "Reads",
            "--------",
            "BaseRetAddr @ 0x{:08x}".format(self.base_ret_addr),
        ]
        lines.extend(repr(read) for read in self.reads)
        return "\n".join(lines) + "\n"
111
class EtherboneRecord(Packet):
    """One Etherbone record: header fields, optional writes, optional reads.

    Header fields are stored as attributes named after the keys of
    ``etherbone_record_header``.  ``writes`` / ``reads`` hold an
    EtherboneWrites / EtherboneReads object, or None when absent.
    """

    def __init__(self, init=None):
        """Create the record.

        init -- optional initial bytes; when non-empty the record is
                considered already encoded.
        """
        # None stands for "no initial bytes" (avoids a shared mutable default).
        if init is None:
            init = []
        Packet.__init__(self, init)
        self.writes = None
        self.reads = None
        # With no writes/reads attached the only consistent counts are zero;
        # set_writes()/set_reads()/decode() overwrite them.  Without these
        # defaults a record carrying only writes (or only reads) could not be
        # encoded or printed unless the caller set the other count by hand.
        self.wcount = 0
        self.rcount = 0
        self.encoded = init != []

    def _pop_bytes(self, count):
        """Remove the first ``count`` bytes of the record and return them."""
        return [self.pop(0) for _ in range(count)]

    def get_writes(self):
        """Pop the write section off the front of the record.

        Returns None when ``wcount`` is zero, otherwise an EtherboneWrites
        built from (wcount + 1) * 4 bytes: one base-address word plus
        ``wcount`` data words.
        """
        if self.wcount == 0:
            return None
        return EtherboneWrites(self._pop_bytes((self.wcount + 1) * 4))

    def get_reads(self):
        """Pop the read section off the front of the record.

        Returns None when ``rcount`` is zero, otherwise an EtherboneReads
        built from (rcount + 1) * 4 bytes: one base-return-address word plus
        ``rcount`` address words.
        """
        if self.rcount == 0:
            return None
        return EtherboneReads(self._pop_bytes((self.rcount + 1) * 4))

    def decode(self):
        """Parse header, writes and reads from the front of the byte list.

        Bytes located after the read section are left in the record.
        Raises ValueError if the record is not encoded.
        """
        if not self.encoded:
            raise ValueError("EtherboneRecord is not encoded")
        # Take at most etherbone_record_header_len bytes (fewer if the record
        # is shorter than a header).
        header = self._pop_bytes(min(len(self), etherbone_record_header_len))
        for name, field in sorted(etherbone_record_header.items()):
            setattr(self, name, get_field_data(field, header))
        # Writes come first, then reads; each getter consumes its own bytes.
        self.writes = self.get_writes()
        if self.writes is not None:
            self.writes.decode()
        self.reads = self.get_reads()
        if self.reads is not None:
            self.reads.decode()
        self.encoded = False

    def set_writes(self, writes):
        """Encode ``writes``, append its bytes and set ``wcount`` accordingly."""
        self.wcount = len(writes.writes)
        writes.encode()
        for byte in writes:
            self.append(byte)

    def set_reads(self, reads):
        """Encode ``reads``, append its bytes and set ``rcount`` accordingly."""
        self.rcount = len(reads.reads)
        reads.encode()
        for byte in reads:
            self.append(byte)

    def encode(self):
        """Serialize writes, then reads, then put the header in front.

        Raises ValueError if the record is already encoded.
        """
        if self.encoded:
            raise ValueError("EtherboneRecord is already encoded")
        if self.writes is not None:
            self.set_writes(self.writes)
        if self.reads is not None:
            self.set_reads(self.reads)
        # Pack every header field into one integer at bit position
        # offset + 8 * byte.
        header = 0
        for name, field in sorted(etherbone_record_header.items()):
            nbytes = math.ceil(field.width / 8)
            value = merge_bytes(split_bytes(getattr(self, name), nbytes), "little")
            header += value << (field.offset + field.byte * 8)
        # Each byte is inserted at index 0, so the header bytes end up in the
        # reverse of the order split_bytes() returned them.
        for byte in split_bytes(header, etherbone_record_header_len):
            self.insert(0, byte)
        self.encoded = True

    def __repr__(self, n=0):
        """Return a dump of record number ``n``.

        Encoded records are shown as a hex byte string; decoded records show
        every header field followed by the writes and reads sections.
        """
        parts = ["Record {}\n".format(n), "--------\n"]
        if self.encoded:
            parts.extend("{:02x}".format(byte) for byte in self)
        else:
            for name in sorted(etherbone_record_header.keys()):
                parts.append(name + " : 0x{:0x}\n".format(getattr(self, name)))
            if self.wcount != 0:
                parts.append(repr(self.writes))
            if self.rcount != 0:
                parts.append(repr(self.reads))
        return "".join(parts)
194
class EtherbonePacket(Packet):
    """An Etherbone packet: packet header fields plus a list of records.

    Header fields are stored as attributes named after the keys of
    ``etherbone_packet_header``.
    """

    def __init__(self, init=None):
        """Create the packet.

        init -- optional initial bytes; when non-empty the packet is
                considered already encoded.
        """
        # None stands for "no initial bytes" (avoids a shared mutable default).
        if init is None:
            init = []
        Packet.__init__(self, init)
        self.encoded = init != []
        self.records = []

        # Header defaults used by encode() unless the caller overrides them.
        self.magic = etherbone_magic
        self.version = etherbone_version
        self.addr_size = 32//8
        self.port_size = 32//8
        self.nr = 0
        self.pr = 0
        self.pf = 0

    def get_records(self):
        """Decode records one after another from the payload bytes.

        Returns a list of deep-copied, decoded EtherboneRecord objects.
        """
        # NOTE(review): assuming Packet.__init__ copies ``init``, this packet's
        # own bytes are not consumed here, and every stored record still
        # carries the undecoded bytes that followed it -- confirm before
        # re-encoding a decoded packet or record.
        records = []
        payload = self
        while len(payload) != 0:
            record = EtherboneRecord(payload)
            record.decode()
            records.append(copy.deepcopy(record))
            # Whatever the record did not consume is the next record's input.
            payload = record
        return records

    def decode(self):
        """Parse the packet header, then all records.

        Raises ValueError if the packet is not encoded.
        """
        if not self.encoded:
            raise ValueError("EtherbonePacket is not encoded")
        # Pop one byte per element of the header-sized slice (fewer if the
        # packet is shorter than a header).
        header = [self.pop(0) for _ in self[:etherbone_packet_header_len]]
        for name, field in sorted(etherbone_packet_header.items()):
            setattr(self, name, get_field_data(field, header))
        self.records = self.get_records()
        self.encoded = False

    def set_records(self, records):
        """Encode every record in place and append its bytes to the packet."""
        for record in records:
            record.encode()
            for byte in record:
                self.append(byte)

    def encode(self):
        """Serialize all records, then put the packet header in front.

        Raises ValueError if the packet is already encoded.
        """
        if self.encoded:
            raise ValueError("EtherbonePacket is already encoded")
        self.set_records(self.records)
        # Pack every header field into one integer at bit position
        # offset + 8 * byte.
        header = 0
        for name, field in sorted(etherbone_packet_header.items()):
            nbytes = math.ceil(field.width / 8)
            value = merge_bytes(split_bytes(getattr(self, name), nbytes), "little")
            header += value << (field.offset + field.byte * 8)
        # Each byte is inserted at index 0, so the header bytes end up in the
        # reverse of the order split_bytes() returned them.
        for byte in split_bytes(header, etherbone_packet_header_len):
            self.insert(0, byte)
        self.encoded = True

    def __repr__(self):
        """Return a dump of the packet.

        Encoded packets are shown as a hex byte string; decoded packets show
        every header field followed by each record's own dump.
        """
        parts = ["Packet\n", "--------\n"]
        if self.encoded:
            parts.extend("{:02x}".format(byte) for byte in self)
        else:
            for name in sorted(etherbone_packet_header.keys()):
                parts.append(name + " : 0x{:0x}\n".format(getattr(self, name)))
            for index, record in enumerate(self.records):
                # EtherboneRecord.__repr__ takes the record number as argument.
                parts.append(record.__repr__(index))
        return "".join(parts)
261
class Etherbone(Module):
    """Etherbone endpoint of the test model, layered on a UDP model object."""

    def __init__(self, udp, debug=False):
        """Store the UDP layer and register callback() as its Etherbone hook.

        udp   -- UDP model object; must provide set_etherbone_callback()
                 and send().
        debug -- when true, sent and received packets are printed.
        """
        self.udp = udp
        self.debug = debug
        self.tx_packets = []
        self.tx_packet = EtherbonePacket()
        self.rx_packet = EtherbonePacket()

        udp.set_etherbone_callback(self.callback)

    def send(self, packet):
        """Encode ``packet`` in place, wrap it in a UDP packet and send it."""
        packet.encode()
        if self.debug:
            for item in (">>>>>>>>", packet):
                print_etherbone(item)
        # Outside __init__, ``udp`` is the module imported at file level.
        datagram = udp.UDPPacket(packet)
        datagram.src_port = 0x1234 # XXX
        datagram.dst_port = 20000 # XXX
        datagram.length = len(packet)
        datagram.checksum = 0
        self.udp.send(datagram)

    def receive(self):
        """Generator that yields until the current rx_packet is flagged done."""
        self.rx_packet = EtherbonePacket()
        while True:
            if self.rx_packet.done:
                break
            yield

    def callback(self, packet):
        """Decode incoming bytes, publish them as rx_packet and call process()."""
        received = EtherbonePacket(packet)
        received.decode()
        if self.debug:
            for item in ("<<<<<<<<", received):
                print_etherbone(item)
        received.done = True
        self.rx_packet = received
        self.process(received)

    def process(self, packet):
        """Hook invoked for every received packet; does nothing here."""
        pass
301
if __name__ == "__main__":
    # Manual self-check: build a packet, send it over UDP, decode it again.

    # Writes/Reads
    writes = EtherboneWrites(base_addr=0x1000)
    for i in range(16):
        writes.add(EtherboneWrite(i))
    reads = EtherboneReads(base_ret_addr=0x2000)
    for i in range(16):
        reads.add(EtherboneRead(i))

    # Record
    record = EtherboneRecord()
    record.writes = writes
    record.reads = reads
    record.bca = 0
    record.rca = 0
    record.rff = 0
    record.cyc = 0
    record.wca = 0
    record.wff = 0
    record.byte_enable = 0
    record.wcount = 16
    record.rcount = 16

    # Packet: eight independent copies of the same record.
    packet = EtherbonePacket()
    packet.records = [copy.deepcopy(record) for _ in range(8)]
    packet.nr = 0
    packet.pr = 0
    packet.pf = 0
    packet.encode()

    # Send packet over UDP to check against Wireshark dissector
    import socket
    # The context manager closes the socket even if sendto() raises.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(bytes(packet), ("192.168.1.1", 20000))

    # Decode a copy built from the encoded bytes and print the result.
    packet = EtherbonePacket(packet)
    packet.decode()
    print(packet)