1 from litesata
.common
import *
3 def _get_item(obj
, name
, width
):
5 item
= getattr(obj
, name
.replace("_lsb", ""))[:width
]
7 item
= getattr(obj
, name
.replace("_msb", ""))[width
:2*width
]
9 item
= getattr(obj
, name
)
def _encode_cmd(obj, description, signal):
    """Build the assignments that pack the fields of ``obj`` into ``signal``.

    Parameters:
        obj: object providing one attribute per layout field (split fields
            are resolved by ``_get_item``).
        description: mapping of field name to an entry exposing ``dword``,
            ``offset`` and ``width``.
        signal: sliceable target; each field is written to
            ``signal[start:end]``.

    Returns:
        A list of ``signal[start:end].eq(item)`` statements, one per field,
        in sorted field-name order.
    """
    r = []
    for k, v in sorted(description.items()):
        # Bit position of the field: 32 bits per dword plus the in-dword offset.
        start = v.dword*32 + v.offset
        end = start + v.width
        item = _get_item(obj, k, v.width)
        r.append(signal[start:end].eq(item))
    return r
def test_type(name, signal):
    """Compare ``signal`` with the FIS type code stored under ``name``.

    Looks ``name`` up in ``fis_types`` and returns the result of
    ``signal == <that code>``.
    """
    expected = fis_types[name]
    return signal == expected
