MSCDIR = ../misoc
-CURDIR = ../sata-controller
+CURDIR = ../lite-sata
PYTHON = python3
TOOLCHAIN = vivado
PLATFORM = kc705
---------------------------------------------------------------------------------
__ _ __ _______ _________
/ / (_) /____ / __/ _ /_ __/ _ |
/ /__/ / __/ -_)\ \/ __ |/ / / __ |
Copyright 2014-2015 / Florent Kermarrec / florent@enjoy-digital.fr
A lite open-source SATA1/2/3 controller
- developed in partnership with M-Labs Ltd / HKU
---------------------------------------------------------------------------------
+ developed in partnership with M-Labs Ltd & HKU
[> Getting started
------------------
- link_tb
- command_tb
- bist_tb
- hdd.py is a HDD model with implementing all SATA layers.
+ hdd.py is a HDD model implementing all SATA layers.
To run a simulation, move to the simulation directory and run:
make simulation_name
[> Tests :
A synthetisable BIST is provided. It can be controled with ./test/bist.py
+ Using Miscope and the provided example ./test/test_link.py you are able to
+ visualize every event of the design and even inject your data in the HDD
+ model!
[> Contact
E-mail: florent@enjoy-digital.fr
)
)
-class SATABISTControl(Module, AutoCSR):
+class SATABISTUnitControl(Module, AutoCSR):
def __init__(self, bist_unit):
self._start = CSR()
self._sector = CSRStorage(48)
self.cycles_counter.reset.eq(bist_unit.start),
self.cycles_counter.ce.eq(~bist_unit.done)
]
+
+class SATABISTIdentify(Module):
+ def __init__(self, sata_master_port):
+ self.start = Signal()
+ self.done = Signal()
+
+ self.fifo = fifo = SyncFIFO([("data", 32)], 512, buffered=True)
+ self.source = self.fifo.source
+
+ ###
+
+ source, sink = sata_master_port.source, sata_master_port.sink
+
+ self.fsm = fsm = FSM(reset_state="IDLE")
+ fsm.act("IDLE",
+ self.done.eq(1),
+ If(self.start,
+ NextState("SEND_CMD")
+ )
+ )
+ self.comb += [
+ source.sop.eq(1),
+ source.eop.eq(1),
+ source.identify.eq(1),
+ ]
+ fsm.act("SEND_CMD",
+ source.stb.eq(1),
+ If(source.stb & source.ack,
+ NextState("WAIT_ACK")
+ )
+ )
+ fsm.act("WAIT_ACK",
+ If(sink.stb & sink.identify,
+ NextState("RECEIVE_DATA")
+ )
+ )
+ self.comb += fifo.sink.data.eq(sink.data)
+ fsm.act("RECEIVE_DATA",
+ sink.ack.eq(fifo.sink.ack),
+ If(sink.stb,
+ fifo.sink.stb.eq(1),
+ If(sink.eop,
+ NextState("IDLE")
+ )
+ )
+ )
+
+class SATABISTIdentifyControl(Module, AutoCSR):
+ def __init__(self, bist_identify):
+ self._start = CSR()
+ self._done = CSRStatus()
+ self._source_stb = CSRStatus()
+ self._source_ack = CSR()
+ self._source_data = CSRStatus(32)
+
+ ###
+ self.bist_identify = bist_identify
+ self.comb += [
+ bist_identify.start.eq(self._start.r & self._start.re),
+ self._done.status.eq(bist_identify.done),
+
+ self._source_stb.status.eq(bist_identify.source.stb),
+ self._source_data.status.eq(bist_identify.source.data),
+ bist_identify.source.ack.eq(self._source_ack.r & self._source_ack.re)
+ ]
+
class SATABIST(Module, AutoCSR):
def __init__(self, sata_master_ports, with_control=False):
generator = SATABISTGenerator(sata_master_ports[0])
checker = SATABISTChecker(sata_master_ports[1])
+ identify = SATABISTIdentify(sata_master_ports[2])
if with_control:
- self.generator = SATABISTControl(generator)
- self.checker = SATABISTControl(checker)
+ self.generator = SATABISTUnitControl(generator)
+ self.checker = SATABISTUnitControl(checker)
+ self.identify = SATABISTIdentifyControl(identify)
else:
self.generator = generator
self.checker = checker
+ self.identify = identify
tx_to_rx = [
("write", 1),
("read", 1),
+ ("identify", 1),
("count", 16)
]
NextState("SEND_WRITE_DMA_CMD")
).Elif(sink.read,
NextState("SEND_READ_DMA_CMD")
+ ).Elif(sink.identify,
+ NextState("SEND_IDENTIFY_CMD")
).Else(
sink.ack.eq(1)
)
NextState("IDLE")
)
)
+ fsm.act("SEND_IDENTIFY_CMD",
+ transport.sink.stb.eq(sink.stb),
+ transport.sink.sop.eq(1),
+ transport.sink.eop.eq(1),
+ transport.sink.type.eq(fis_types["REG_H2D"]),
+ transport.sink.c.eq(1),
+ transport.sink.command.eq(regs["IDENTIFY_DEVICE"]),
+ sink.ack.eq(transport.sink.ack),
+ If(sink.stb & sink.ack,
+ NextState("IDLE")
+ )
+ )
self.comb += [
If(sink.stb,
to_rx.write.eq(sink.write),
to_rx.read.eq(sink.read),
+ to_rx.identify.eq(sink.identify),
to_rx.count.eq(sink.count)
)
]
def test_type(name):
return transport.source.type == fis_types[name]
+ identify = Signal()
dma_activate = Signal()
read_ndwords = Signal(max=sectors2dwords(2**16))
self.dwords_counter = dwords_counter = Counter(max=sectors2dwords(2**16))
If(from_tx.write,
NextState("WAIT_WRITE_ACTIVATE_OR_REG_D2H")
).Elif(from_tx.read,
- NextState("WAIT_READ_DATA_OR_REG_D2H")
+ NextState("WAIT_READ_DATA_OR_REG_D2H"),
+ ).Elif(from_tx.identify,
+ NextState("WAIT_PIO_SETUP_D2H"),
)
)
+ self.sync += \
+ If(fsm.ongoing("IDLE"),
+ identify.eq(from_tx.identify)
+ )
fsm.act("WAIT_WRITE_ACTIVATE_OR_REG_D2H",
transport.source.ack.eq(1),
If(transport.source.stb,
)
)
)
+ fsm.act("WAIT_PIO_SETUP_D2H",
+ transport.source.ack.eq(1),
+ If(transport.source.stb,
+ transport.source.ack.eq(0),
+ If(test_type("PIO_SETUP_D2H"),
+ NextState("PRESENT_PIO_SETUP_D2H")
+ )
+ )
+ )
+ fsm.act("PRESENT_PIO_SETUP_D2H",
+ transport.source.ack.eq(1),
+ # XXX : Check error/ status
+ If(transport.source.stb & transport.source.eop,
+ NextState("WAIT_READ_DATA_OR_REG_D2H")
+ )
+ )
+
self.comb += [
data_buffer.sink.sop.eq(transport.source.sop),
data_buffer.sink.eop.eq(transport.source.eop),
If(data_buffer.sink.stb & data_buffer.sink.ack,
self.dwords_counter.ce.eq(~read_done),
If(data_buffer.sink.eop,
- If(read_done,
+ If(read_done & ~identify,
NextState("WAIT_READ_DATA_OR_REG_D2H")
).Else(
NextState("PRESENT_READ_RESPONSE")
)
fsm.act("PRESENT_READ_RESPONSE",
cmd_buffer.sink.stb.eq(1),
- cmd_buffer.sink.read.eq(1),
- cmd_buffer.sink.last.eq(read_done),
+ cmd_buffer.sink.read.eq(~identify),
+ cmd_buffer.sink.identify.eq(identify),
+ cmd_buffer.sink.last.eq(read_done | identify),
cmd_buffer.sink.success.eq(~read_error),
cmd_buffer.sink.failed.eq(read_error),
If(cmd_buffer.sink.stb & cmd_buffer.sink.ack,
If(cmd_buffer.sink.failed,
data_buffer.reset.eq(1)
),
- If(read_done,
+ If(read_done | identify,
NextState("IDLE")
).Else(
NextState("WAIT_READ_DATA_OR_REG_D2H")
self.out_fsm = out_fsm = FSM(reset_state="IDLE")
out_fsm.act("IDLE",
If(cmd_buffer.source.stb,
- If(cmd_buffer.source.read & cmd_buffer.source.success,
+ If((cmd_buffer.source.read | cmd_buffer.source.identify) & cmd_buffer.source.success,
NextState("PRESENT_RESPONSE_WITH_DATA"),
).Else(
NextState("PRESENT_RESPONSE_WITHOUT_DATA"),
self.comb += [
source.write.eq(cmd_buffer.source.write),
source.read.eq(cmd_buffer.source.read),
+ source.identify.eq(cmd_buffer.source.identify),
source.last.eq(cmd_buffer.source.last),
source.success.eq(cmd_buffer.source.success),
- source.failed.eq(cmd_buffer.source.success),
+ source.failed.eq(cmd_buffer.source.failed),
source.data.eq(data_buffer.source.data)
]
"REG_H2D": 0x27,
"REG_D2H": 0x34,
"DMA_ACTIVATE_D2H": 0x39,
+ "PIO_SETUP_D2H": 0x5F,
"DATA": 0x46
}
"pm_port": FISField(0, 8, 4)
}
+fis_pio_setup_d2h_cmd_len = 5
+fis_pio_setup_d2h_layout = {
+ "type": FISField(0, 0, 8),
+ "pm_port": FISField(0, 8, 4),
+ "d": FISField(0, 13, 1),
+ "i": FISField(0, 14, 1),
+ "status": FISField(0, 16, 8),
+ "error": FISField(0, 24, 8),
+
+ "lba_lsb": FISField(1, 0, 24),
+
+ "lba_msb": FISField(2, 0, 24),
+
+ "count": FISField(3, 0, 16),
+
+ "transfer_count": FISField(4, 0, 16),
+}
+
fis_data_cmd_len = 1
fis_data_layout = {
"type": FISField(0, 0, 8)
layout = [
("type", 8),
("pm_port", 4),
+ ("r", 1),
+ ("d", 1),
("i", 1),
("status", 8),
("error", 8),
("lba", 48),
("device", 8),
("count", 16),
+ ("transfer_count", 16),
("data", dw),
("error", 1)
]
# Command Layer
regs = {
"WRITE_DMA_EXT" : 0x35,
- "READ_DMA_EXT" : 0x25
+ "READ_DMA_EXT" : 0x25,
+ "IDENTIFY_DEVICE" : 0xEC
}
def command_tx_description(dw):
layout = [
("write", 1),
("read", 1),
+ ("identify", 1),
("sector", 48),
("count", 16),
("data", dw)
layout = [
("write", 1),
("read", 1),
+ ("identify", 1),
("last", 1),
("success", 1),
("failed", 1),
layout = [
("write", 1),
("read", 1),
+ ("identify", 1),
("last", 1),
("success", 1),
- ("failed", 1),
+ ("failed", 1)
]
return EndpointDescription(layout, packetized=False)
NextState("RECEIVE_REG_D2H_CMD")
).Elif(test_type("DMA_ACTIVATE_D2H"),
NextState("RECEIVE_DMA_ACTIVATE_D2H_CMD")
+ ).Elif(test_type("PIO_SETUP_D2H"),
+ NextState("RECEIVE_PIO_SETUP_D2H_CMD")
).Elif(test_type("DATA"),
NextState("RECEIVE_DATA_CMD"),
).Else(
NextState("IDLE")
)
)
+ fsm.act("RECEIVE_PIO_SETUP_D2H_CMD",
+ cmd_len.eq(fis_pio_setup_d2h_cmd_len-1),
+ cmd_receive.eq(1),
+ link.source.ack.eq(1),
+ If(cmd_done,
+ NextState("PRESENT_PIO_SETUP_D2H_CMD")
+ )
+ )
+ fsm.act("PRESENT_PIO_SETUP_D2H_CMD",
+ source.stb.eq(1),
+ source.sop.eq(1),
+ source.eop.eq(1),
+ _decode_cmd(encoded_cmd, fis_pio_setup_d2h_layout, source),
+ If(source.stb & source.ack,
+ NextState("IDLE")
+ )
+ )
fsm.act("RECEIVE_DATA_CMD",
cmd_len.eq(fis_data_cmd_len-1),
cmd_receive.eq(1),
from lib.sata.common import *
from lib.sata.phy import SATAPHY
from lib.sata import SATACON
-from lib.sata.bist import SATABIST, SATABISTControl
+from lib.sata.bist import SATABIST
class _CRG(Module):
def __init__(self, platform):
self.sata_con = SATACON(self.sata_phy)
# SATA BIST generator and checker
- self.sata_bist = SATABIST(self.sata_con.crossbar.get_ports(2), with_control=True)
+ self.sata_bist = SATABIST(self.sata_con.crossbar.get_ports(3), with_control=True)
# Status Leds
self.leds = BISTLeds(platform, self.sata_phy)
def __init__(self, platform, export_mila=False):
BISTSoC.__init__(self, platform, export_mila)
+ self.sata_con_link_rx_fsm_state = Signal(4)
+ self.sata_con_link_tx_fsm_state = Signal(4)
+ self.sata_con_transport_rx_fsm_state = Signal(4)
+ self.sata_con_transport_tx_fsm_state = Signal(4)
+ self.sata_con_command_rx_fsm_state = Signal(4)
+ self.sata_con_command_tx_fsm_state = Signal(4)
+
debug = (
self.sata_phy.ctrl.ready,
self.sata_con.command.sink.ack,
self.sata_con.command.sink.write,
self.sata_con.command.sink.read,
+ self.sata_con.command.sink.identify,
self.sata_con.command.source.stb,
self.sata_con.command.source.sop,
self.sata_con.command.source.ack,
self.sata_con.command.source.write,
self.sata_con.command.source.read,
+ self.sata_con.command.source.identify,
self.sata_con.command.source.success,
self.sata_con.command.source.failed,
- self.sata_con.command.source.data
+ self.sata_con.command.source.data,
+
+ self.sata_con_link_rx_fsm_state,
+ self.sata_con_link_tx_fsm_state,
+ self.sata_con_transport_rx_fsm_state,
+ self.sata_con_transport_tx_fsm_state,
+ self.sata_con_command_rx_fsm_state,
+ self.sata_con_command_tx_fsm_state,
)
self.mila = MiLa(depth=2048, dat=Cat(*debug))
if export_mila:
mila_filename = os.path.join(platform.soc_ext_path, "test", "mila.csv")
self.mila.export(self, debug, mila_filename)
+ def do_finalize(self):
+ BISTSoC.do_finalize(self)
+ self.comb += [
+ self.sata_con_link_rx_fsm_state.eq(self.sata_con.link.rx.fsm.state),
+ self.sata_con_link_tx_fsm_state.eq(self.sata_con.link.tx.fsm.state),
+ self.sata_con_transport_rx_fsm_state.eq(self.sata_con.transport.rx.fsm.state),
+ self.sata_con_transport_tx_fsm_state.eq(self.sata_con.transport.tx.fsm.state),
+ self.sata_con_command_rx_fsm_state.eq(self.sata_con.command.rx.fsm.state),
+ self.sata_con_command_tx_fsm_state.eq(self.sata_con.command.tx.fsm.state)
+ ]
-#default_subtarget = BISTSoC
-default_subtarget = BISTSoCDevel
+default_subtarget = BISTSoC
+#default_subtarget = BISTSoCDevel
logical_sector_size = 512
-class SATABISTDriver:
+class SATABISTUnitDriver:
def __init__(self, regs, name):
self.regs = regs
self.name = name
errors = self.errors.read()
return (speed, errors)
-class SATABISTGeneratorDriver(SATABISTDriver):
+class SATABISTGeneratorDriver(SATABISTUnitDriver):
def __init__(self, regs, name):
- SATABISTDriver.__init__(self, regs, name + "_generator")
+ SATABISTUnitDriver.__init__(self, regs, name + "_generator")
-class SATABISTCheckerDriver(SATABISTDriver):
+class SATABISTCheckerDriver(SATABISTUnitDriver):
def __init__(self, regs, name):
- SATABISTDriver.__init__(self, regs, name + "_checker")
+ SATABISTUnitDriver.__init__(self, regs, name + "_checker")
+
+class SATABISTIdentifyDriver:
+ def __init__(self, regs, name):
+ self.regs = regs
+ self.name = name
+ for s in ["start", "done", "source_stb", "source_ack", "source_data"]:
+ setattr(self, s, getattr(regs, name + "_identify_"+ s))
+ self.data = []
+
+ def read_fifo(self):
+ self.data = []
+ while self.source_stb.read():
+ self.data.append(self.source_data.read())
+ self.source_ack.write(1)
+
+ def run(self):
+ self.read_fifo() # flush the fifo before we start
+ self.start.write(1)
+ while (self.done.read() == 0):
+ pass
+ self.read_fifo()
+ self.decode()
+
+ def decode(self):
+ self.serial_number = ""
+ for i, dword in enumerate(self.data[10:20]):
+ s = dword.to_bytes(4, byteorder='big').decode("utf-8")
+ self.serial_number += s[2:] + s[:2]
+
+ def __repr__(self):
+ r = "Serial Number: " + self.serial_number
+ # XXX: enhance decode function
+ return r
KB = 1024
MB = 1024*KB
args = _get_args()
wb.open()
###
+ identify = SATABISTIdentifyDriver(wb.regs, "sata_bist")
generator = SATABISTGeneratorDriver(wb.regs, "sata_bist")
checker = SATABISTCheckerDriver(wb.regs, "sata_bist")
+ identify.run()
+ print(identify)
+
sector = 0
count = int(args.transfer_size)*MB//logical_sector_size
length = int(args.total_length)*MB
from miscope.host.drivers import MiLaDriver
mila = MiLaDriver(wb.regs, "mila")
+identify = SATABISTIdentifyDriver(wb.regs, "sata_bist")
generator = SATABISTGeneratorDriver(wb.regs, "sata_bist")
checker = SATABISTCheckerDriver(wb.regs, "sata_bist")
wb.open()
"bistsocdevel_sata_con_command_source_source_stb" : 1,
"bistsocdevel_sata_con_command_source_source_payload_read" : 1,
}
+conditions["id_cmd"] = {
+ "bistsocdevel_sata_con_command_sink_stb" : 1,
+ "bistsocdevel_sata_con_command_sink_payload_identify" : 1,
+}
+conditions["id_pio_setup"] = {
+ "bistsocdevel_sata_phy_source_source_payload_data" : primitives["X_RDY"],
+}
mila.prog_term(port=0, cond=conditions[sys.argv[1]])
mila.prog_sum("term")
# Trigger / wait / receive
mila.trigger(offset=512, length=2000)
+identify.run()
generator.run(0, 2, 0)
checker.run(0, 2, 0)
mila.wait_done()