--- /dev/null
+from migen.fhdl.std import *
+from migen.genlib.fsm import FSM, NextState
+
+from lib.sata.std import *
+from lib.sata.transport.std import *
+
+regs = {
+ "WRITE_DMA_EXT" : 0x35,
+ "READ_DMA_EXT" : 0x25,
+ "IDENTIFY_DEVICE_DMA" : 0xEE
+}
+
+from_rx = [
+ ("dma_activate", 1)
+]
+
+class SATACommandTX(Module):
+ def __init__(self, transport):
+ self.sink = sink = Sink(command_tx_layout(32))
+ self.from_rx = Sink(from_rx)
+
+ ###
+
+ self.comb += [
+ transport.sink.pm_port.eq(0),
+ transport.sink.features.eq(0),
+ transport.sink.lba.eq(sink.address), # XXX need adaptation?
+ transport.sink.device.eq(0xe0),
+ transport.sink.count.eq(sink.length), # XXX need adaptation?
+ transport.sink.icc.eq(0),
+ transport.sink.control.eq(0),
+ ]
+
+ fsm = FSM(reset_state="IDLE")
+ self.submodules += fsm
+
+ fsm.act("IDLE",
+ If(sink.stb & sink.sop,
+ If(sink.write,
+ NextState("SEND_WRITE_DMA_CMD")
+ ).Elif(sink.read,
+ NextState("SEND_READ_DMA_CMD")
+ ).Elif(sink.identify,
+ NextState("SEND_IDENTIFY_CMD")
+ ).Else(
+ sink.ack.eq(1)
+ )
+ ).Else(
+ sink.ack.eq(1)
+ )
+ )
+ fsm.act("SEND_WRITE_DMA_CMD",
+ transport.sink.stb.eq(1),
+ transport.sink.sop.eq(1),
+ transport.sink.eop.eq(1),
+ transport.sink.type.eq(fis_types["REG_H2D"]),
+ transport.sink.c.eq(1),
+ transport.sink.command.eq(regs["WRITE_DMA_EXT"]),
+ If(transport.sink.ack,
+ NextState("WAIT_DMA_ACTIVATE")
+ )
+ )
+ fsm.act("WAIT_DMA_ACTIVATE",
+ If(self.from_rx.dma_activate,
+ NextState("SEND_DATA")
+ )
+ )
+ fsm.act("SEND_DATA",
+ transport.sink.stb.eq(sink.stb),
+ transport.sink.sop.eq(sink.sop),
+ transport.sink.eop.eq(sink.eop),
+ transport.sink.type.eq(fis_types["DATA"]),
+ transport.sink.data.eq(sink.data),
+ sink.ack.eq(transport.sink.ack),
+ If(sink.stb & sink.ack & sink.eop,
+ NextState("IDLE")
+ )
+ )
+ fsm.act("SEND_READ_DMA_CMD",
+ transport.sink.stb.eq(sink.stb),
+ transport.sink.sop.eq(1),
+ transport.sink.eop.eq(1),
+ transport.sink.type.eq(fis_types["REG_H2D"]),
+ transport.sink.c.eq(1),
+ transport.sink.command.eq(regs["READ_DMA_EXT"]),
+ sink.ack.eq(transport.sink.ack),
+ If(sink.stb & sink.ack,
+ NextState("IDLE")
+ )
+ )
+ fsm.act("SEND_IDENTIFY_CMD",
+ transport.sink.stb.eq(sink.stb),
+ transport.sink.sop.eq(1),
+ transport.sink.eop.eq(1),
+ transport.sink.type.eq(fis_types["REG_H2D"]),
+ transport.sink.c.eq(1),
+ transport.sink.command.eq(regs["IDENTIFY_DEVICE_DMA"]),
+ sink.ack.eq(transport.sink.ack),
+ If(sink.stb & sink.ack,
+ NextState("IDLE")
+ )
+ )
+
+class SATACommandRX(Module):
+ def __init__(self, transport):
+ self.source = source = Source(command_tx_layout(32))
+ self.to_tx = Source(from_rx)
+
+ ###
+
+ self.comb += [
+ transport.source.ack.eq(1),
+ If(transport.source.stb & (transport.source.type == fis_types["DMA_ACTIVATE_D2H"]),
+ self.to_tx.dma_activate.eq(1)
+ )
+ ]
+
+class SATACommand(Module):
+ def __init__(self, transport):
+ self.submodules.tx = SATACommandTX(transport)
+ self.submodules.rx = SATACommandRX(transport)
+ self.sink, self.source = self.tx.sink, self.rx.source
+ self.comb += self.rx.to_tx.connect(self.tx.from_rx)
class FIS_REG_H2D(FIS):
def __init__(self, packet=[0]*fis_reg_h2d_cmd_len):
FIS.__init__(self, packet,fis_reg_h2d_layout)
+ self.type = fis_types["REG_H2D"]
def __repr__(self):
r = "FIS_REG_H2D\n"
class FIS_REG_D2H(FIS):
def __init__(self, packet=[0]*fis_reg_d2h_cmd_len):
FIS.__init__(self, packet,fis_reg_d2h_layout)
+ self.type = fis_types["REG_D2H"]
def __repr__(self):
r = "FIS_REG_D2H\n"
class FIS_DMA_ACTIVATE_D2H(FIS):
def __init__(self, packet=[0]*fis_dma_activate_d2h_cmd_len):
FIS.__init__(self, packet,fis_dma_activate_d2h_layout)
+ self.type = fis_types["DMA_ACTIVATE_D2H"]
def __repr__(self):
r = "FIS_DMA_ACTIVATE_D2H\n"
class FIS_DATA(FIS):
def __init__(self, packet=[0]):
FIS.__init__(self, packet,fis_data_layout)
+ self.type = fis_types["DATA"]
def __repr__(self):
r = "FIS_DATA\n"
self.loopback = loopback
self.link.set_transport_callback(self.callback)
+ def set_command_callback(self, callback):
+ self.command_callback = callback
+
+ def send(self, fis):
+ fis.encode()
+ packet = LinkTXPacket(fis.packet)
+ self.link.tx_packets.append(packet)
+ if self.debug and not self.loopback:
+ print(fis)
+
def callback(self, packet):
- fis_type = packet[0]
+ fis_type = packet[0] & 0xff
if fis_type == fis_types["REG_H2D"]:
fis = FIS_REG_H2D(packet)
elif fis_type == fis_types["REG_D2H"]:
if self.debug:
print(fis)
if self.loopback:
- packet = LinkTXPacket(fis.packet)
- self.link.tx_packets.append(packet)
+ self.send(fis)
+ else:
+ self.command_callback(fis)
+
+regs = {
+ "WRITE_DMA_EXT" : 0x35,
+ "READ_DMA_EXT" : 0x25,
+ "IDENTIFY_DEVICE_DMA" : 0xEE
+}
+
+class CommandLayer(Module):
+ def __init__(self, transport, debug=False):
+ self.transport = transport
+ self.debug = debug
+ self.transport.set_command_callback(self.callback)
+
+ def callback(self, fis):
+ if isinstance(fis, FIS_REG_H2D):
+ if fis.command == regs["WRITE_DMA_EXT"]:
+ # XXX add checks
+ dma_activate = FIS_DMA_ACTIVATE_D2H()
+ # XXX fill dma_activate
+ self.transport.send(dma_activate)
class BFM(Module):
def __init__(self,
phy_debug=False,
link_debug=False, link_random_level=0,
- transport_debug=False, transport_loopback=False
+ transport_debug=False, transport_loopback=False,
+ command_debug=False
):
###
self.submodules.phy = PHYLayer(phy_debug)
self.submodules.link = LinkLayer(self.phy, link_debug, link_random_level)
self.submodules.transport = TransportLayer(self.link, transport_debug, transport_loopback)
+ self.submodules.command = CommandLayer(self.transport, command_debug)
--- /dev/null
+import random
+
+from migen.fhdl.std import *
+from migen.genlib.record import *
+from migen.sim.generic import run_simulation
+
+from lib.sata.std import *
+from lib.sata.link import SATALink
+from lib.sata.transport import SATATransport
+from lib.sata.command import SATACommand
+
+from lib.sata.test.bfm import *
+from lib.sata.test.common import *
+
+class TB(Module):
+ def __init__(self):
+ self.submodules.bfm = BFM(phy_debug=False,
+ link_random_level=0, transport_debug=True, transport_loopback=False)
+ self.submodules.link = SATALink(self.bfm.phy)
+ self.submodules.transport = SATATransport(self.link)
+ self.submodules.command = SATACommand(self.transport)
+
+ def gen_simulation(self, selfp):
+ for i in range(100):
+ yield
+ for i in range(32):
+ selfp.command.sink.stb = 1
+ selfp.command.sink.sop = (i==0)
+ selfp.command.sink.eop = (i==31)
+ selfp.command.sink.write = 1
+ selfp.command.sink.address = 0x1234
+ selfp.command.sink.length = 32
+ selfp.command.sink.data = i
+ yield
+ while selfp.command.sink.ack == 0:
+ yield
+ selfp.command.sink.ack = 0
+
+if __name__ == "__main__":
+ run_simulation(TB(), ncycles=512, vcd_name="my.vcd", keep_files=True)