1 from litesata
.common
import *
3 def _get_item(obj
, name
, width
):
5 item
= getattr(obj
, name
.replace("_lsb", ""))[:width
]
7 item
= getattr(obj
, name
.replace("_msb", ""))[width
:2*width
]
9 item
= getattr(obj
, name
)
12 def _encode_cmd(obj
, description
, signal
):
14 for k
, v
in sorted(description
.items()):
15 start
= v
.dword
*32 + v
.offset
17 item
= _get_item(obj
, k
, v
.width
)
18 r
.append(signal
[start
:end
].eq(item
))
21 def test_type(name
, signal
):
22 return signal
== fis_types
[name
]
24 class LiteSATATransportTX(Module
):
25 def __init__(self
, link
):
26 self
.sink
= sink
= Sink(transport_tx_description(32))
30 cmd_ndwords
= max(fis_reg_h2d_cmd_len
, fis_data_cmd_len
)
31 encoded_cmd
= Signal(cmd_ndwords
*32)
33 self
.counter
= counter
= Counter(max=cmd_ndwords
+1)
35 cmd_len
= Signal(counter
.width
)
36 cmd_with_data
= Signal()
43 update_fis_type
= Signal()
45 def test_type_tx(name
):
46 return test_type(name
, sink
.type)
48 self
.fsm
= fsm
= FSM(reset_state
="IDLE")
52 update_fis_type
.eq(1),
53 If(sink
.stb
& sink
.sop
,
54 If(test_type_tx("REG_H2D"),
55 NextState("SEND_CTRL_CMD")
56 ).Elif(test_type_tx("DATA"),
57 NextState("SEND_DATA_CMD")
66 If(update_fis_type
, fis_type
.eq(link
.source
.d
[:8]))
68 fsm
.act("SEND_CTRL_CMD",
69 _encode_cmd(sink
, fis_reg_h2d_layout
, encoded_cmd
),
70 cmd_len
.eq(fis_reg_h2d_cmd_len
-1),
77 fsm
.act("SEND_DATA_CMD",
79 _encode_cmd(sink
, fis_data_layout
, encoded_cmd
),
80 cmd_len
.eq(fis_data_cmd_len
-1),
84 NextState("SEND_DATA")
89 sink
.ack
.eq(link
.sink
.ack
),
90 If(sink
.stb
& sink
.eop
& sink
.ack
,
96 for i
in range(cmd_ndwords
):
97 cmd_cases
[i
] = [link
.sink
.d
.eq(encoded_cmd
[32*i
:32*(i
+1)])]
100 counter
.ce
.eq(sink
.stb
& link
.sink
.ack
),
101 cmd_done
.eq((counter
.value
== cmd_len
) & link
.sink
.stb
& link
.sink
.ack
),
103 link
.sink
.stb
.eq(sink
.stb
),
104 link
.sink
.sop
.eq(counter
.value
== 0),
105 link
.sink
.eop
.eq((counter
.value
== cmd_len
) & ~cmd_with_data
),
106 Case(counter
.value
, cmd_cases
)
108 link
.sink
.stb
.eq(sink
.stb
),
110 link
.sink
.eop
.eq(sink
.eop
),
111 link
.sink
.d
.eq(sink
.data
)
115 def _decode_cmd(signal
, description
, obj
):
117 for k
, v
in sorted(description
.items()):
118 start
= v
.dword
*32+v
.offset
120 item
= _get_item(obj
, k
, v
.width
)
121 r
.append(item
.eq(signal
[start
:end
]))
124 class LiteSATATransportRX(Module
):
125 def __init__(self
, link
):
126 self
.source
= source
= Source(transport_rx_description(32))
130 cmd_ndwords
= max(fis_reg_d2h_cmd_len
, fis_dma_activate_d2h_cmd_len
,
131 fis_pio_setup_d2h_cmd_len
, fis_data_cmd_len
)
132 encoded_cmd
= Signal(cmd_ndwords
*32)
134 self
.counter
= counter
= Counter(max=cmd_ndwords
+1)
136 cmd_len
= Signal(counter
.width
)
138 cmd_receive
= Signal()
139 data_receive
= Signal()
143 def test_type_rx(name
):
144 return test_type(name
, link
.source
.d
[:8])
146 self
.fsm
= fsm
= FSM(reset_state
="IDLE")
150 update_fis_type
= Signal()
153 link
.source
.ack
.eq(0),
155 update_fis_type
.eq(1),
156 If(link
.source
.stb
& link
.source
.sop
,
157 If(test_type_rx("REG_D2H"),
158 NextState("RECEIVE_CTRL_CMD")
159 ).Elif(test_type_rx("DMA_ACTIVATE_D2H"),
160 NextState("RECEIVE_CTRL_CMD")
161 ).Elif(test_type_rx("PIO_SETUP_D2H"),
162 NextState("RECEIVE_CTRL_CMD")
163 ).Elif(test_type_rx("DATA"),
164 NextState("RECEIVE_DATA_CMD"),
166 link
.source
.ack
.eq(1)
169 link
.source
.ack
.eq(1)
173 If(update_fis_type
, fis_type
.eq(link
.source
.d
[:8]))
175 fsm
.act("RECEIVE_CTRL_CMD",
176 If(test_type("REG_D2H", fis_type
),
177 cmd_len
.eq(fis_reg_d2h_cmd_len
-1)
178 ).Elif(test_type("DMA_ACTIVATE_D2H", fis_type
),
179 cmd_len
.eq(fis_dma_activate_d2h_cmd_len
-1)
181 cmd_len
.eq(fis_pio_setup_d2h_cmd_len
-1)
184 link
.source
.ack
.eq(1),
186 NextState("PRESENT_CTRL_CMD")
189 fsm
.act("PRESENT_CTRL_CMD",
193 If(test_type("REG_D2H", fis_type
),
194 _decode_cmd(encoded_cmd
, fis_reg_d2h_layout
, source
),
195 ).Elif(test_type("DMA_ACTIVATE_D2H", fis_type
),
196 _decode_cmd(encoded_cmd
, fis_dma_activate_d2h_layout
, source
),
198 _decode_cmd(encoded_cmd
, fis_pio_setup_d2h_layout
, source
),
200 If(source
.stb
& source
.ack
,
204 fsm
.act("RECEIVE_DATA_CMD",
205 cmd_len
.eq(fis_data_cmd_len
-1),
207 link
.source
.ack
.eq(1),
209 NextState("PRESENT_DATA")
212 fsm
.act("PRESENT_DATA",
214 source
.stb
.eq(link
.source
.stb
),
215 _decode_cmd(encoded_cmd
, fis_data_layout
, source
),
216 source
.sop
.eq(data_sop
),
217 source
.eop
.eq(link
.source
.eop
),
218 source
.error
.eq(link
.source
.error
),
219 source
.data
.eq(link
.source
.d
),
220 link
.source
.ack
.eq(source
.ack
),
221 If(source
.stb
& source
.eop
& source
.ack
,
227 If(fsm
.ongoing("RECEIVE_DATA_CMD"),
229 ).Elif(fsm
.ongoing("PRESENT_DATA"),
230 If(source
.stb
& source
.ack
,
236 for i
in range(cmd_ndwords
):
237 cmd_cases
[i
] = [encoded_cmd
[32*i
:32*(i
+1)].eq(link
.source
.d
)]
240 If(cmd_receive
& link
.source
.stb
,
245 Case(counter
.value
, cmd_cases
),
247 self
.comb
+= cmd_done
.eq((counter
.value
== cmd_len
) & link
.source
.ack
)
249 class LiteSATATransport(Module
):
250 def __init__(self
, link
):
251 self
.tx
= LiteSATATransportTX(link
)
252 self
.rx
= LiteSATATransportRX(link
)
253 self
.sink
, self
.source
= self
.tx
.sink
, self
.rx
.source