1 # This file is Copyright (c) 2020 Antmicro <www.antmicro.com>
9 from litex
.soc
.interconnect
.axi
import *
10 from litex
.soc
.interconnect
import wishbone
, csr_bus
12 # Helpers ------------------------------------------------------------------------------------------
14 def _int_or_call(int_or_func
):
15 if callable(int_or_func
):
20 def timeout_generator(ticks
):
22 for i
in range(ticks
):
23 if os
.environ
.get("TIMEOUT_DEBUG", "") == "1":
24 print("tick {}".format(i
))
26 raise TimeoutError("Timeout after %d ticks" % ticks
)
def __init__(self, ready_latency=0, response_latency=0, rdata_generator=None):
    """Store the checker's latencies and read-data source, and reset its logs.

    ready_latency:    kept as given; the handlers pass it to ``self.delay``.
    response_latency: kept as given; the handlers pass it to ``self.delay``.
    rdata_generator:  callable invoked with the read address to produce the
                      read data. When falsy, a generator that always yields
                      0xbaadc0de is used instead.
    """
    if not rdata_generator:
        # Fallback source: every read returns the same marker value.
        rdata_generator = lambda adr: 0xbaadc0de

    # Transaction logs filled in by the write/read handlers.
    self.writes = []  # (addr, data, strb)
    self.reads = []  # (addr, data)

    self.rdata_generator = rdata_generator
    self.response_latency = response_latency
    self.ready_latency = ready_latency
36 def delay(self
, latency
):
37 for _
in range(_int_or_call(latency
)):
40 def handle_write(self
, axi_lite
):
42 while not (yield axi_lite
.aw
.valid
):
44 yield from self
.delay(self
.ready_latency
)
45 addr
= (yield axi_lite
.aw
.addr
)
46 yield axi_lite
.aw
.ready
.eq(1)
48 yield axi_lite
.aw
.ready
.eq(0)
49 while not (yield axi_lite
.w
.valid
):
51 yield from self
.delay(self
.ready_latency
)
53 data
= (yield axi_lite
.w
.data
)
54 strb
= (yield axi_lite
.w
.strb
)
55 yield axi_lite
.w
.ready
.eq(1)
57 yield axi_lite
.w
.ready
.eq(0)
58 yield from self
.delay(self
.response_latency
)
60 yield axi_lite
.b
.valid
.eq(1)
61 yield axi_lite
.b
.resp
.eq(RESP_OKAY
)
63 while not (yield axi_lite
.b
.ready
):
65 yield axi_lite
.b
.valid
.eq(0)
66 self
.writes
.append((addr
, data
, strb
))
68 def handle_read(self
, axi_lite
):
70 while not (yield axi_lite
.ar
.valid
):
72 yield from self
.delay(self
.ready_latency
)
73 addr
= (yield axi_lite
.ar
.addr
)
74 yield axi_lite
.ar
.ready
.eq(1)
76 yield axi_lite
.ar
.ready
.eq(0)
77 yield from self
.delay(self
.response_latency
)
79 data
= self
.rdata_generator(addr
)
80 yield axi_lite
.r
.valid
.eq(1)
81 yield axi_lite
.r
.resp
.eq(RESP_OKAY
)
82 yield axi_lite
.r
.data
.eq(data
)
84 while not (yield axi_lite
.r
.ready
):
86 yield axi_lite
.r
.valid
.eq(0)
87 yield axi_lite
.r
.data
.eq(0)
88 self
.reads
.append((addr
, data
))
91 def handler(self
, axi_lite
):
93 if (yield axi_lite
.aw
.valid
):
94 yield from self
.handle_write(axi_lite
)
95 if (yield axi_lite
.ar
.valid
):
96 yield from self
.handle_read(axi_lite
)
100 def _write_handler(self
, axi_lite
):
102 yield from self
.handle_write(axi_lite
)
106 def _read_handler(self
, axi_lite
):
108 yield from self
.handle_read(axi_lite
)
def parallel_handlers(self, axi_lite):
    """Return a (write handler, read handler) pair for ``axi_lite``.

    The two entries are the results of ``self._write_handler(axi_lite)`` and
    ``self._read_handler(axi_lite)``, in that order.
    """
    write_side = self._write_handler(axi_lite)
    read_side = self._read_handler(axi_lite)
    return write_side, read_side
