1 from nmigen
import Elaboratable
, Module
2 from .config
import MemoryPipeConfig
3 from .memory_queue_entry
import MemoryQueueEntry
4 from typing
import Optional
class MemoryQueueChunk(Elaboratable):
    """One chunk of the memory queue: a group of `MemoryQueueEntry` submodules.

    Attributes:
        config: the `MemoryPipeConfig` this chunk was built from.
        chunk_index: index of this chunk, as passed to the constructor.
        entries: the `MemoryQueueEntry` instances owned by this chunk, one per
            index in the range reported by `config` for `chunk_index`.
        next_back_chunk: the neighbouring chunk whose entries drive this
            chunk's entries' `next_back_chunks_next_op`, or `None` when no
            neighbour has been linked.
    """

    next_back_chunk: Optional["MemoryQueueChunk"]

    def __init__(self, config: MemoryPipeConfig, chunk_index: int):
        """Create the entries that belong to chunk `chunk_index`.

        Args:
            config: supplies the start/end entry indexes for each chunk.
            chunk_index: which chunk of the queue this instance represents.
        """
        self.config = config
        self.chunk_index = chunk_index
        # A bare class-level annotation does not create an attribute, so
        # default to "no neighbour"; otherwise `elaborate` raises
        # AttributeError on a chunk that was never linked.
        self.next_back_chunk = None
        start = config.memory_queue_chunk_entries_start_index(chunk_index)
        end = config.memory_queue_chunk_entries_end_index(chunk_index)
        self.entries = [MemoryQueueEntry(config) for _ in range(start, end)]

    def elaborate(self, platform):
        """Register every entry as a submodule and wire its next-op input.

        Each entry's `next_back_chunks_next_op` is driven from the entry at
        the same position in `next_back_chunk`; when there is no such entry
        (no neighbour, or the neighbour is shorter) it is set with
        `eq_empty()` instead.

        Returns:
            The `Module` containing the entries and their combinational
            wiring.
        """
        m = Module()
        neighbour = self.next_back_chunk
        back_entries = neighbour.entries if neighbour is not None else ()
        for i, entry in enumerate(self.entries):
            # NOTE(review): the argument list of this call was missing from
            # the reviewed text; (chunk_index, position-in-chunk) is a
            # reconstruction -- confirm against MemoryPipeConfig.
            entry_index = self.config.memory_queue_entry_index(
                self.chunk_index, i)
            setattr(m.submodules, f"entry_{entry_index}", entry)
            if i < len(back_entries):
                m.d.comb += entry.next_back_chunks_next_op.eq(
                    back_entries[i])
            else:
                m.d.comb += entry.next_back_chunks_next_op.eq_empty()
        return m
class MemoryQueue(Elaboratable):
    """The whole memory queue, built from `MemoryQueueChunk` submodules.

    Attributes:
        config: the `MemoryPipeConfig` the queue was built from.
        chunks: one `MemoryQueueChunk` per index in
            `range(config.memory_queue_chunk_count)`.
        entries: flat list of every chunk's entries, in chunk order.
    """

    def __init__(self, config: MemoryPipeConfig):
        """Create all chunks and collect their entries.

        Args:
            config: supplies the chunk count and per-chunk entry ranges.
        """
        self.config = config
        self.chunks = [MemoryQueueChunk(config, i)
                       for i in range(config.memory_queue_chunk_count)]
        self.entries = [entry
                        for chunk in self.chunks
                        for entry in chunk.entries]

    def elaborate(self, platform):
        """Register every chunk as a submodule and link neighbouring chunks.

        Chunk `i` becomes the `next_back_chunk` of chunk `i - 1`. The link is
        made here, before the chunks themselves are elaborated.

        Returns:
            The `Module` containing all chunk submodules.
        """
        m = Module()
        for i, chunk in enumerate(self.chunks):
            setattr(m.submodules, f"chunk_{i}", chunk)
            # Skip i == 0: `self.chunks[-1]` would wrap around and turn the
            # chain into a ring, leaving no chunk with a `None` neighbour.
            # NOTE(review): assumes the chain is not meant to be circular --
            # confirm against the queue design.
            if i > 0:
                self.chunks[i - 1].next_back_chunk = chunk
        return m