2 # SPDX-License-Identifier: LGPL-2.1-or-later
3 # See Notices.txt for copyright information
5 from contextlib
import contextmanager
6 from ieee754
.part_mul_add
.multiply
import \
7 (PartitionPoints
, PartitionedAdder
, AddReduce
,
8 Mul8_16_32_64
, OP_MUL_LOW
, OP_MUL_SIGNED_HIGH
,
9 OP_MUL_SIGNED_UNSIGNED_HIGH
, OP_MUL_UNSIGNED_HIGH
)
10 from nmigen
import Signal
, Module
11 from nmigen
.back
.pysim
import Simulator
, Delay
, Tick
, Passive
12 from nmigen
.hdl
.ast
import Assign
, Value
13 from typing
import Any
, Generator
, List
, Union
, Optional
, Tuple
, Iterable
15 from hashlib
import sha256
18 from nmigen
.cli
import verilog
, rtlil
21 def create_ilang(dut
, traces
, test_name
):
22 vl
= rtlil
.convert(dut
, ports
=traces
)
23 with
open("%s.il" % test_name
, "w") as f
:
28 def create_simulator(module
: Any
,
31 create_ilang(module
, traces
, test_name
)
32 sim
= Simulator(module
)
33 with sim
.write_vcd(vcd_file
=open(test_name
+ ".vcd", "w"),
34 gtkw_file
=open(test_name
+ ".gtkw", "w"),
# Type aliases used to annotate the generator functions in this file that are
# handed to the simulator via add_process().

# Anything such a generator may yield.
AsyncProcessCommand = Union[Delay, Tick, Passive, Assign, Value]

# Same as above, but a bare `yield` (i.e. None) is permitted as well.
ProcessCommand = Optional[AsyncProcessCommand]

# Generator[yield type, send type, return type]: the generator yields
# commands, may be sent an int or None, and returns nothing.
AsyncProcessGenerator = Generator[AsyncProcessCommand, Optional[int], None]
ProcessGenerator = Generator[ProcessCommand, Optional[int], None]
45 class TestPartitionPoints(unittest
.TestCase
):
46 def test(self
) -> None:
50 partition_point_10
= Signal()
51 partition_points
= PartitionPoints({1: True,
53 10: partition_point_10
})
54 module
.d
.comb
+= mask
.eq(partition_points
.as_mask(width
))
55 with
create_simulator(module
,
56 [mask
, partition_point_10
],
57 "partition_points") as sim
:
58 def async_process() -> AsyncProcessGenerator
:
59 self
.assertEqual((yield partition_points
[1]), True)
60 self
.assertEqual((yield partition_points
[5]), False)
61 yield partition_point_10
.eq(0)
63 self
.assertEqual((yield mask
), 0xFFFD)
64 yield partition_point_10
.eq(1)
66 self
.assertEqual((yield mask
), 0xFBFD)
68 sim
.add_process(async_process
)
72 class TestPartitionedAdder(unittest
.TestCase
):
73 def test(self
) -> None:
75 partition_nibbles
= Signal()
76 partition_bytes
= Signal()
77 module
= PartitionedAdder(width
,
78 {0x4: partition_nibbles
,
79 0x8: partition_bytes | partition_nibbles
,
80 0xC: partition_nibbles
})
81 with
create_simulator(module
,
87 "partitioned_adder") as sim
:
88 def async_process() -> AsyncProcessGenerator
:
89 def test_add(msg_prefix
: str,
90 *mask_list
: Tuple
[int, ...]) -> Any
:
91 for a
, b
in [(0x0000, 0x0000),
102 for mask
in mask_list
:
103 y |
= mask
& ((a
& mask
) + (b
& mask
))
104 output
= (yield module
.output
)
105 msg
= f
"{msg_prefix}: 0x{a:X} + 0x{b:X}" + \
106 f
" => 0x{y:X} != 0x{output:X}"
107 self
.assertEqual(y
, output
, msg
)
108 yield partition_nibbles
.eq(0)
109 yield partition_bytes
.eq(0)
110 yield from test_add("16-bit", 0xFFFF)
111 yield partition_nibbles
.eq(0)
112 yield partition_bytes
.eq(1)
113 yield from test_add("8-bit", 0xFF00, 0x00FF)
114 yield partition_nibbles
.eq(1)
115 yield partition_bytes
.eq(0)
116 yield from test_add("4-bit", 0xF000, 0x0F00, 0x00F0, 0x000F)
118 sim
.add_process(async_process
)
122 class GenOrCheck(enum
.Enum
):
123 Generate
= enum
.auto()
127 class TestAddReduce(unittest
.TestCase
):
128 def calculate_input_values(self
,
131 extra_keys
: List
[int] = []
132 ) -> (List
[int], List
[str]):
134 input_values_str
= []
135 for i
in range(input_count
):
143 hash_input
= f
"{input_count} {i} {key} {extra_keys}"
144 hash = sha256(hash_input
.encode()).digest()
145 value
= int.from_bytes(hash, byteorder
="little")
147 input_values
.append(value
)
148 input_values_str
.append(f
"0x{value:04X}")
149 return input_values
, input_values_str
151 def subtest_value(self
,
152 inputs
: List
[Signal
],
154 mask_list
: List
[int],
155 gen_or_check
: GenOrCheck
,
156 values
: List
[int]) -> AsyncProcessGenerator
:
157 if gen_or_check
== GenOrCheck
.Generate
:
158 for i
, v
in zip(inputs
, values
):
162 for mask
in mask_list
:
167 output
= (yield module
.o
.output
)
168 if gen_or_check
== GenOrCheck
.Check
:
169 self
.assertEqual(y
, output
, f
"0x{y:X} != 0x{output:X}")
172 def subtest_key(self
,
174 inputs
: List
[Signal
],
177 mask_list
: List
[int],
178 gen_or_check
: GenOrCheck
) -> AsyncProcessGenerator
:
179 values
, values_str
= self
.calculate_input_values(input_count
, key
)
180 if gen_or_check
== GenOrCheck
.Check
:
181 with self
.subTest(inputs
=values_str
):
182 yield from self
.subtest_value(inputs
,
188 yield from self
.subtest_value(inputs
,
194 def subtest_run_sim(self
,
199 inputs
: List
[Signal
],
201 delay_cycles
: int) -> None:
202 def generic_process(gen_or_check
: GenOrCheck
) -> AsyncProcessGenerator
:
203 for partition_4_value
, partition_8_value
, mask_list
in [
205 (0, 1, [0xFF00, 0x00FF]),
206 (1, 0, [0xFFF0, 0x000F]),
207 (1, 1, [0xFF00, 0x00F0, 0x000F])]:
209 if gen_or_check
== GenOrCheck
.Check
:
210 with self
.subTest(partition_4
=partition_4_value
,
211 partition_8
=partition_8_value
):
212 for key
in range(key_count
):
213 with self
.subTest(key
=key
):
214 yield from self
.subtest_key(input_count
,
221 if gen_or_check
== GenOrCheck
.Generate
:
222 yield partition_4
.eq(partition_4_value
)
223 yield partition_8
.eq(partition_8_value
)
224 for key
in range(key_count
):
225 yield from self
.subtest_key(input_count
,
232 def generate_process() -> AsyncProcessGenerator
:
233 yield from generic_process(GenOrCheck
.Generate
)
235 def check_process() -> AsyncProcessGenerator
:
236 if delay_cycles
!= 0:
237 for _
in range(delay_cycles
):
239 yield from generic_process(GenOrCheck
.Check
)
241 sim
.add_clock(2e-6, if_exists
=True)
242 sim
.add_process(generate_process
)
243 sim
.add_process(check_process
)
246 def subtest_file(self
,
248 register_levels
: List
[int]) -> None:
249 max_level
= AddReduce
.get_max_level(input_count
)
250 for level
in register_levels
:
251 if level
> max_level
:
253 partition_4
= Signal()
254 partition_8
= Signal()
255 partition_points
= PartitionPoints()
256 partition_points
[4] = partition_4
257 partition_points
[8] = partition_8
259 inputs
= [Signal(width
, name
=f
"input_{i}")
260 for i
in range(input_count
)]
261 module
= AddReduce(inputs
,
266 file_name
= "add_reduce"
267 if len(register_levels
) != 0:
268 file_name
+= f
"-{'_'.join(map(repr, register_levels))}"
269 file_name
+= f
"-{input_count:02d}"
270 ports
= [partition_4
, partition_8
, *inputs
, module
.o
.output
]
271 #create_ilang(module, ports, file_name)
272 with
create_simulator(module
, ports
, file_name
) as sim
:
273 self
.subtest_run_sim(input_count
,
279 len(register_levels
))
def subtest_register_levels(self, register_levels: List[int]) -> None:
    # Runs subtest_file once for every input count from 0 through 15, each
    # inside its own subTest labelled with the input count and the repr of
    # the register-level list.
    for input_count in range(16):
        labels = {
            "input_count": input_count,
            "register_levels": repr(register_levels),
        }
        with self.subTest(**labels):
            self.subtest_file(input_count, register_levels)
# Register-level test cases for AddReduce.  Each case passes one list of
# register levels to subtest_register_levels.  Every case is currently marked
# as an expected failure; the FIXME on each decorator records the reason.
# Comments are used instead of docstrings so that unittest's verbose output
# keeps reporting the method names.
#
# NOTE: test_0 was previously defined a second time after test_5 with an
# identical decorator and body.  The later definition only rebound the same
# name, so the redundant copy has been removed.

# --- no register levels -----------------------------------------------------

@unittest.expectedFailure  # FIXME: NameError: name 'pspec' is not defined
def test_empty(self) -> None:
    self.subtest_register_levels([])

# --- a single register level ------------------------------------------------

@unittest.expectedFailure  # FIXME: NameError: name 'pspec' is not defined
def test_0(self) -> None:
    self.subtest_register_levels([0])

@unittest.expectedFailure  # FIXME: NameError: name 'pspec' is not defined
def test_1(self) -> None:
    self.subtest_register_levels([1])

@unittest.expectedFailure  # FIXME: NameError: name 'pspec' is not defined
def test_2(self) -> None:
    self.subtest_register_levels([2])

@unittest.expectedFailure  # FIXME: NameError: name 'pspec' is not defined
def test_3(self) -> None:
    self.subtest_register_levels([3])

@unittest.expectedFailure  # FIXME: NameError: name 'pspec' is not defined
def test_4(self) -> None:
    self.subtest_register_levels([4])

@unittest.expectedFailure  # FIXME: NameError: name 'pspec' is not defined
def test_5(self) -> None:
    self.subtest_register_levels([5])

# --- consecutive register levels starting at 0 ------------------------------

@unittest.expectedFailure  # FIXME: NameError: name 'pspec' is not defined
def test_0_1(self) -> None:
    self.subtest_register_levels([0, 1])

@unittest.expectedFailure  # FIXME: NameError: name 'pspec' is not defined
def test_0_1_2(self) -> None:
    self.subtest_register_levels([0, 1, 2])

@unittest.expectedFailure  # FIXME: NameError: name 'pspec' is not defined
def test_0_1_2_3(self) -> None:
    self.subtest_register_levels([0, 1, 2, 3])

@unittest.expectedFailure  # FIXME: NameError: name 'pspec' is not defined
def test_0_1_2_3_4(self) -> None:
    self.subtest_register_levels([0, 1, 2, 3, 4])

@unittest.expectedFailure  # FIXME: NameError: name 'pspec' is not defined
def test_0_1_2_3_4_5(self) -> None:
    self.subtest_register_levels([0, 1, 2, 3, 4, 5])

# --- level 0 paired with one other level ------------------------------------

@unittest.expectedFailure  # FIXME: NameError: name 'pspec' is not defined
def test_0_2(self) -> None:
    self.subtest_register_levels([0, 2])

@unittest.expectedFailure  # FIXME: NameError: name 'pspec' is not defined
def test_0_3(self) -> None:
    self.subtest_register_levels([0, 3])

@unittest.expectedFailure  # FIXME: NameError: name 'pspec' is not defined
def test_0_4(self) -> None:
    self.subtest_register_levels([0, 4])

@unittest.expectedFailure  # FIXME: NameError: name 'pspec' is not defined
def test_0_5(self) -> None:
    self.subtest_register_levels([0, 5])
def __init__(self, a_signed, b_signed, bit_width, high_half):
    """Store the four lane settings on the instance, unchanged.

    Each argument is saved under an attribute of the same name; no
    validation or conversion is performed here.
    """
    self.a_signed, self.b_signed = a_signed, b_signed
    self.bit_width, self.high_half = bit_width, high_half
364 return f
"SIMDMulLane({self.a_signed}, {self.b_signed}, " +\
365 f
"{self.bit_width}, {self.high_half})"
368 def simd_mul(a
, b
, lanes
):
370 intermediate_output
= 0
373 a_signed
= lane
.a_signed
or not lane
.high_half
374 b_signed
= lane
.b_signed
or not lane
.high_half
375 mask
= (1 << lane
.bit_width
) - 1
376 sign_bit
= 1 << (lane
.bit_width
- 1)
377 a_part
= (a
>> shift
) & mask
378 if a_signed
and (a_part
& sign_bit
) != 0:
379 a_part
-= 1 << lane
.bit_width
380 b_part
= (b
>> shift
) & mask
381 if b_signed
and (b_part
& sign_bit
) != 0:
382 b_part
-= 1 << lane
.bit_width
383 value
= a_part
* b_part
384 value
&= (1 << (lane
.bit_width
* 2)) - 1
385 intermediate_output |
= value
<< (shift
* 2)
387 value
>>= lane
.bit_width
389 output |
= value
<< shift
390 shift
+= lane
.bit_width
391 return output
, intermediate_output
394 class TestMul8_16_32_64(unittest
.TestCase
):
397 def get_tst_cases(lanes
: List
[SIMDMulLane
],
398 keys
: Iterable
[int]) -> Iterable
[Tuple
[int, int]]:
401 hash_input
= f
"{i} {lanes} {list(keys)}"
402 hash = sha256(hash_input
.encode()).digest()
403 value
= int.from_bytes(hash, byteorder
="little")
404 yield (value
& mask
, value
>> 64)
409 a |
= 1 << (shift
+ lane
.bit_width
- 1)
410 b |
= 1 << (shift
+ lane
.bit_width
- 1)
411 shift
+= lane
.bit_width
def test_simd_mul_lane(self):
    # Formatting a lane through an f-string is expected to give back the
    # text of the constructor call that created it.
    lane = SIMDMulLane(True, True, 8, False)
    self.assertEqual(f"{lane}", "SIMDMulLane(True, True, 8, False)")
418 def test_simd_mul(self
):
419 lanes
= [SIMDMulLane(True,
435 a
= 0x0123456789ABCDEF
436 b
= 0xFEDCBA9876543210
437 output
= 0x0121FA00FE1C28FE
438 intermediate_output
= 0x0121FA0023E20B28C94DFE1C280AFEF0
439 self
.assertEqual(simd_mul(a
, b
, lanes
),
440 (output
, intermediate_output
))
441 a
= 0x8123456789ABCDEF
442 b
= 0xFEDCBA9876543210
443 output
= 0x81B39CB4FE1C28FE
444 intermediate_output
= 0x81B39CB423E20B28C94DFE1C280AFEF0
445 self
.assertEqual(simd_mul(a
, b
, lanes
),
446 (output
, intermediate_output
))
448 def test_signed_mul_from_unsigned(self
):
449 for i
in range(0, 0x10):
450 for j
in range(0, 0x10):
451 si
= i
if i
& 8 else i
- 0x10 # signed i
452 sj
= j
if j
& 8 else j
- 0x10 # signed j
456 with self
.subTest(i
=i
, j
=j
, si
=si
, sj
=sj
,
457 mulu
=mulu
, mulsu
=mulsu
, mul
=mul
):
462 self
.assertEqual(mulsu
& 0xFF, mulsu2
& 0xFF)
467 self
.assertEqual(mul
& 0xFF, mul2
& 0xFF)
469 def subtest_value(self
,
472 module
: Mul8_16_32_64
,
473 lanes
: List
[SIMDMulLane
],
474 gen_or_check
: GenOrCheck
) -> AsyncProcessGenerator
:
475 if gen_or_check
== GenOrCheck
.Generate
:
478 output2
, intermediate_output2
= simd_mul(a
, b
, lanes
)
480 if gen_or_check
== GenOrCheck
.Check
:
481 intermediate_output
= (yield module
.intermediate_output
)
482 self
.assertEqual(intermediate_output
,
483 intermediate_output2
,
484 f
"0x{intermediate_output:X} "
485 + f
"!= 0x{intermediate_output2:X}")
486 output
= (yield module
.output
)
487 self
.assertEqual(output
, output2
, f
"0x{output:X} != 0x{output2:X}")
490 def subtest_lanes_2(self
,
491 lanes
: List
[SIMDMulLane
],
492 module
: Mul8_16_32_64
,
493 gen_or_check
: GenOrCheck
) -> AsyncProcessGenerator
:
500 op
= OP_MUL_SIGNED_HIGH
502 op
= OP_MUL_SIGNED_UNSIGNED_HIGH
504 self
.assertFalse(lane
.b_signed
,
505 "unsigned * signed not supported")
506 op
= OP_MUL_UNSIGNED_HIGH
509 self
.assertEqual(lane
.bit_width
% 8, 0)
510 for i
in range(lane
.bit_width
// 8):
511 if gen_or_check
== GenOrCheck
.Generate
:
512 yield module
.part_ops
[part_index
].eq(op
)
514 for i
in range(lane
.bit_width
// 8 - 1):
515 if gen_or_check
== GenOrCheck
.Generate
:
516 yield module
.part_pts
[bit_index
].eq(0)
518 if bit_index
< 64 and gen_or_check
== GenOrCheck
.Generate
:
519 yield module
.part_pts
[bit_index
].eq(1)
521 self
.assertEqual(part_index
, 8)
522 for a
, b
in self
.get_tst_cases(lanes
, ()):
523 if gen_or_check
== GenOrCheck
.Check
:
524 with self
.subTest(a
=f
"{a:X}", b
=f
"{b:X}"):
525 yield from self
.subtest_value(a
, b
, module
, lanes
, gen_or_check
)
527 yield from self
.subtest_value(a
, b
, module
, lanes
, gen_or_check
)
529 def subtest_lanes(self
,
530 lanes
: List
[SIMDMulLane
],
531 module
: Mul8_16_32_64
,
532 gen_or_check
: GenOrCheck
) -> AsyncProcessGenerator
:
533 if gen_or_check
== GenOrCheck
.Check
:
534 with self
.subTest(lanes
=repr(lanes
)):
535 yield from self
.subtest_lanes_2(lanes
, module
, gen_or_check
)
537 yield from self
.subtest_lanes_2(lanes
, module
, gen_or_check
)
539 def subtest_file(self
,
540 register_levels
: List
[int]) -> None:
541 module
= Mul8_16_32_64(register_levels
)
542 file_name
= "mul8_16_32_64"
543 if len(register_levels
) != 0:
544 file_name
+= f
"-{'_'.join(map(repr, register_levels))}"
547 module
.intermediate_output
,
549 ports
.extend(module
.part_ops
)
550 ports
.extend(module
.part_pts
.values())
552 m
.submodules
+= module
553 m
.d
.sync
+= Signal().eq(0) # ensure sync domain is created
554 with
create_simulator(m
, ports
, file_name
) as sim
:
555 def process(gen_or_check
: GenOrCheck
) -> AsyncProcessGenerator
:
556 for a_signed
in False, True:
557 for b_signed
in False, True:
558 if not a_signed
and b_signed
:
560 for high_half
in False, True:
561 if not high_half
and not (a_signed
and b_signed
):
563 yield from self
.subtest_lanes(
564 [SIMDMulLane(a_signed
,
570 yield from self
.subtest_lanes(
571 [SIMDMulLane(a_signed
,
577 yield from self
.subtest_lanes(
578 [SIMDMulLane(a_signed
,
584 yield from self
.subtest_lanes(
585 [SIMDMulLane(a_signed
,
591 yield from self
.subtest_lanes([SIMDMulLane(False,
609 yield from self
.subtest_lanes([SIMDMulLane(True,
627 yield from self
.subtest_lanes([SIMDMulLane(True,
646 def generate_process() -> AsyncProcessGenerator
:
647 yield from process(GenOrCheck
.Generate
)
649 def check_process() -> AsyncProcessGenerator
:
650 if len(register_levels
) != 0:
651 for _
in register_levels
:
653 yield from process(GenOrCheck
.Check
)
655 sim
.add_clock(2e-6, if_exists
=True)
656 sim
.add_process(generate_process
)
657 sim
.add_process(check_process
)
def subtest_register_levels(self, register_levels: List[int]) -> None:
    # Runs subtest_file for the given register levels inside a subTest that
    # is labelled with the repr of the list.
    label = repr(register_levels)
    with self.subTest(register_levels=label):
        self.subtest_file(register_levels)
# Register-level test cases for Mul8_16_32_64.  Each case passes one list of
# register levels to subtest_register_levels.  Comments are used instead of
# docstrings so that unittest's verbose output keeps reporting the method
# names.
#
# NOTE: test_0 was previously defined a second time after test_10 with an
# identical body.  The later definition only rebound the same name, so the
# redundant copy has been removed.

# --- no register levels -----------------------------------------------------

def test_empty(self) -> None:
    self.subtest_register_levels([])

# --- a single register level ------------------------------------------------

def test_0(self) -> None:
    self.subtest_register_levels([0])

def test_1(self) -> None:
    self.subtest_register_levels([1])

def test_2(self) -> None:
    self.subtest_register_levels([2])

def test_3(self) -> None:
    self.subtest_register_levels([3])

def test_4(self) -> None:
    self.subtest_register_levels([4])

def test_5(self) -> None:
    self.subtest_register_levels([5])

def test_6(self) -> None:
    self.subtest_register_levels([6])

def test_7(self) -> None:
    self.subtest_register_levels([7])

def test_8(self) -> None:
    self.subtest_register_levels([8])

def test_9(self) -> None:
    self.subtest_register_levels([9])

def test_10(self) -> None:
    self.subtest_register_levels([10])

# --- consecutive register levels starting at 0 ------------------------------

def test_0_1(self) -> None:
    self.subtest_register_levels([0, 1])

def test_0_1_2(self) -> None:
    self.subtest_register_levels([0, 1, 2])

def test_0_1_2_3(self) -> None:
    self.subtest_register_levels([0, 1, 2, 3])

def test_0_1_2_3_4(self) -> None:
    self.subtest_register_levels([0, 1, 2, 3, 4])

def test_0_1_2_3_4_5(self) -> None:
    self.subtest_register_levels([0, 1, 2, 3, 4, 5])

def test_0_1_2_3_4_5_6(self) -> None:
    self.subtest_register_levels([0, 1, 2, 3, 4, 5, 6])

def test_0_1_2_3_4_5_6_7(self) -> None:
    self.subtest_register_levels([0, 1, 2, 3, 4, 5, 6, 7])

def test_0_1_2_3_4_5_6_7_8(self) -> None:
    self.subtest_register_levels([0, 1, 2, 3, 4, 5, 6, 7, 8])

def test_0_1_2_3_4_5_6_7_8_9(self) -> None:
    self.subtest_register_levels([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])

def test_0_1_2_3_4_5_6_7_8_9_10(self) -> None:
    self.subtest_register_levels([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# --- level 0 paired with one other level ------------------------------------

def test_0_2(self) -> None:
    self.subtest_register_levels([0, 2])

def test_0_3(self) -> None:
    self.subtest_register_levels([0, 3])

def test_0_4(self) -> None:
    self.subtest_register_levels([0, 4])

def test_0_5(self) -> None:
    self.subtest_register_levels([0, 5])

def test_0_6(self) -> None:
    self.subtest_register_levels([0, 6])

def test_0_7(self) -> None:
    self.subtest_register_levels([0, 7])

def test_0_8(self) -> None:
    self.subtest_register_levels([0, 8])

def test_0_9(self) -> None:
    self.subtest_register_levels([0, 9])

def test_0_10(self) -> None:
    self.subtest_register_levels([0, 10])
761 if __name__
== '__main__':