from lib.sata.link.scrambler import SATAScrambler
# Todo:
-# - TX: (optional) insert COND and scramble between COND and primitives
-# - RX: manage COND, HOLD from device
+# - TX: insert COND and scramble between COND and primitives
+# - RX: manage COND
class SATALinkLayer(Module):
def __init__(self, phy):
# graph
self.comb += [
- If(fsm.ongoing("H2D_COPY") & (rx_det == 0),
+ If(fsm.ongoing("D2H_COPY") & (rx_det == 0),
descrambler.sink.stb.eq(phy.source.stb & (phy.source.charisk == 0)),
descrambler.sink.d.eq(phy.source.data),
),
)
)
fsm.act("H2D_COPY",
- If(scrambler.source.stb & scrambler.source.eop & scrambler.source.ack,
+ If(rx_det == primitives["HOLD"],
+ tx_insert.eq(primitives["HOLDA"]),
+ ).Elif(~scrambler.source.stb,
+ tx_insert.eq(primitives["HOLD"]),
+ ).Elif(scrambler.source.stb & scrambler.source.eop & scrambler.source.ack,
NextState("H2D_EOF")
)
)
)
)
fsm.act("D2H_COPY",
- If(rx_det == primitives["EOF"],
+ If(rx_det == primitives["HOLD"],
+ tx_insert.eq(primitives["HOLDA"])
+ ).Elif(rx_det == primitives["EOF"],
NextState("D2H_WTRM")
)
)
from migen.fhdl.std import *
from lib.sata.std import *
+from lib.sata.link.test.common import *
class BFMDword():
def __init__(self, dat=0):
def do_simulation(self, selfp):
if len(self.dwords) and self.dword.done:
self.dword = self.dwords.pop(0)
- if not self.dword.done:
- selfp.source.stb = 1
- selfp.source.charisk = 0b0000
- for k, v in primitives.items():
- if v == self.dword.dat:
- selfp.source.charisk = 0b0001
- selfp.source.data = self.dword.dat
+ selfp.source.stb = 1
+ selfp.source.charisk = 0b0000
+ for k, v in primitives.items():
+ if v == self.dword.dat:
+ selfp.source.charisk = 0b0001
+ selfp.source.data = self.dword.dat
if selfp.source.stb == 1 and selfp.source.ack == 1:
self.dword.done = 1
- selfp.source.stb = 0
class BFMSink(Module):
def __init__(self, dw):
self.rx_dword = self.bfm_sink.dword.dat
class BFM(Module):
- def __init__(self, dw, debug=False):
+ def __init__(self, dw, debug=False, level=0):
self.debug = debug
+ self.level = level
###
def packet_callback(self, packet):
packet = self.descramble(packet)
packet = self.check_crc(packet)
+ print("----")
for v in packet:
print("%08x" %v)
+ print("----")
def dword_callback(self, dword):
- print("%08x " %dword, end="")
+ rx = "%08x " %dword
for k, v in primitives.items():
if dword == v:
- print(k, end="")
+ rx += k
+ rx += " "*(16-len(rx))
+ print(rx, end="")
+
+ tx = "%08x " %self.phy.bfm_source.dword.dat
+ for k, v in primitives.items():
+ if self.phy.bfm_source.dword.dat == v:
+ tx += k
+ tx += " "*(16-len(tx))
+ print(tx, end="")
+
print("")
+
# X_RDY / WTRM response
if dword == primitives["X_RDY"]:
self.phy.bfm_source.dwords.append(BFMDword(primitives["R_RDY"]))
- if dword == primitives["WTRM"]:
+
+ elif dword == primitives["WTRM"]:
self.phy.bfm_source.dwords.append(BFMDword(primitives["R_OK"]))
+ # HOLD response
+ elif dword == primitives["HOLD"]:
+ self.phy.bfm_source.dwords.append(BFMDword(primitives["HOLDA"]))
+
# packet capture
- if dword == primitives["EOF"]:
+ elif dword == primitives["EOF"]:
self.rx_packet_ongoing = False
self.packet_callback(self.rx_packet)
- if self.rx_packet_ongoing:
- self.rx_packet.append(dword)
-
- if dword == primitives["SOF"]:
+ elif self.rx_packet_ongoing:
+ if dword != primitives["HOLD"]:
+ n = randn(100)
+ if n < self.level:
+ self.phy.bfm_source.dwords.append(BFMDword(primitives["HOLD"]))
+ else:
+ self.phy.bfm_source.dwords.append(BFMDword(primitives["R_RDY"]))
+ if dword != primitives["HOLDA"]:
+ self.rx_packet.append(dword)
+
+ elif dword == primitives["SOF"]:
self.rx_packet_ongoing = True
self.rx_packet = []
+import random
+
+from lib.sata.std import *
+
def seed_to_data(seed, random=True):
if random:
return (seed * 0x31415979 + 1) & 0xffffffff
if ref.pop(0) != res.pop(0):
errors += 1
return shift, length, errors
+
+def randn(max_n):
+ return random.randint(0, max_n-1)
+
+class AckRandomizer(Module):
+ def __init__(self, description, level=0):
+ self.level = level
+
+ self.sink = Sink(description)
+ self.source = Source(description)
+
+ self.run = Signal()
+
+ self.comb += \
+ If(self.run,
+ Record.connect(self.sink, self.source)
+ ).Else(
+ self.source.stb.eq(0),
+ self.sink.ack.eq(0),
+ )
+
+ def do_simulation(self, selfp):
+ n = randn(100)
+ if n < self.level:
+ selfp.run = 0
+ else:
+ selfp.run = 1
+import random
+
from migen.fhdl.std import *
from migen.genlib.record import *
from migen.sim.generic import run_simulation
from lib.sata.link import SATALinkLayer
from lib.sata.link.test.bfm import *
+from lib.sata.link.test.common import *
class LinkPacket():
def __init__(self, d=[]):
class TB(Module):
def __init__(self):
- self.submodules.bfm = BFM(32, debug=True)
+ self.submodules.bfm = BFM(32, debug=True, level=50)
self.submodules.link_layer = SATALinkLayer(self.bfm.phy)
self.submodules.streamer = LinkStreamer(32)
+ streamer_ack_randomizer = AckRandomizer(link_layout(32), level=50)
+ self.submodules += streamer_ack_randomizer
self.submodules.logger = LinkLogger(32)
self.comb += [
- Record.connect(self.streamer.source, self.link_layer.sink),
+ Record.connect(self.streamer.source, streamer_ack_randomizer.sink),
+ Record.connect(streamer_ack_randomizer.source, self.link_layer.sink),
Record.connect(self.link_layer.source, self.logger.sink)
]
for i in range(200):
yield
for i in range(8):
- yield from self.streamer.send(LinkPacket([0, 1, 2, 3]))
+ yield from self.streamer.send(LinkPacket([i for i in range(16)]))
if __name__ == "__main__":
run_simulation(TB(), ncycles=512, vcd_name="my.vcd", keep_files=True)
"SOF" : 0x3737B57C,
"EOF" : 0xD5D5B57C,
"HOLD" : 0xD5D5AA7C,
- "HOLD" : 0X9595AA7C
+ "HOLDA" : 0X9595AA7C
}
def ones(width):