# LiteSATATransportTX: transmit half of the transport layer. From the visible
# code: it takes packets on `sink`, packs their fields into `encoded_cmd`
# with _encode_cmd, and drives `link.sink` one 32-bit word at a time.
# NOTE(review): the embedded line numbering skips, so several lines of this
# class are not visible here (some fsm.act(...) openers and closers, and the
# definitions of fis_type, cmd_done and cmd_cases) -- confirm against the
# complete file before relying on these notes.
24 class LiteSATATransportTX(Module
):
25 def __init__(self
, link
):
# Sink endpoint built from transport_tx_description(32); also kept in the
# local name `sink`.
26 self
.sink
= sink
= Sink(transport_tx_description(32))
# encoded_cmd holds the longer of the two command layouts, 32 bits per dword.
30 cmd_ndwords
= max(fis_reg_h2d_cmd_len
, fis_data_cmd_len
)
31 encoded_cmd
= Signal(cmd_ndwords
*32)
# Counter submodule; its value is compared with cmd_len and selects the
# Case entry further down.
33 counter
= Counter(max=cmd_ndwords
+1)
34 self
.submodules
+= counter
36 cmd_len
= Signal(counter
.width
)
37 cmd_with_data
= Signal()
44 update_fis_type
= Signal()
# Helper: test the type field of the sink against a named FIS type.
46 def test_type_tx(name
):
47 return test_type(name
, sink
.type)
49 self
.fsm
= fsm
= FSM(reset_state
="IDLE")
50 self
.submodules
+= fsm
54 update_fis_type
.eq(1),
55 If(sink
.stb
& sink
.sop
,
56 If(test_type_tx("REG_H2D"),
57 NextState("SEND_CTRL_CMD")
58 ).Elif(test_type_tx("DATA"),
59 NextState("SEND_DATA_CMD")
68 If(update_fis_type
, fis_type
.eq(link
.source
.d
[:8]))
# SEND_CTRL_CMD: pack the sink fields with the REG_H2D layout; cmd_len is set
# to the layout length minus one.
70 fsm
.act("SEND_CTRL_CMD",
71 _encode_cmd(sink
, fis_reg_h2d_layout
, encoded_cmd
),
72 cmd_len
.eq(fis_reg_h2d_cmd_len
-1),
# SEND_DATA_CMD: same as above with the DATA layout.
79 fsm
.act("SEND_DATA_CMD",
81 _encode_cmd(sink
, fis_data_layout
, encoded_cmd
),
82 cmd_len
.eq(fis_data_cmd_len
-1),
86 NextState("SEND_DATA")
91 sink
.ack
.eq(link
.sink
.ack
),
92 If(sink
.stb
& sink
.eop
& sink
.ack
,
# One Case entry per dword: drive link.sink.d with the i-th 32-bit slice of
# encoded_cmd.
98 for i
in range(cmd_ndwords
):
99 cmd_cases
[i
] = [link
.sink
.d
.eq(encoded_cmd
[32*i
:32*(i
+1)])]
102 counter
.ce
.eq(sink
.stb
& link
.sink
.ack
),
103 cmd_done
.eq((counter
.value
== cmd_len
) & link
.sink
.stb
& link
.sink
.ack
),
105 link
.sink
.stb
.eq(sink
.stb
),
106 link
.sink
.sop
.eq(counter
.value
== 0),
107 link
.sink
.eop
.eq((counter
.value
== cmd_len
) & ~cmd_with_data
),
108 Case(counter
.value
, cmd_cases
)
110 link
.sink
.stb
.eq(sink
.stb
),
112 link
.sink
.eop
.eq(sink
.eop
),
113 link
.sink
.d
.eq(sink
.data
)
def _decode_cmd(signal, description, obj):
    """Build the assignments that unpack ``signal`` into the fields of ``obj``.

    Parameters:
        signal: sliceable source; each field is read from
            ``signal[start:end]``.
        description: mapping of field name to an entry exposing ``dword``,
            ``offset`` and ``width``.
        obj: object providing one attribute per layout field (split fields
            are resolved by ``_get_item``).

    Returns:
        A list of ``item.eq(signal[start:end])`` statements, one per field,
        in sorted field-name order.
    """
    r = []
    for k, v in sorted(description.items()):
        # Bit position of the field: 32 bits per dword plus the in-dword offset.
        start = v.dword*32+v.offset
        end = start + v.width
        item = _get_item(obj, k, v.width)
        r.append(item.eq(signal[start:end]))
    return r
# LiteSATATransportRX: receive half of the transport layer. From the visible
# code: it stores words from `link.source` into `encoded_cmd`, unpacks them
# into the fields of `source` with _decode_cmd, and in PRESENT_DATA forwards
# the link data to `source`.
# NOTE(review): the embedded line numbering skips, so several lines of this
# class are not visible here (some fsm.act(...) openers and closers, and the
# definitions of fis_type, cmd_done, cmd_cases and data_sop) -- confirm
# against the complete file before relying on these notes.
126 class LiteSATATransportRX(Module
):
127 def __init__(self
, link
):
# Source endpoint built from transport_rx_description(32); also kept in the
# local name `source`.
128 self
.source
= source
= Source(transport_rx_description(32))
# encoded_cmd holds the longest of the four command layouts, 32 bits per dword.
132 cmd_ndwords
= max(fis_reg_d2h_cmd_len
, fis_dma_activate_d2h_cmd_len
,
133 fis_pio_setup_d2h_cmd_len
, fis_data_cmd_len
)
134 encoded_cmd
= Signal(cmd_ndwords
*32)
136 counter
= Counter(max=cmd_ndwords
+1)
137 self
.submodules
+= counter
139 cmd_len
= Signal(counter
.width
)
141 cmd_receive
= Signal()
142 data_receive
= Signal()
# Helper: test bits [:8] of the incoming link word against a named FIS type.
146 def test_type_rx(name
):
147 return test_type(name
, link
.source
.d
[:8])
149 self
.fsm
= fsm
= FSM(reset_state
="IDLE")
150 self
.submodules
+= fsm
154 update_fis_type
= Signal()
157 link
.source
.ack
.eq(0),
159 update_fis_type
.eq(1),
160 If(link
.source
.stb
& link
.source
.sop
,
161 If(test_type_rx("REG_D2H"),
162 NextState("RECEIVE_CTRL_CMD")
163 ).Elif(test_type_rx("DMA_ACTIVATE_D2H"),
164 NextState("RECEIVE_CTRL_CMD")
165 ).Elif(test_type_rx("PIO_SETUP_D2H"),
166 NextState("RECEIVE_CTRL_CMD")
167 ).Elif(test_type_rx("DATA"),
168 NextState("RECEIVE_DATA_CMD"),
170 link
.source
.ack
.eq(1)
173 link
.source
.ack
.eq(1)
177 If(update_fis_type
, fis_type
.eq(link
.source
.d
[:8]))
# RECEIVE_CTRL_CMD: cmd_len is chosen from the FIS type held in fis_type.
179 fsm
.act("RECEIVE_CTRL_CMD",
180 If(test_type("REG_D2H", fis_type
),
181 cmd_len
.eq(fis_reg_d2h_cmd_len
-1)
182 ).Elif(test_type("DMA_ACTIVATE_D2H", fis_type
),
183 cmd_len
.eq(fis_dma_activate_d2h_cmd_len
-1)
185 cmd_len
.eq(fis_pio_setup_d2h_cmd_len
-1)
188 link
.source
.ack
.eq(1),
190 NextState("PRESENT_CTRL_CMD")
# PRESENT_CTRL_CMD: unpack encoded_cmd into `source` using the layout that
# matches fis_type.
193 fsm
.act("PRESENT_CTRL_CMD",
197 If(test_type("REG_D2H", fis_type
),
198 _decode_cmd(encoded_cmd
, fis_reg_d2h_layout
, source
),
199 ).Elif(test_type("DMA_ACTIVATE_D2H", fis_type
),
200 _decode_cmd(encoded_cmd
, fis_dma_activate_d2h_layout
, source
),
202 _decode_cmd(encoded_cmd
, fis_pio_setup_d2h_layout
, source
),
204 If(source
.stb
& source
.ack
,
208 fsm
.act("RECEIVE_DATA_CMD",
209 cmd_len
.eq(fis_data_cmd_len
-1),
211 link
.source
.ack
.eq(1),
213 NextState("PRESENT_DATA")
# PRESENT_DATA: forward stb/eop/error/data from the link to `source` and pass
# source.ack back to the link.
216 fsm
.act("PRESENT_DATA",
218 source
.stb
.eq(link
.source
.stb
),
219 _decode_cmd(encoded_cmd
, fis_data_layout
, source
),
220 source
.sop
.eq(data_sop
),
221 source
.eop
.eq(link
.source
.eop
),
222 source
.error
.eq(link
.source
.error
),
223 source
.data
.eq(link
.source
.d
),
224 link
.source
.ack
.eq(source
.ack
),
225 If(source
.stb
& source
.eop
& source
.ack
,
231 If(fsm
.ongoing("RECEIVE_DATA_CMD"),
233 ).Elif(fsm
.ongoing("PRESENT_DATA"),
234 If(source
.stb
& source
.ack
,
# One Case entry per dword: store link.source.d into the i-th 32-bit slice of
# encoded_cmd.
240 for i
in range(cmd_ndwords
):
241 cmd_cases
[i
] = [encoded_cmd
[32*i
:32*(i
+1)].eq(link
.source
.d
)]
244 If(cmd_receive
& link
.source
.stb
,
249 Case(counter
.value
, cmd_cases
),
# cmd_done (combinatorial): counter has reached cmd_len and link.source.ack
# is asserted.
251 self
.comb
+= cmd_done
.eq((counter
.value
== cmd_len
) & link
.source
.ack
)
class LiteSATATransport(Module):
    """Transport layer wrapper pairing a TX and an RX half on one link.

    Both halves are built from the same ``link`` and registered as the
    ``tx`` and ``rx`` submodules. ``sink`` is the TX half's sink and
    ``source`` is the RX half's source.
    """

    def __init__(self, link):
        # Transmit half first, then receive half, both on the same link.
        self.submodules.tx = LiteSATATransportTX(link)
        self.submodules.rx = LiteSATATransportRX(link)

        # Re-export the user-facing endpoints of the two halves.
        self.sink = self.tx.sink
        self.source = self.rx.source