114 class AXILitePatternGenerator
:
115 def __init__(self
, axi_lite
, pattern
, delay
=0):
116 # pattern: (rw, addr, data)
117 self
.axi_lite
= axi_lite
118 self
.pattern
= pattern
121 self
.read_errors
= []
122 self
.resp_errors
= {"w": 0, "r": 0}
125 for rw
, addr
, data
in self
.pattern
:
126 assert rw
in ["w", "r"]
128 strb
= 2**len(self
.axi_lite
.w
.strb
) - 1
129 resp
= (yield from self
.axi_lite
.write(addr
, data
, strb
))
131 rdata
, resp
= (yield from self
.axi_lite
.read(addr
))
133 self
.read_errors
.append((rdata
, data
))
135 if resp
!= RESP_OKAY
:
136 self
.resp_errors
[rw
] += 1
138 for _
in range(_int_or_call(self
.delay
)):
143 # TestAXILite --------------------------------------------------------------------------------------
145 class TestAXILite(unittest
.TestCase
):
146 def test_wishbone2axi2wishbone(self
):
149 self
.wishbone
= wishbone
.Interface(data_width
=32)
153 axi
= AXILiteInterface(data_width
=32, address_width
=32)
154 wb
= wishbone
.Interface(data_width
=32)
156 wishbone2axi
= Wishbone2AXILite(self
.wishbone
, axi
)
157 axi2wishbone
= AXILite2Wishbone(axi
, wb
)
158 self
.submodules
+= wishbone2axi
, axi2wishbone
160 sram
= wishbone
.SRAM(1024, init
=[0x12345678, 0xa55aa55a])
161 self
.submodules
+= sram
162 self
.comb
+= wb
.connect(sram
.bus
)
166 if (yield from dut
.wishbone
.read(0)) != 0x12345678:
168 if (yield from dut
.wishbone
.read(1)) != 0xa55aa55a:
171 yield from dut
.wishbone
.write(i
, i
)
173 if (yield from dut
.wishbone
.read(i
)) != i
:
177 run_simulation(dut
, [generator(dut
)])
178 self
.assertEqual(dut
.errors
, 0)
180 def test_axilite2csr(self
):
182 def csr_mem_handler(csr
, mem
):
184 adr
= (yield csr
.adr
)
185 yield csr
.dat_r
.eq(mem
[adr
])
187 mem
[adr
] = (yield csr
.dat_w
)
192 self
.axi_lite
= AXILiteInterface()
193 self
.csr
= csr_bus
.Interface()
194 self
.submodules
.axilite2csr
= AXILite2CSR(self
.axi_lite
, self
.csr
)
197 prng
= random
.Random(42)
198 mem_ref
= [prng
.randrange(255) for i
in range(100)]
203 for adr
, ref
in enumerate(mem_ref
):
205 data
, resp
= (yield from dut
.axi_lite
.read(adr
))
206 self
.assertEqual(resp
, 0b00)
210 write_data
= [prng
.randrange(255) for _
in mem_ref
]
212 for adr
, wdata
in enumerate(write_data
):
214 resp
= (yield from dut
.axi_lite
.write(adr
, wdata
))
215 self
.assertEqual(resp
, 0b00)
216 rdata
, resp
= (yield from dut
.axi_lite
.read(adr
))
217 self
.assertEqual(resp
, 0b00)
222 mem
= [v
for v
in mem_ref
]
223 run_simulation(dut
, [generator(dut
), csr_mem_handler(dut
.csr
, mem
)])
224 self
.assertEqual(dut
.errors
, 0)
226 def test_axilite_sram(self
):
228 def __init__(self
, size
, init
):
229 self
.axi_lite
= AXILiteInterface()
230 self
.submodules
.sram
= AXILiteSRAM(size
, init
=init
, bus
=self
.axi_lite
)
233 def generator(dut
, ref_init
):
234 for adr
, ref
in enumerate(ref_init
):
236 data
, resp
= (yield from dut
.axi_lite
.read(adr
))
237 self
.assertEqual(resp
, 0b00)
241 write_data
= [prng
.randrange(255) for _
in ref_init
]
243 for adr
, wdata
in enumerate(write_data
):
245 resp
= (yield from dut
.axi_lite
.write(adr
, wdata
))
246 self
.assertEqual(resp
, 0b00)
247 rdata
, resp
= (yield from dut
.axi_lite
.read(adr
))
248 self
.assertEqual(resp
, 0b00)
252 prng
= random
.Random(42)
253 init
= [prng
.randrange(2**32) for i
in range(100)]
255 dut
= DUT(size
=len(init
)*4, init
=[v
for v
in init
])
256 run_simulation(dut
, [generator(dut
, init
)])
257 self
.assertEqual(dut
.errors
, 0)
259 def converter_test(self
, width_from
, width_to
, parallel_rw
=False,
260 write_pattern
=None, write_expected
=None,
261 read_pattern
=None, read_expected
=None):
262 assert not (write_pattern
is None and read_pattern
is None)
264 if write_pattern
is None:
267 elif len(write_pattern
[0]) == 2:
269 write_pattern
= [(adr
, data
, 2**(width_from
//8)-1) for adr
, data
in write_pattern
]
271 if read_pattern
is None:
276 def __init__(self
, width_from
, width_to
):
277 self
.master
= AXILiteInterface(data_width
=width_from
)
278 self
.slave
= AXILiteInterface(data_width
=width_to
)
279 self
.submodules
.converter
= AXILiteConverter(self
.master
, self
.slave
)
281 prng
= random
.Random(42)
283 def write_generator(axi_lite
):
284 for addr
, data
, strb
in write_pattern
or []:
285 resp
= (yield from axi_lite
.write(addr
, data
, strb
))
286 self
.assertEqual(resp
, RESP_OKAY
)
287 for _
in range(prng
.randrange(3)):
292 def read_generator(axi_lite
):
293 for addr
, refdata
in read_pattern
or []:
294 data
, resp
= (yield from axi_lite
.read(addr
))
295 self
.assertEqual(resp
, RESP_OKAY
)
296 self
.assertEqual(data
, refdata
)
297 for _
in range(prng
.randrange(3)):
302 def sequential_generator(axi_lite
):
303 yield from write_generator(axi_lite
)
304 yield from read_generator(axi_lite
)
306 def rdata_generator(adr
):
307 for a
, v
in read_expected
:
315 _latency
= (_latency
+ 1) % 3
318 dut
= DUT(width_from
=width_from
, width_to
=width_to
)
319 checker
= AXILiteChecker(ready_latency
=latency
, rdata_generator
=rdata_generator
)
321 generators
= [write_generator(dut
.master
), read_generator(dut
.master
)]
323 generators
= [sequential_generator(dut
.master
)]
324 generators
+= checker
.parallel_handlers(dut
.slave
)
325 run_simulation(dut
, generators
)
326 self
.assertEqual(checker
.writes
, write_expected
)
327 self
.assertEqual(checker
.reads
, read_expected
)
329 def test_axilite_down_converter_32to16(self
):
331 (0x00000000, 0x22221111),
332 (0x00000004, 0x44443333),
333 (0x00000008, 0x66665555),
334 (0x00000100, 0x88887777),
337 (0x00000000, 0x1111, 0b11),
338 (0x00000002, 0x2222, 0b11),
339 (0x00000004, 0x3333, 0b11),
340 (0x00000006, 0x4444, 0b11),
341 (0x00000008, 0x5555, 0b11),
342 (0x0000000a, 0x6666, 0b11),
343 (0x00000100, 0x7777, 0b11),
344 (0x00000102, 0x8888, 0b11),
346 read_pattern
= write_pattern
347 read_expected
= [(adr
, data
) for (adr
, data
, _
) in write_expected
]
348 for parallel
in [False, True]:
349 with self
.subTest(parallel
=parallel
):
350 self
.converter_test(width_from
=32, width_to
=16, parallel_rw
=parallel
,
351 write_pattern
=write_pattern
, write_expected
=write_expected
,
352 read_pattern
=read_pattern
, read_expected
=read_expected
)
354 def test_axilite_down_converter_32to8(self
):
356 (0x00000000, 0x44332211),
357 (0x00000004, 0x88776655),
360 (0x00000000, 0x11, 0b1),
361 (0x00000001, 0x22, 0b1),
362 (0x00000002, 0x33, 0b1),
363 (0x00000003, 0x44, 0b1),
364 (0x00000004, 0x55, 0b1),
365 (0x00000005, 0x66, 0b1),
366 (0x00000006, 0x77, 0b1),
367 (0x00000007, 0x88, 0b1),
369 read_pattern
= write_pattern
370 read_expected
= [(adr
, data
) for (adr
, data
, _
) in write_expected
]
371 for parallel
in [False, True]:
372 with self
.subTest(parallel
=parallel
):
373 self
.converter_test(width_from
=32, width_to
=8, parallel_rw
=parallel
,
374 write_pattern
=write_pattern
, write_expected
=write_expected
,
375 read_pattern
=read_pattern
, read_expected
=read_expected
)
377 def test_axilite_down_converter_64to32(self
):
379 (0x00000000, 0x2222222211111111),
380 (0x00000008, 0x4444444433333333),
383 (0x00000000, 0x11111111, 0b1111),
384 (0x00000004, 0x22222222, 0b1111),
385 (0x00000008, 0x33333333, 0b1111),
386 (0x0000000c, 0x44444444, 0b1111),
388 read_pattern
= write_pattern
389 read_expected
= [(adr
, data
) for (adr
, data
, _
) in write_expected
]
390 for parallel
in [False, True]:
391 with self
.subTest(parallel
=parallel
):
392 self
.converter_test(width_from
=64, width_to
=32, parallel_rw
=parallel
,
393 write_pattern
=write_pattern
, write_expected
=write_expected
,
394 read_pattern
=read_pattern
, read_expected
=read_expected
)
396 def test_axilite_down_converter_strb(self
):
398 (0x00000000, 0x22221111, 0b1100),
399 (0x00000004, 0x44443333, 0b1111),
400 (0x00000008, 0x66665555, 0b1011),
401 (0x00000100, 0x88887777, 0b0011),
404 (0x00000002, 0x2222, 0b11),
405 (0x00000004, 0x3333, 0b11),
406 (0x00000006, 0x4444, 0b11),
407 (0x00000008, 0x5555, 0b11),
408 (0x0000000a, 0x6666, 0b10),
409 (0x00000100, 0x7777, 0b11),
411 self
.converter_test(width_from
=32, width_to
=16,
412 write_pattern
=write_pattern
, write_expected
=write_expected
)
414 # TestAXILiteInterconnect --------------------------------------------------------------------------
416 class TestAXILiteInterconnect(unittest
.TestCase
):
417 def test_interconnect_p2p(self
):
420 self
.master
= master
= AXILiteInterface()
421 self
.slave
= slave
= AXILiteInterface()
422 self
.submodules
.interconnect
= AXILiteInterconnectPointToPoint(master
, slave
)
425 ("w", 0x00000004, 0x11111111),
426 ("w", 0x0000000c, 0x22222222),
427 ("r", 0x00000010, 0x33333333),
428 ("r", 0x00000018, 0x44444444),
431 def rdata_generator(adr
):
432 for rw
, a
, v
in pattern
:
433 if rw
== "r" and a
== adr
:
438 checker
= AXILiteChecker(rdata_generator
=rdata_generator
)
440 AXILitePatternGenerator(dut
.master
, pattern
).handler(),
441 checker
.handler(dut
.slave
),
443 run_simulation(dut
, generators
)
444 self
.assertEqual(checker
.writes
, [(addr
, data
, 0b1111) for rw
, addr
, data
in pattern
if rw
== "w"])
445 self
.assertEqual(checker
.reads
, [(addr
, data
) for rw
, addr
, data
in pattern
if rw
== "r"])
447 def test_timeout(self
):
450 self
.master
= master
= AXILiteInterface()
451 self
.slave
= slave
= AXILiteInterface()
452 self
.submodules
.interconnect
= AXILiteInterconnectPointToPoint(master
, slave
)
453 self
.submodules
.timeout
= AXILiteTimeout(master
, 16)
455 def generator(axi_lite
):
456 resp
= (yield from axi_lite
.write(0x00001000, 0x11111111))
457 self
.assertEqual(resp
, RESP_OKAY
)
458 resp
= (yield from axi_lite
.write(0x00002000, 0x22222222))
459 self
.assertEqual(resp
, RESP_SLVERR
)
460 data
, resp
= (yield from axi_lite
.read(0x00003000))
461 self
.assertEqual(resp
, RESP_SLVERR
)
462 self
.assertEqual(data
, 0xffffffff)
465 def checker(axi_lite
):
468 yield axi_lite
.aw
.ready
.eq(1)
469 yield axi_lite
.w
.ready
.eq(1)
471 yield axi_lite
.aw
.ready
.eq(0)
472 yield axi_lite
.w
.ready
.eq(0)
473 yield axi_lite
.b
.valid
.eq(1)
475 while not (yield axi_lite
.b
.ready
):
477 yield axi_lite
.b
.valid
.eq(0)
481 generator(dut
.master
),
483 timeout_generator(300),
485 run_simulation(dut
, generators
)
487 def test_arbiter_order(self
):
489 def __init__(self
, n_masters
):
490 self
.masters
= [AXILiteInterface() for _
in range(n_masters
)]
491 self
.slave
= AXILiteInterface()
492 self
.submodules
.arbiter
= AXILiteArbiter(self
.masters
, self
.slave
)
494 def generator(n
, axi_lite
, delay
=0):
499 resp
= (yield from axi_lite
.write(gen(i
), gen(i
)))
500 self
.assertEqual(resp
, RESP_OKAY
)
501 for _
in range(delay
):
504 data
, resp
= (yield from axi_lite
.read(gen(i
)))
505 self
.assertEqual(resp
, RESP_OKAY
)
506 for _
in range(delay
):
513 # with no delay each master will do all transfers at once
514 with self
.subTest(delay
=0):
516 checker
= AXILiteChecker()
517 generators
= [generator(i
, master
, delay
=0) for i
, master
in enumerate(dut
.masters
)]
518 generators
+= [timeout_generator(300), checker
.handler(dut
.slave
)]
519 run_simulation(dut
, generators
)
520 order
= [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203]
521 self
.assertEqual([addr
for addr
, data
, strb
in checker
.writes
], order
)
522 self
.assertEqual([addr
for addr
, data
in checker
.reads
], order
)
524 # with some delay, the round-robin arbiter will iterate over masters
525 with self
.subTest(delay
=1):
527 checker
= AXILiteChecker()
528 generators
= [generator(i
, master
, delay
=1) for i
, master
in enumerate(dut
.masters
)]
529 generators
+= [timeout_generator(300), checker
.handler(dut
.slave
)]
530 run_simulation(dut
, generators
)
531 order
= [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203]
532 self
.assertEqual([addr
for addr
, data
, strb
in checker
.writes
], order
)
533 self
.assertEqual([addr
for addr
, data
in checker
.reads
], order
)
535 def test_arbiter_holds_grant_until_response(self
):
537 def __init__(self
, n_masters
):
538 self
.masters
= [AXILiteInterface() for _
in range(n_masters
)]
539 self
.slave
= AXILiteInterface()
540 self
.submodules
.arbiter
= AXILiteArbiter(self
.masters
, self
.slave
)
542 def generator(n
, axi_lite
, delay
=0):
547 resp
= (yield from axi_lite
.write(gen(i
), gen(i
)))
548 self
.assertEqual(resp
, RESP_OKAY
)
549 for _
in range(delay
):
552 data
, resp
= (yield from axi_lite
.read(gen(i
)))
553 self
.assertEqual(resp
, RESP_OKAY
)
554 for _
in range(delay
):
561 # with no delay each master will do all transfers at once
562 with self
.subTest(delay
=0):
564 checker
= AXILiteChecker(response_latency
=lambda: 3)
565 generators
= [generator(i
, master
, delay
=0) for i
, master
in enumerate(dut
.masters
)]
566 generators
+= [timeout_generator(300), checker
.handler(dut
.slave
)]
567 run_simulation(dut
, generators
)
568 order
= [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203]
569 self
.assertEqual([addr
for addr
, data
, strb
in checker
.writes
], order
)
570 self
.assertEqual([addr
for addr
, data
in checker
.reads
], order
)
572 # with some delay, the round-robin arbiter will iterate over masters
573 with self
.subTest(delay
=1):
575 checker
= AXILiteChecker(response_latency
=lambda: 3)
576 generators
= [generator(i
, master
, delay
=1) for i
, master
in enumerate(dut
.masters
)]
577 generators
+= [timeout_generator(300), checker
.handler(dut
.slave
)]
578 run_simulation(dut
, generators
)
579 order
= [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203]
580 self
.assertEqual([addr
for addr
, data
, strb
in checker
.writes
], order
)
581 self
.assertEqual([addr
for addr
, data
in checker
.reads
], order
)
583 def address_decoder(self
, i
, size
=0x100, python
=False):
584 # bytes to 32-bit words aligned
586 _origin
= (size
* i
) >> 2
587 if python
: # for python integers
588 shift
= log2_int(_size
)
589 return lambda a
: ((a
>> shift
) == (_origin
>> shift
))
591 return lambda a
: (a
[log2_int(_size
):] == (_origin
>> log2_int(_size
)))
593 def decoder_test(self
, n_slaves
, pattern
, generator_delay
=0):
595 def __init__(self
, decoders
):
596 self
.master
= AXILiteInterface()
597 self
.slaves
= [AXILiteInterface() for _
in range(len(decoders
))]
598 slaves
= list(zip(decoders
, self
.slaves
))
599 self
.submodules
.decoder
= AXILiteDecoder(self
.master
, slaves
)
601 def rdata_generator(adr
):
602 for rw
, a
, v
in pattern
:
603 if rw
== "r" and a
== adr
:
607 dut
= DUT([self
.address_decoder(i
) for i
in range(n_slaves
)])
608 checkers
= [AXILiteChecker(rdata_generator
=rdata_generator
) for _
in dut
.slaves
]
610 generators
= [AXILitePatternGenerator(dut
.master
, pattern
, delay
=generator_delay
).handler()]
611 generators
+= [checker
.handler(slave
) for (slave
, checker
) in zip(dut
.slaves
, checkers
)]
612 generators
+= [timeout_generator(300)]
613 run_simulation(dut
, generators
)
617 def test_decoder_write(self
):
618 for delay
in [0, 1, 0]:
619 with self
.subTest(delay
=delay
):
620 slaves
= self
.decoder_test(n_slaves
=3, pattern
=[
630 ], generator_delay
=delay
)
632 def addr(checker_list
):
633 return [entry
[0] for entry
in checker_list
]
635 self
.assertEqual(addr(slaves
[0].writes
), [0x010, 0x011, 0x012])
636 self
.assertEqual(addr(slaves
[1].writes
), [0x110, 0x111, 0x112])
637 self
.assertEqual(addr(slaves
[2].writes
), [0x210, 0x211, 0x212])
639 self
.assertEqual(slave
.reads
, [])
641 def test_decoder_read(self
):
643 with self
.subTest(delay
=delay
):
644 slaves
= self
.decoder_test(n_slaves
=3, pattern
=[
654 ], generator_delay
=delay
)
656 def addr(checker_list
):
657 return [entry
[0] for entry
in checker_list
]
659 self
.assertEqual(addr(slaves
[0].reads
), [0x010, 0x011, 0x012])
660 self
.assertEqual(addr(slaves
[1].reads
), [0x110, 0x111, 0x112])
661 self
.assertEqual(addr(slaves
[2].reads
), [0x210, 0x211, 0x212])
663 self
.assertEqual(slave
.writes
, [])
665 def test_decoder_read_write(self
):
667 with self
.subTest(delay
=delay
):
668 slaves
= self
.decoder_test(n_slaves
=3, pattern
=[
675 ], generator_delay
=delay
)
677 def addr(checker_list
):
678 return [entry
[0] for entry
in checker_list
]
680 self
.assertEqual(addr(slaves
[0].writes
), [0x010])
681 self
.assertEqual(addr(slaves
[0].reads
), [0x011])
682 self
.assertEqual(addr(slaves
[1].writes
), [0x110])
683 self
.assertEqual(addr(slaves
[1].reads
), [0x111])
684 self
.assertEqual(addr(slaves
[2].writes
), [0x210])
685 self
.assertEqual(addr(slaves
[2].reads
), [0x211])
687 def test_decoder_stall(self
):
688 with self
.assertRaises(TimeoutError
):
689 self
.decoder_test(n_slaves
=3, pattern
=[
692 with self
.assertRaises(TimeoutError
):
693 self
.decoder_test(n_slaves
=3, pattern
=[
697 def interconnect_test(self
, master_patterns
, slave_decoders
,
698 master_delay
=0, slave_ready_latency
=0, slave_response_latency
=0,
699 disconnected_slaves
=None, timeout
=300, interconnect
=AXILiteInterconnectShared
,
701 # number of masters/slaves is defined by the number of patterns/decoders
702 # master_patterns: list of patterns per master, pattern = list(tuple(rw, addr, data))
703 # slave_decoders: list of address decoders per slave
704 # delay/latency: control the speed of masters/slaves
705 # disconnected_slaves: list of slave numbers that shouldn't respond to any transactions
707 def __init__(self
, n_masters
, decoders
, **kwargs
):
708 self
.masters
= [AXILiteInterface(name
="master") for _
in range(n_masters
)]
709 self
.slaves
= [AXILiteInterface(name
="slave") for _
in range(len(decoders
))]
710 slaves
= list(zip(decoders
, self
.slaves
))
711 self
.submodules
.interconnect
= interconnect(self
.masters
, slaves
, **kwargs
)
713 class ReadDataGenerator
:
714 # Generates data based on decoded addresses and data defined in master_patterns
715 def __init__(self
, patterns
):
717 for pattern
in patterns
:
718 for rw
, addr
, val
in pattern
:
720 assert addr
not in self
.mem
724 # on miss will give default data depending on slave n
725 return lambda addr
: self
.mem
.get(addr
, 0xbaad0000 + n
)
727 def new_checker(rdata_generator
):
728 return AXILiteChecker(ready_latency
=slave_ready_latency
,
729 response_latency
=slave_response_latency
,
730 rdata_generator
=rdata_generator
)
733 dut
= DUT(len(master_patterns
), slave_decoders
, **kwargs
)
734 rdata_generator
= ReadDataGenerator(master_patterns
)
735 checkers
= [new_checker(rdata_generator
.getter(i
)) for i
, _
in enumerate(master_patterns
)]
736 pattern_generators
= [AXILitePatternGenerator(dut
.masters
[i
], pattern
, delay
=master_delay
)
737 for i
, pattern
in enumerate(master_patterns
)]
740 generators
= [gen
.handler() for gen
in pattern_generators
]
741 generators
+= [checker
.handler(slave
)
742 for i
, (slave
, checker
) in enumerate(zip(dut
.slaves
, checkers
))
743 if i
not in (disconnected_slaves
or [])]
744 generators
+= [timeout_generator(timeout
)]
745 run_simulation(dut
, generators
)
747 return pattern_generators
, checkers
749 def test_interconnect_shared_basic(self
):
751 [("w", 0x000, 0), ("w", 0x101, 0), ("w", 0x202, 0)],
752 [("w", 0x010, 0), ("w", 0x111, 0), ("w", 0x112, 0)],
753 [("w", 0x220, 0), ("w", 0x221, 0), ("w", 0x222, 0)],
755 slave_decoders
= [self
.address_decoder(i
) for i
in range(3)]
757 generators
, checkers
= self
.interconnect_test(master_patterns
, slave_decoders
,
760 for gen
in generators
:
761 self
.assertEqual(gen
.errors
, 0)
763 def addr(checker_list
):
764 return [entry
[0] for entry
in checker_list
]
766 self
.assertEqual(addr(checkers
[0].writes
), [0x000, 0x010])
767 self
.assertEqual(addr(checkers
[1].writes
), [0x101, 0x111, 0x112])
768 self
.assertEqual(addr(checkers
[2].writes
), [0x220, 0x221, 0x202, 0x222])
769 self
.assertEqual(addr(checkers
[0].reads
), [])
770 self
.assertEqual(addr(checkers
[1].reads
), [])
771 self
.assertEqual(addr(checkers
[2].reads
), [])
773 def interconnect_stress_test(self
, timeout
=1000, **kwargs
):
774 prng
= random
.Random(42)
779 slave_region_size
= 0x10000000
780 # for testing purpose each master will access only its own region of a slave
781 master_region_size
= 0x1000
782 assert n_masters
*master_region_size
< slave_region_size
784 def gen_pattern(n
, length
):
785 assert length
< master_region_size
786 for i_access
in range(length
):
787 rw
= "w" if prng
.randint(0, 1) == 0 else "r"
788 i_slave
= prng
.randrange(n_slaves
)
789 addr
= i_slave
*slave_region_size
+ n
*master_region_size
+ i_access
793 master_patterns
= [list(gen_pattern(i
, pattern_length
)) for i
in range(n_masters
)]
794 slave_decoders
= [self
.address_decoder(i
, size
=slave_region_size
) for i
in range(n_slaves
)]
795 slave_decoders_py
= [self
.address_decoder(i
, size
=slave_region_size
, python
=True)
796 for i
in range(n_slaves
)]
798 generators
, checkers
= self
.interconnect_test(master_patterns
, slave_decoders
,
799 timeout
=timeout
, **kwargs
)
801 for gen
in generators
:
802 read_errors
= [" 0x{:08x} vs 0x{:08x}".format(v
, ref
) for v
, ref
in gen
.read_errors
]
803 msg
= "\ngen.resp_errors = {}\ngen.read_errors = \n{}".format(
804 gen
.resp_errors
, "\n".join(read_errors
))
805 if not kwargs
.get("disconnected_slaves", None):
806 self
.assertEqual(gen
.errors
, 0, msg
=msg
)
807 else: # when some slaves are disconnected we should have some errors
808 self
.assertNotEqual(gen
.errors
, 0, msg
=msg
)
810 # make sure all the accesses at slave side are in correct address region
811 for i_slave
, (checker
, decoder
) in enumerate(zip(checkers
, slave_decoders_py
)):
812 for addr
in (entry
[0] for entry
in checker
.writes
+ checker
.reads
):
813 # compensate for the fact that decoders work on word-aligned addresses
814 self
.assertNotEqual(decoder(addr
>> 2), 0)
816 def test_interconnect_shared_stress_no_delay(self
):
817 self
.interconnect_stress_test(timeout
=1000,
819 slave_ready_latency
=0,
820 slave_response_latency
=0)
822 def test_interconnect_shared_stress_rand_short(self
):
823 prng
= random
.Random(42)
824 rand
= lambda: prng
.randrange(4)
825 self
.interconnect_stress_test(timeout
=2000,
827 slave_ready_latency
=rand
,
828 slave_response_latency
=rand
)
830 def test_interconnect_shared_stress_rand_long(self
):
831 prng
= random
.Random(42)
832 rand
= lambda: prng
.randrange(16)
833 self
.interconnect_stress_test(timeout
=4000,
835 slave_ready_latency
=rand
,
836 slave_response_latency
=rand
)
838 def test_interconnect_shared_stress_timeout(self
):
839 self
.interconnect_stress_test(timeout
=4000,
840 disconnected_slaves
=[1],
843 def test_crossbar_stress_no_delay(self
):
844 self
.interconnect_stress_test(timeout
=1000,
846 slave_ready_latency
=0,
847 slave_response_latency
=0,
848 interconnect
=AXILiteCrossbar
)
850 def test_crossbar_stress_rand(self
):
851 prng
= random
.Random(42)
852 rand
= lambda: prng
.randrange(4)
853 self
.interconnect_stress_test(timeout
=2000,
855 slave_ready_latency
=rand
,
856 slave_response_latency
=rand
,
857 interconnect
=AXILiteCrossbar
)