2 # SPDX-License-Identifier: LGPL-2.1-or-later
3 # See Notices.txt for copyright information
5 from ieee754
.part_mul_add
.multiply
import \
6 (PartitionPoints
, PartitionedAdder
, AddReduce
,
7 Mul8_16_32_64
, OP_MUL_LOW
, OP_MUL_SIGNED_HIGH
,
8 OP_MUL_SIGNED_UNSIGNED_HIGH
, OP_MUL_UNSIGNED_HIGH
)
9 from nmigen
import Signal
, Module
10 from nmigen
.back
.pysim
import Simulator
, Delay
, Tick
, Passive
11 from nmigen
.hdl
.ast
import Assign
, Value
12 from typing
import Any
, Generator
, List
, Union
, Optional
, Tuple
, Iterable
14 from hashlib
import sha256
17 from nmigen
.cli
import verilog
, rtlil
20 def create_ilang(dut
, traces
, test_name
):
21 vl
= rtlil
.convert(dut
, ports
=traces
)
22 with
open("%s.il" % test_name
, "w") as f
:
26 def create_simulator(module
: Any
,
28 test_name
: str) -> Simulator
:
29 create_ilang(module
, traces
, test_name
)
30 return Simulator(module
,
31 vcd_file
=open(test_name
+ ".vcd", "w"),
32 gtkw_file
=open(test_name
+ ".gtkw", "w"),
# Type aliases used to annotate the simulator process generators in this file.
#
# ``Generator[Y, S, R]`` reads as: yields ``Y``, is sent ``S``, returns ``R``.
# The processes here yield simulator commands / nmigen values and may get an
# ``int`` back from a ``yield`` (e.g. ``output = (yield module.output)``).

# Anything an async process is allowed to yield.
AsyncProcessCommand = Union[Delay, Tick, Passive, Assign, Value]

# Same as above, but additionally allowing a bare ``yield`` (i.e. ``None``).
ProcessCommand = Optional[AsyncProcessCommand]

# Generator types for the two kinds of process; both are sent ``int`` or
# ``None`` and return nothing.
AsyncProcessGenerator = Generator[AsyncProcessCommand, Optional[int], None]
ProcessGenerator = Generator[ProcessCommand, Optional[int], None]
42 class TestPartitionPoints(unittest
.TestCase
):
43 def test(self
) -> None:
47 partition_point_10
= Signal()
48 partition_points
= PartitionPoints({1: True,
50 10: partition_point_10
})
51 module
.d
.comb
+= mask
.eq(partition_points
.as_mask(width
))
52 with
create_simulator(module
,
53 [mask
, partition_point_10
],
54 "partition_points") as sim
:
55 def async_process() -> AsyncProcessGenerator
:
56 self
.assertEqual((yield partition_points
[1]), True)
57 self
.assertEqual((yield partition_points
[5]), False)
58 yield partition_point_10
.eq(0)
60 self
.assertEqual((yield mask
), 0xFFFD)
61 yield partition_point_10
.eq(1)
63 self
.assertEqual((yield mask
), 0xFBFD)
65 sim
.add_process(async_process
)
69 class TestPartitionedAdder(unittest
.TestCase
):
70 def test(self
) -> None:
72 partition_nibbles
= Signal()
73 partition_bytes
= Signal()
74 module
= PartitionedAdder(width
,
75 {0x4: partition_nibbles
,
76 0x8: partition_bytes | partition_nibbles
,
77 0xC: partition_nibbles
})
78 with
create_simulator(module
,
84 "partitioned_adder") as sim
:
85 def async_process() -> AsyncProcessGenerator
:
86 def test_add(msg_prefix
: str,
87 *mask_list
: Tuple
[int, ...]) -> Any
:
88 for a
, b
in [(0x0000, 0x0000),
99 for mask
in mask_list
:
100 y |
= mask
& ((a
& mask
) + (b
& mask
))
101 output
= (yield module
.output
)
102 msg
= f
"{msg_prefix}: 0x{a:X} + 0x{b:X}" + \
103 f
" => 0x{y:X} != 0x{output:X}"
104 self
.assertEqual(y
, output
, msg
)
105 yield partition_nibbles
.eq(0)
106 yield partition_bytes
.eq(0)
107 yield from test_add("16-bit", 0xFFFF)
108 yield partition_nibbles
.eq(0)
109 yield partition_bytes
.eq(1)
110 yield from test_add("8-bit", 0xFF00, 0x00FF)
111 yield partition_nibbles
.eq(1)
112 yield partition_bytes
.eq(0)
113 yield from test_add("4-bit", 0xF000, 0x0F00, 0x00F0, 0x000F)
115 sim
.add_process(async_process
)
119 class GenOrCheck(enum
.Enum
):
120 Generate
= enum
.auto()
124 class TestAddReduce(unittest
.TestCase
):
125 def calculate_input_values(self
,
128 extra_keys
: List
[int] = []
129 ) -> (List
[int], List
[str]):
131 input_values_str
= []
132 for i
in range(input_count
):
140 hash_input
= f
"{input_count} {i} {key} {extra_keys}"
141 hash = sha256(hash_input
.encode()).digest()
142 value
= int.from_bytes(hash, byteorder
="little")
144 input_values
.append(value
)
145 input_values_str
.append(f
"0x{value:04X}")
146 return input_values
, input_values_str
148 def subtest_value(self
,
149 inputs
: List
[Signal
],
151 mask_list
: List
[int],
152 gen_or_check
: GenOrCheck
,
153 values
: List
[int]) -> AsyncProcessGenerator
:
154 if gen_or_check
== GenOrCheck
.Generate
:
155 for i
, v
in zip(inputs
, values
):
159 for mask
in mask_list
:
164 output
= (yield module
.o
.output
)
165 if gen_or_check
== GenOrCheck
.Check
:
166 self
.assertEqual(y
, output
, f
"0x{y:X} != 0x{output:X}")
169 def subtest_key(self
,
171 inputs
: List
[Signal
],
174 mask_list
: List
[int],
175 gen_or_check
: GenOrCheck
) -> AsyncProcessGenerator
:
176 values
, values_str
= self
.calculate_input_values(input_count
, key
)
177 if gen_or_check
== GenOrCheck
.Check
:
178 with self
.subTest(inputs
=values_str
):
179 yield from self
.subtest_value(inputs
,
185 yield from self
.subtest_value(inputs
,
191 def subtest_run_sim(self
,
196 inputs
: List
[Signal
],
198 delay_cycles
: int) -> None:
199 def generic_process(gen_or_check
: GenOrCheck
) -> AsyncProcessGenerator
:
200 for partition_4_value
, partition_8_value
, mask_list
in [
202 (0, 1, [0xFF00, 0x00FF]),
203 (1, 0, [0xFFF0, 0x000F]),
204 (1, 1, [0xFF00, 0x00F0, 0x000F])]:
206 if gen_or_check
== GenOrCheck
.Check
:
207 with self
.subTest(partition_4
=partition_4_value
,
208 partition_8
=partition_8_value
):
209 for key
in range(key_count
):
210 with self
.subTest(key
=key
):
211 yield from self
.subtest_key(input_count
,
218 if gen_or_check
== GenOrCheck
.Generate
:
219 yield partition_4
.eq(partition_4_value
)
220 yield partition_8
.eq(partition_8_value
)
221 for key
in range(key_count
):
222 yield from self
.subtest_key(input_count
,
229 def generate_process() -> AsyncProcessGenerator
:
230 yield from generic_process(GenOrCheck
.Generate
)
232 def check_process() -> AsyncProcessGenerator
:
233 if delay_cycles
!= 0:
234 for _
in range(delay_cycles
):
236 yield from generic_process(GenOrCheck
.Check
)
238 sim
.add_clock(2e-6, if_exists
=True)
239 sim
.add_process(generate_process
)
240 sim
.add_process(check_process
)
243 def subtest_file(self
,
245 register_levels
: List
[int]) -> None:
246 max_level
= AddReduce
.get_max_level(input_count
)
247 for level
in register_levels
:
248 if level
> max_level
:
250 partition_4
= Signal()
251 partition_8
= Signal()
252 partition_points
= PartitionPoints()
253 partition_points
[4] = partition_4
254 partition_points
[8] = partition_8
256 inputs
= [Signal(width
, name
=f
"input_{i}")
257 for i
in range(input_count
)]
258 module
= AddReduce(inputs
,
263 file_name
= "add_reduce"
264 if len(register_levels
) != 0:
265 file_name
+= f
"-{'_'.join(map(repr, register_levels))}"
266 file_name
+= f
"-{input_count:02d}"
267 ports
= [partition_4
, partition_8
, *inputs
, module
.o
.output
]
268 #create_ilang(module, ports, file_name)
269 with
create_simulator(module
, ports
, file_name
) as sim
:
270 self
.subtest_run_sim(input_count
,
276 len(register_levels
))
def subtest_register_levels(self, register_levels: List[int]) -> None:
    """Run ``subtest_file`` for every input count from 0 through 15.

    Each run happens inside its own ``subTest`` context, labelled with the
    input count and the ``repr`` of ``register_levels``.

    :param register_levels: passed through unchanged to ``subtest_file``.
    """
    for count in range(16):
        levels_label = repr(register_levels)
        with self.subTest(input_count=count, register_levels=levels_label):
            self.subtest_file(count, register_levels)
284 def test_empty(self
) -> None:
285 self
.subtest_register_levels([])
287 def test_0(self
) -> None:
288 self
.subtest_register_levels([0])
290 def test_1(self
) -> None:
291 self
.subtest_register_levels([1])
293 def test_2(self
) -> None:
294 self
.subtest_register_levels([2])
296 def test_3(self
) -> None:
297 self
.subtest_register_levels([3])
299 def test_4(self
) -> None:
300 self
.subtest_register_levels([4])
302 def test_5(self
) -> None:
303 self
.subtest_register_levels([5])
305 def test_0(self
) -> None:
306 self
.subtest_register_levels([0])
308 def test_0_1(self
) -> None:
309 self
.subtest_register_levels([0, 1])
311 def test_0_1_2(self
) -> None:
312 self
.subtest_register_levels([0, 1, 2])
314 def test_0_1_2_3(self
) -> None:
315 self
.subtest_register_levels([0, 1, 2, 3])
317 def test_0_1_2_3_4(self
) -> None:
318 self
.subtest_register_levels([0, 1, 2, 3, 4])
320 def test_0_1_2_3_4_5(self
) -> None:
321 self
.subtest_register_levels([0, 1, 2, 3, 4, 5])
323 def test_0_2(self
) -> None:
324 self
.subtest_register_levels([0, 2])
326 def test_0_3(self
) -> None:
327 self
.subtest_register_levels([0, 3])
329 def test_0_4(self
) -> None:
330 self
.subtest_register_levels([0, 4])
332 def test_0_5(self
) -> None:
333 self
.subtest_register_levels([0, 5])
def __init__(self,
             a_signed: bool,
             b_signed: bool,
             bit_width: int,
             high_half: bool):
    """Record the parameters describing one lane of a SIMD multiply.

    The values are stored unchanged as attributes of the same name; they
    are read back by ``simd_mul`` and by the tests in this file.

    :param a_signed: whether the ``a`` operand of this lane is signed
        (``simd_mul`` sign-extends the lane's ``a`` bits when its derived
        signed flag is set).
    :param b_signed: same as ``a_signed``, for the ``b`` operand.
    :param bit_width: width of the lane in bits (``simd_mul`` builds the
        lane mask as ``(1 << bit_width) - 1``).
    :param high_half: presumably selects the upper half of the lane's
        double-width product as the result -- TODO confirm against
        ``simd_mul``.
    """
    self.a_signed = a_signed
    self.b_signed = b_signed
    self.bit_width = bit_width
    self.high_half = high_half
344 return f
"SIMDMulLane({self.a_signed}, {self.b_signed}, " +\
345 f
"{self.bit_width}, {self.high_half})"
348 def simd_mul(a
, b
, lanes
):
350 intermediate_output
= 0
353 a_signed
= lane
.a_signed
or not lane
.high_half
354 b_signed
= lane
.b_signed
or not lane
.high_half
355 mask
= (1 << lane
.bit_width
) - 1
356 sign_bit
= 1 << (lane
.bit_width
- 1)
357 a_part
= (a
>> shift
) & mask
358 if a_signed
and (a_part
& sign_bit
) != 0:
359 a_part
-= 1 << lane
.bit_width
360 b_part
= (b
>> shift
) & mask
361 if b_signed
and (b_part
& sign_bit
) != 0:
362 b_part
-= 1 << lane
.bit_width
363 value
= a_part
* b_part
364 value
&= (1 << (lane
.bit_width
* 2)) - 1
365 intermediate_output |
= value
<< (shift
* 2)
367 value
>>= lane
.bit_width
369 output |
= value
<< shift
370 shift
+= lane
.bit_width
371 return output
, intermediate_output
374 class TestMul8_16_32_64(unittest
.TestCase
):
377 def get_tst_cases(lanes
: List
[SIMDMulLane
],
378 keys
: Iterable
[int]) -> Iterable
[Tuple
[int, int]]:
381 hash_input
= f
"{i} {lanes} {list(keys)}"
382 hash = sha256(hash_input
.encode()).digest()
383 value
= int.from_bytes(hash, byteorder
="little")
384 yield (value
& mask
, value
>> 64)
389 a |
= 1 << (shift
+ lane
.bit_width
- 1)
390 b |
= 1 << (shift
+ lane
.bit_width
- 1)
391 shift
+= lane
.bit_width
def test_simd_mul_lane(self):
    """Check the text produced by formatting a ``SIMDMulLane`` in an f-string."""
    lane = SIMDMulLane(True, True, 8, False)
    expected_text = "SIMDMulLane(True, True, 8, False)"
    self.assertEqual(f"{lane}", expected_text)
398 def test_simd_mul(self
):
399 lanes
= [SIMDMulLane(True,
415 a
= 0x0123456789ABCDEF
416 b
= 0xFEDCBA9876543210
417 output
= 0x0121FA00FE1C28FE
418 intermediate_output
= 0x0121FA0023E20B28C94DFE1C280AFEF0
419 self
.assertEqual(simd_mul(a
, b
, lanes
),
420 (output
, intermediate_output
))
421 a
= 0x8123456789ABCDEF
422 b
= 0xFEDCBA9876543210
423 output
= 0x81B39CB4FE1C28FE
424 intermediate_output
= 0x81B39CB423E20B28C94DFE1C280AFEF0
425 self
.assertEqual(simd_mul(a
, b
, lanes
),
426 (output
, intermediate_output
))
428 def test_signed_mul_from_unsigned(self
):
429 for i
in range(0, 0x10):
430 for j
in range(0, 0x10):
431 si
= i
if i
& 8 else i
- 0x10 # signed i
432 sj
= j
if j
& 8 else j
- 0x10 # signed j
436 with self
.subTest(i
=i
, j
=j
, si
=si
, sj
=sj
,
437 mulu
=mulu
, mulsu
=mulsu
, mul
=mul
):
442 self
.assertEqual(mulsu
& 0xFF, mulsu2
& 0xFF)
447 self
.assertEqual(mul
& 0xFF, mul2
& 0xFF)
449 def subtest_value(self
,
452 module
: Mul8_16_32_64
,
453 lanes
: List
[SIMDMulLane
],
454 gen_or_check
: GenOrCheck
) -> AsyncProcessGenerator
:
455 if gen_or_check
== GenOrCheck
.Generate
:
458 output2
, intermediate_output2
= simd_mul(a
, b
, lanes
)
460 if gen_or_check
== GenOrCheck
.Check
:
461 intermediate_output
= (yield module
.intermediate_output
)
462 self
.assertEqual(intermediate_output
,
463 intermediate_output2
,
464 f
"0x{intermediate_output:X} "
465 + f
"!= 0x{intermediate_output2:X}")
466 output
= (yield module
.output
)
467 self
.assertEqual(output
, output2
, f
"0x{output:X} != 0x{output2:X}")
470 def subtest_lanes_2(self
,
471 lanes
: List
[SIMDMulLane
],
472 module
: Mul8_16_32_64
,
473 gen_or_check
: GenOrCheck
) -> AsyncProcessGenerator
:
480 op
= OP_MUL_SIGNED_HIGH
482 op
= OP_MUL_SIGNED_UNSIGNED_HIGH
484 self
.assertFalse(lane
.b_signed
,
485 "unsigned * signed not supported")
486 op
= OP_MUL_UNSIGNED_HIGH
489 self
.assertEqual(lane
.bit_width
% 8, 0)
490 for i
in range(lane
.bit_width
// 8):
491 if gen_or_check
== GenOrCheck
.Generate
:
492 yield module
.part_ops
[part_index
].eq(op
)
494 for i
in range(lane
.bit_width
// 8 - 1):
495 if gen_or_check
== GenOrCheck
.Generate
:
496 yield module
.part_pts
[bit_index
].eq(0)
498 if bit_index
< 64 and gen_or_check
== GenOrCheck
.Generate
:
499 yield module
.part_pts
[bit_index
].eq(1)
501 self
.assertEqual(part_index
, 8)
502 for a
, b
in self
.get_tst_cases(lanes
, ()):
503 if gen_or_check
== GenOrCheck
.Check
:
504 with self
.subTest(a
=f
"{a:X}", b
=f
"{b:X}"):
505 yield from self
.subtest_value(a
, b
, module
, lanes
, gen_or_check
)
507 yield from self
.subtest_value(a
, b
, module
, lanes
, gen_or_check
)
509 def subtest_lanes(self
,
510 lanes
: List
[SIMDMulLane
],
511 module
: Mul8_16_32_64
,
512 gen_or_check
: GenOrCheck
) -> AsyncProcessGenerator
:
513 if gen_or_check
== GenOrCheck
.Check
:
514 with self
.subTest(lanes
=repr(lanes
)):
515 yield from self
.subtest_lanes_2(lanes
, module
, gen_or_check
)
517 yield from self
.subtest_lanes_2(lanes
, module
, gen_or_check
)
519 def subtest_file(self
,
520 register_levels
: List
[int]) -> None:
521 module
= Mul8_16_32_64(register_levels
)
522 file_name
= "mul8_16_32_64"
523 if len(register_levels
) != 0:
524 file_name
+= f
"-{'_'.join(map(repr, register_levels))}"
527 module
.intermediate_output
,
529 ports
.extend(module
.part_ops
)
530 ports
.extend(module
.part_pts
.values())
531 with
create_simulator(module
, ports
, file_name
) as sim
:
532 def process(gen_or_check
: GenOrCheck
) -> AsyncProcessGenerator
:
533 for a_signed
in False, True:
534 for b_signed
in False, True:
535 if not a_signed
and b_signed
:
537 for high_half
in False, True:
538 if not high_half
and not (a_signed
and b_signed
):
540 yield from self
.subtest_lanes(
541 [SIMDMulLane(a_signed
,
547 yield from self
.subtest_lanes(
548 [SIMDMulLane(a_signed
,
554 yield from self
.subtest_lanes(
555 [SIMDMulLane(a_signed
,
561 yield from self
.subtest_lanes(
562 [SIMDMulLane(a_signed
,
568 yield from self
.subtest_lanes([SIMDMulLane(False,
586 yield from self
.subtest_lanes([SIMDMulLane(True,
604 yield from self
.subtest_lanes([SIMDMulLane(True,
623 def generate_process() -> AsyncProcessGenerator
:
624 yield from process(GenOrCheck
.Generate
)
626 def check_process() -> AsyncProcessGenerator
:
627 if len(register_levels
) != 0:
628 for _
in register_levels
:
630 yield from process(GenOrCheck
.Check
)
632 sim
.add_clock(2e-6, if_exists
=True)
633 sim
.add_process(generate_process
)
634 sim
.add_process(check_process
)
def subtest_register_levels(self, register_levels: List[int]) -> None:
    """Run ``subtest_file`` for ``register_levels`` inside a labelled sub-test.

    The ``subTest`` context is labelled with the ``repr`` of the list.

    :param register_levels: passed through unchanged to ``subtest_file``.
    """
    levels_label = repr(register_levels)
    with self.subTest(register_levels=levels_label):
        self.subtest_file(register_levels)
641 def test_empty(self
) -> None:
642 self
.subtest_register_levels([])
644 def test_0(self
) -> None:
645 self
.subtest_register_levels([0])
647 def test_1(self
) -> None:
648 self
.subtest_register_levels([1])
650 def test_2(self
) -> None:
651 self
.subtest_register_levels([2])
653 def test_3(self
) -> None:
654 self
.subtest_register_levels([3])
656 def test_4(self
) -> None:
657 self
.subtest_register_levels([4])
659 def test_5(self
) -> None:
660 self
.subtest_register_levels([5])
662 def test_6(self
) -> None:
663 self
.subtest_register_levels([6])
665 def test_7(self
) -> None:
666 self
.subtest_register_levels([7])
668 def test_8(self
) -> None:
669 self
.subtest_register_levels([8])
671 def test_9(self
) -> None:
672 self
.subtest_register_levels([9])
674 def test_10(self
) -> None:
675 self
.subtest_register_levels([10])
677 def test_0(self
) -> None:
678 self
.subtest_register_levels([0])
680 def test_0_1(self
) -> None:
681 self
.subtest_register_levels([0, 1])
683 def test_0_1_2(self
) -> None:
684 self
.subtest_register_levels([0, 1, 2])
686 def test_0_1_2_3(self
) -> None:
687 self
.subtest_register_levels([0, 1, 2, 3])
689 def test_0_1_2_3_4(self
) -> None:
690 self
.subtest_register_levels([0, 1, 2, 3, 4])
692 def test_0_1_2_3_4_5(self
) -> None:
693 self
.subtest_register_levels([0, 1, 2, 3, 4, 5])
695 def test_0_1_2_3_4_5_6(self
) -> None:
696 self
.subtest_register_levels([0, 1, 2, 3, 4, 5, 6])
698 def test_0_1_2_3_4_5_6_7(self
) -> None:
699 self
.subtest_register_levels([0, 1, 2, 3, 4, 5, 6, 7])
701 def test_0_1_2_3_4_5_6_7_8(self
) -> None:
702 self
.subtest_register_levels([0, 1, 2, 3, 4, 5, 6, 7, 8])
704 def test_0_1_2_3_4_5_6_7_8_9(self
) -> None:
705 self
.subtest_register_levels([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
707 def test_0_1_2_3_4_5_6_7_8_9_10(self
) -> None:
708 self
.subtest_register_levels([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
710 def test_0_2(self
) -> None:
711 self
.subtest_register_levels([0, 2])
713 def test_0_3(self
) -> None:
714 self
.subtest_register_levels([0, 3])
716 def test_0_4(self
) -> None:
717 self
.subtest_register_levels([0, 4])
719 def test_0_5(self
) -> None:
720 self
.subtest_register_levels([0, 5])
722 def test_0_6(self
) -> None:
723 self
.subtest_register_levels([0, 6])
725 def test_0_7(self
) -> None:
726 self
.subtest_register_levels([0, 7])
728 def test_0_8(self
) -> None:
729 self
.subtest_register_levels([0, 8])
731 def test_0_9(self
) -> None:
732 self
.subtest_register_levels([0, 9])
734 def test_0_10(self
) -> None:
735 self
.subtest_register_levels([0, 10])
738 if __name__
== '__main__':