from lib.sata.link.scrambler import SATAScrambler
from lib.sata.link.cont import SATACONTInserter, SATACONTRemover
-# TODO:
-# - Do more tests
+#TODO:
+# -Test HOLD on RX path
from_rx = [
("idle", 1),
# FSM
fsm.act("IDLE",
- insert.eq(primitives["SYNC"]),
- If(scrambler.source.stb & scrambler.source.sop,
- If(self.from_rx.idle,
- NextState("RDY")
+ scrambler.reset.eq(1),
+ If(self.from_rx.idle,
+ insert.eq(primitives["SYNC"]),
+ If(scrambler.source.stb & scrambler.source.sop,
+ NextState("RDY"),
)
)
)
fsm.act("RDY",
insert.eq(primitives["X_RDY"]),
- If(self.from_rx.det == primitives["R_RDY"],
+ If(~self.from_rx.idle,
+ NextState("IDLE")
+ ).Elif(self.from_rx.det == primitives["R_RDY"],
NextState("SOF")
)
)
crc = SATACRCChecker(link_layout(32))
self.submodules += crc
+ sop = Signal()
+ self.sync += \
+ If(fsm.ongoing("RDY"),
+ sop.eq(1)
+ ).Elif(scrambler.sink.stb & scrambler.sink.ack,
+ sop.eq(0)
+ )
+
# graph
- self.comb += [
+ self.sync += \
If(fsm.ongoing("COPY") & (det == 0),
scrambler.sink.stb.eq(cont.source.stb & (cont.source.charisk == 0)),
scrambler.sink.d.eq(cont.source.data),
- ),
+ ).Else(
+ scrambler.sink.stb.eq(0)
+ )
+ self.comb += [
+ scrambler.sink.sop.eq(sop),
+ scrambler.sink.eop.eq(det == primitives["EOF"]),
cont.source.ack.eq(1),
Record.connect(scrambler.source, crc.sink),
Record.connect(crc.source, self.source)
# FSM
fsm.act("IDLE",
+ scrambler.reset.eq(1),
If(det == primitives["X_RDY"],
NextState("RDY")
)
)
)
fsm.act("COPY",
+ insert.eq(primitives["R_IP"]),
If(det == primitives["HOLD"],
insert.eq(primitives["HOLDA"])
).Elif(det == primitives["EOF"],
self.dword.dat = selfp.sink.data
class PHYLayer(Module):
- def __init__(self, debug):
+ def __init__(self, debug=False):
self.debug = debug
self.submodules.rx = PHYSink()
class LinkTXPacket(LinkPacket):
def encode(self):
- self.scramble()
self.insert_crc()
+ self.scramble()
def scramble(self):
for i in range(len(self)):
def insert_crc(self):
stdin = ""
- for v in self[:-1]:
+ for v in self:
stdin += "0x%08x " %v
stdin += "exit"
with subprocess.Popen("./crc", stdin=subprocess.PIPE, stdout=subprocess.PIPE) as process:
self.append(crc)
class LinkLayer(Module):
- def __init__(self, phy, debug, hold_random_level=0):
+ def __init__(self, phy, debug=False, random_level=0):
self.phy = phy
self.debug = debug
- self.hold_random_level = hold_random_level
+ self.random_level = random_level
+ self.tx_packets = []
self.tx_packet = LinkTXPacket()
self.rx_packet = LinkRXPacket()
self.rx_cont = False
self.transport_callback = None
+ self.send_state = ""
+ self.send_states = ["RDY", "SOF", "DATA", "EOF", "WTRM"]
+
def set_transport_callback(self, callback):
self.transport_callback = callback
if dword == primitives["X_RDY"]:
self.phy.send(primitives["R_RDY"])
-
elif dword == primitives["WTRM"]:
self.phy.send(primitives["R_OK"])
-
+ if self.rx_packet.ongoing:
+ self.rx_packet.decode()
+ if self.transport_callback is not None:
+ self.transport_callback(self.rx_packet)
+ self.rx_packet.ongoing = False
elif dword == primitives["HOLD"]:
self.phy.send(primitives["HOLDA"])
-
elif dword == primitives["EOF"]:
- self.rx_packet.decode()
- if self.transport_callback is not None:
- self.transport_callback(self.rx_packet)
- self.rx_packet.ongoing = False
-
+ pass
elif self.rx_packet.ongoing:
if dword != primitives["HOLD"]:
n = randn(100)
- if n < self.hold_random_level:
+ if n < self.random_level:
self.phy.send(primitives["HOLD"])
else:
- self.phy.send(primitives["R_RDY"])
+ self.phy.send(primitives["R_IP"])
if not is_primitive(dword):
if not self.rx_cont:
self.rx_packet.append(dword)
-
elif dword == primitives["SOF"]:
self.rx_packet = LinkRXPacket()
self.rx_packet.ongoing = True
- def send(self, packet):
- pass
+ def send(self, dword):
+ if self.send_state == "RDY":
+ self.phy.send(primitives["X_RDY"])
+ if dword == primitives["R_RDY"]:
+ self.send_state = "SOF"
+ elif self.send_state == "SOF":
+ self.phy.send(primitives["SOF"])
+ self.send_state = "DATA"
+ elif self.send_state == "DATA":
+ self.phy.send(self.tx_packet.pop(0))
+ if len(self.tx_packet) == 0:
+ self.send_state = "EOF"
+ elif self.send_state == "EOF":
+ self.phy.send(primitives["EOF"])
+ self.send_state = "WTRM"
+ elif self.send_state == "WTRM":
+ self.phy.send(primitives["WTRM"])
+ if dword == primitives["R_OK"]:
+ self.tx_packet.done = True
+ elif dword == primitives["R_ERR"]:
+ self.tx_packet.done = True
def gen_simulation(self, selfp):
- self.phy.send(primitives["SYNC"])
+ self.tx_packet.done = True
while True:
yield from self.phy.receive()
- self.callback(self.phy.rx.dword.dat)
+ self.phy.send(primitives["SYNC"])
+ rx_dword = self.phy.rx.dword.dat
+ if len(self.tx_packets) != 0:
+ if self.tx_packet.done:
+ self.tx_packet = self.tx_packets.pop(0)
+ self.tx_packet.encode()
+ self.send_state = "RDY"
+ if not self.tx_packet.done:
+ self.send(rx_dword)
+ else:
+ self.callback(self.phy.rx.dword.dat)
def get_field_data(field, packet):
return (packet[field.dword] >> field.offset) & (2**field.width-1)
return r
class TransportLayer(Module):
- def __init__(self, link):
- pass
+ def __init__(self, link, debug=False, loopback=False):
+ self.link = link
+ self.debug = debug
+ self.loopback = loopback
+ self.link.set_transport_callback(self.callback)
def callback(self, packet):
fis_type = packet[0]
fis = FIS_PIO_SETUP_D2H(packet)
else:
fis = FIS_UNKNOWN(packet)
- print(fis)
+ if self.debug:
+ print(fis)
+ if self.loopback:
+ packet = LinkTXPacket(fis.packet)
+ self.link.tx_packets.append(packet)
class BFM(Module):
- def __init__(self, dw, debug=False, hold_random_level=0):
+ def __init__(self,
+ phy_debug=False,
+ link_debug=False, link_random_level=0,
+ transport_debug=False, transport_loopback=False
+ ):
###
- self.submodules.phy = PHYLayer(debug)
- self.submodules.link = LinkLayer(self.phy, debug, hold_random_level)
- self.submodules.transport = TransportLayer(self.link)
- self.link.set_transport_callback(self.transport.callback)
+ self.submodules.phy = PHYLayer(phy_debug)
+ self.submodules.link = LinkLayer(self.phy, link_debug, link_random_level)
+ self.submodules.transport = TransportLayer(self.link, transport_debug, transport_loopback)
from lib.sata.test.common import *
class LinkStreamer(Module):
- def __init__(self, dw):
- self.source = Source(link_layout(dw))
+ def __init__(self):
+ self.source = Source(link_layout(32))
###
self.packets = []
self.packet = LinkTXPacket()
selfp.source.stb = 0
class LinkLogger(Module):
- def __init__(self, dw):
- self.sink = Sink(link_layout(dw))
+ def __init__(self):
+ self.sink = Sink(link_layout(32))
###
self.packet = LinkRXPacket()
selfp.sink.ack = 1
if selfp.sink.stb == 1 and selfp.sink.sop == 1:
self.packet = LinkRXPacket()
+ print("rx : %08x" %selfp.sink.d)
self.packet.append(selfp.sink.d)
elif selfp.sink.stb:
self.packet.append(selfp.sink.d)
class TB(Module):
def __init__(self):
- self.submodules.bfm = BFM(32, debug=True, hold_random_level=50)
+ self.submodules.bfm = BFM(phy_debug=False,
+ link_random_level=50, transport_debug=False, transport_loopback=True)
self.submodules.link_layer = SATALinkLayer(self.bfm.phy)
- self.submodules.streamer = LinkStreamer(32)
+ self.submodules.streamer = LinkStreamer()
streamer_ack_randomizer = AckRandomizer(link_layout(32), level=50)
self.submodules += streamer_ack_randomizer
- self.submodules.logger = LinkLogger(32)
+ self.submodules.logger = LinkLogger()
self.comb += [
Record.connect(self.streamer.source, streamer_ack_randomizer.sink),
Record.connect(streamer_ack_randomizer.source, self.link_layer.sink),
]
def gen_simulation(self, selfp):
- for i in range(200):
+ for i in range(24):
yield
for i in range(8):
yield from self.streamer.send(LinkTXPacket([i for i in range(16)]))
+ yield from self.logger.receive()
+ print("Logger:")
+ print("-------")
+ for v in self.logger.packet:
+ print("%08x" %v)
+
if __name__ == "__main__":
run_simulation(TB(), ncycles=512, vcd_name="my.vcd", keep_files=True)