from migen.bank.description import *
+
class LiteSATA(Module, AutoCSR):
def __init__(self, phy, buffer_depth=2*fis_max_dwords,
with_bist=False, with_bist_csr=False):
"HOLDA" : 0X9595AA7C
}
+
def is_primitive(dword):
for k, v in primitives.items():
if dword == v:
return True
return False
+
def decode_primitive(dword):
for k, v in primitives.items():
if dword == v:
return k
return ""
+
def phy_description(dw):
layout = [
("data", dw),
]
return EndpointDescription(layout, packetized=False)
+
def link_description(dw):
layout = [
("d", dw),
"DATA": 0x46
}
+
class FISField():
def __init__(self, dword, offset, width):
self.dword = dword
"type": FISField(0, 0, 8)
}
+
def transport_tx_description(dw):
layout = [
("type", 8),
]
return EndpointDescription(layout, packetized=True)
+
def transport_rx_description(dw):
layout = [
("type", 8),
"err" : 0
}
+
def command_tx_description(dw):
layout = [
("write", 1),
]
return EndpointDescription(layout, packetized=True)
+
def command_rx_description(dw):
layout = [
("write", 1),
]
return EndpointDescription(layout, packetized=True)
+
def command_rx_cmd_description(dw):
layout = [
("write", 1),
]
return EndpointDescription(layout, packetized=False)
+
def command_rx_data_description(dw):
layout = [
("data", dw)
# HDD
logical_sector_size = 512 # constant since all HDDs use this
+
def dwords2sectors(n):
return math.ceil(n*4/logical_sector_size)
+
def sectors2dwords(n):
return n*logical_sector_size//4
+
# Generic modules
class BufferizeEndpoints(ModuleTransformer):
def __init__(self, *names):
submodule.comb += Record.connect(source, buf.d)
setattr(self, name, buf.q)
+
class EndpointPacketStatus(Module):
def __init__(self, endpoint):
self.start = Signal()
)
self.comb += self.ongoing.eq((self.start | ongoing) & ~self.done)
+
class PacketBuffer(Module):
def __init__(self, description, data_depth, cmd_depth=4, almost_full=None):
self.sink = sink = Sink(description)
from misoclib.mem.litesata.core.transport import LiteSATATransport
from misoclib.mem.litesata.core.command import LiteSATACommand
+
class LiteSATACore(Module):
def __init__(self, phy, buffer_depth):
self.submodules.link = LiteSATALink(phy, buffer_depth)
("d2h_error", 1)
]
+
class LiteSATACommandTX(Module):
def __init__(self, transport):
self.sink = sink = Sink(command_tx_description(32))
)
]
+
class LiteSATACommandRX(Module):
def __init__(self, transport):
self.source = source = Source(command_rx_description(32))
to_tx.d2h_error.eq(d2h_error)
]
+
class LiteSATACommand(Module):
def __init__(self, transport):
self.submodules.tx = LiteSATACommandTX(transport)
("det", 32)
]
+
class LiteSATALinkTX(Module):
def __init__(self, phy):
self.sink = Sink(link_description(32))
)
)
+
class LiteSATALinkRX(Module):
def __init__(self, phy):
self.source = Source(link_description(32))
self.to_tx.det.eq(det)
]
+
class LiteSATALink(Module):
def __init__(self, phy, buffer_depth):
self.submodules.tx_buffer = PacketBuffer(link_description(32), buffer_depth)
from misoclib.mem.litesata.common import *
from misoclib.mem.litesata.core.link.scrambler import Scrambler
+
class LiteSATACONTInserter(Module):
def __init__(self, description):
self.sink = sink = Sink(description)
)
]
+
class LiteSATACONTRemover(Module):
def __init__(self, description):
self.sink = sink = Sink(description)
from collections import OrderedDict
from misoclib.mem.litesata.common import *
+
class CRCEngine(Module):
"""Cyclic Redundancy Check Engine
xors += [new[n]]
self.comb += self.next[i].eq(optree("^", xors))
+
@DecorateModule(InsertReset)
@DecorateModule(InsertCE)
class LiteSATACRC(Module):
)
self.comb += self.busy.eq(~fsm.ongoing("IDLE"))
+
class CRCChecker(Module):
"""CRC Checker
)
self.comb += self.busy.eq(~fsm.ongoing("IDLE"))
+
class LiteSATACRCInserter(CRCInserter):
def __init__(self, description):
CRCInserter.__init__(self, LiteSATACRC, description)
+
class LiteSATACRCChecker(CRCChecker):
def __init__(self, description):
CRCChecker.__init__(self, LiteSATACRC, description)
from misoclib.mem.litesata.common import *
+
@DecorateModule(InsertCE)
class Scrambler(Module):
"""SATA Scrambler
self.comb += self.value.eq(next_value)
+
@DecorateModule(InsertReset)
class LiteSATAScrambler(Module):
def __init__(self, description):
from misoclib.mem.litesata.common import *
+
def _get_item(obj, name, width):
if "_lsb" in name:
item = getattr(obj, name.replace("_lsb", ""))[:width]
item = getattr(obj, name)
return item
+
def _encode_cmd(obj, description, signal):
r = []
for k, v in sorted(description.items()):
r.append(signal[start:end].eq(item))
return r
+
def test_type(name, signal):
return signal == fis_types[name]
+
class LiteSATATransportTX(Module):
def __init__(self, link):
self.sink = sink = Sink(transport_tx_description(32))
)
]
+
def _decode_cmd(signal, description, obj):
r = []
for k, v in sorted(description.items()):
r.append(item.eq(signal[start:end]))
return r
+
class LiteSATATransportRX(Module):
def __init__(self, link):
self.source = source = Source(transport_rx_description(32))
)
self.comb += cmd_done.eq((counter.value == cmd_len) & link.source.ack)
+
class LiteSATATransport(Module):
def __init__(self, link):
self.submodules.tx = LiteSATATransportTX(link)
from misoclib.soc import cpuif
from misoclib.mem.litesata.common import *
+
def _import(default, name):
return importlib.import_module(default + "." + name)
+
def _get_args():
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
description="""\
)
]
+
class Platform(kc705.Platform):
def __init__(self, *args, **kwargs):
kc705.Platform.__init__(self, *args, **kwargs)
),
]
+
class Platform(XilinxPlatform):
def __init__(self, device="xc7k325t", programmer=""):
XilinxPlatform.__init__(self, device, _io)
from misoclib.mem.litesata.phy import LiteSATAPHY
from misoclib.mem.litesata import LiteSATA
+
class _CRG(Module):
def __init__(self, platform):
self.clock_domains.cd_sys = ClockDomain()
AsyncResetSynchronizer(self.cd_sys, ~pll_locked | platform.request("cpu_reset") | self.reset),
]
+
class BISTLeds(Module):
def __init__(self, platform, sata_phy):
# 1Hz blinking leds (sata_rx and sata_tx clocks)
self.comb += platform.request("user_led", 2).eq(sata_phy.crg.ready)
self.comb += platform.request("user_led", 3).eq(sata_phy.ctrl.ready)
+
class BISTSoC(SoC, AutoCSR):
default_platform = "kc705"
csr_map = {
# Status Leds
self.submodules.leds = BISTLeds(platform, self.sata_phy)
+
class BISTSoCDevel(BISTSoC, AutoCSR):
csr_map = {
"la": 20
from misoclib.mem.litesata.phy import LiteSATAPHY
from misoclib.mem.litesata import LiteSATA
+
class LiteSATACore(Module):
default_platform = "verilog_backend"
def __init__(self, platform, clk_freq=166*1000000, nports=4):
ios = ios.union({obj})
return ios
-
default_subtarget = LiteSATACore
logical_sector_size = 512
+
class Timer:
def __init__(self):
self.value = None
self._stop = time.time()
self.value = max(self._stop - self._start, 1/1000000)
+
class LiteSATABISTUnitDriver:
def __init__(self, regs, name):
self.regs = regs
errors = -1
return (aborted, errors, speed)
+
class LiteSATABISTGeneratorDriver(LiteSATABISTUnitDriver):
def __init__(self, regs, name):
LiteSATABISTUnitDriver.__init__(self, regs, name + "_generator")
+
class LiteSATABISTCheckerDriver(LiteSATABISTUnitDriver):
def __init__(self, regs, name):
LiteSATABISTUnitDriver.__init__(self, regs, name + "_checker")
+
class LiteSATABISTIdentifyDriver:
def __init__(self, regs, name):
self.regs = regs
info += k + ": " + str(v) + "\n"
print(info, end="")
+
def _get_args():
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
description="""\
#!/usr/bin/env python3
import argparse, importlib
+
def _get_args():
parser = argparse.ArgumentParser()
parser.add_argument("-b", "--bridge", default="uart", help="Bridge to use")
from test_bist import *
from litescope.host.driver.la import LiteScopeLADriver
+
def main(wb):
la = LiteScopeLADriver(wb.regs, "la")
identify = LiteSATABISTIdentifyDriver(wb.regs, "sata_bist")
"HOLDA" : 0X9595AA7C
}
+
def decode_primitive(dword):
for k, v in primitives.items():
if dword == v:
return k
return ""
+
def link_trace(mila, tx_data_name, rx_data_name):
r = ""
dump = Dump()
from migen.genlib.roundrobin import *
+
class LiteSATAArbiter(Module):
def __init__(self, users, master):
self.rr = RoundRobin(len(users))
from migen.bank.description import *
+
class LiteSATABISTGenerator(Module):
def __init__(self, user_port):
self.start = Signal()
)
self.sync += If(sink.stb & sink.ack, self.aborted.eq(sink.failed))
+
class LiteSATABISTChecker(Module):
def __init__(self, user_port):
self.start = Signal()
)
self.sync += If(sink.stb & sink.ack, self.aborted.eq(sink.failed))
+
class LiteSATABISTUnitCSR(Module, AutoCSR):
def __init__(self, bist_unit):
self._start = CSR()
self._cycles.status.eq(cycles_counter.value)
]
+
class LiteSATABISTIdentify(Module):
def __init__(self, user_port):
self.start = Signal()
)
)
+
class LiteSATABISTIdentifyCSR(Module, AutoCSR):
def __init__(self, bist_identify):
self._start = CSR()
bist_identify.source.ack.eq(self._source_ack.r & self._source_ack.re)
]
+
class LiteSATABIST(Module, AutoCSR):
def __init__(self, crossbar, with_csr=False):
generator = LiteSATABISTGenerator(crossbar.get_port())
from misoclib.mem.litesata.common import *
+
class LiteSATAMasterPort:
def __init__(self, dw):
self.source = Source(command_tx_description(dw))
Record.connect(slave.source, self.sink)
]
+
class LiteSATASlavePort:
def __init__(self, dw):
self.sink = Sink(command_tx_description(dw))
Record.connect(master.sink, self.source)
]
+
class LiteSATAUserPort(LiteSATASlavePort):
def __init__(self, dw):
LiteSATASlavePort.__init__(self, dw)
from misoclib.mem.litesata.frontend.common import *
from misoclib.mem.litesata.frontend.arbiter import LiteSATAArbiter
+
class LiteSATACrossbar(Module):
def __init__(self, core):
self.users = []
from misoclib.mem.litesata.phy.ctrl import *
from misoclib.mem.litesata.phy.datapath import *
+
class LiteSATAPHY(Module):
def __init__(self, device, pads, revision, clk_freq):
self.pads = pads
from misoclib.mem.litesata.common import *
+
def us(t, clk_freq):
clk_period_us = 1000000/clk_freq
return math.ceil(t/clk_period_us)
+
class LiteSATAPHYCtrl(Module):
def __init__(self, trx, crg, clk_freq):
self.ready = Signal()
from misoclib.mem.litesata.common import *
+
class LiteSATAPHYDatapathRX(Module):
def __init__(self):
self.sink = sink = Sink(phy_description(16))
Record.connect(fifo.source, source)
]
+
class LiteSATAPHYDatapathTX(Module):
def __init__(self):
self.sink = sink = Sink(phy_description(32))
Record.connect(converter.source, source)
]
+
class LiteSATAPHYAlignInserter(Module):
def __init__(self, ctrl):
self.sink = sink = Sink(phy_description(32))
)
]
+
class LiteSATAPHYAlignRemover(Module):
def __init__(self):
self.sink = sink = Sink(phy_description(32))
Record.connect(sink, source)
)
+
class LiteSATAPHYDatapath(Module):
def __init__(self, trx, ctrl):
self.sink = sink = Sink(phy_description(32))
from misoclib.mem.litesata.common import *
+
class K7LiteSATAPHYCRG(Module):
def __init__(self, pads, gtx, revision, clk_freq):
self.reset = Signal()
from misoclib.mem.litesata.common import *
+
def ones(width):
return 2**width-1
+
class _PulseSynchronizer(PulseSynchronizer):
def __init__(self, i, idomain, o, odomain):
PulseSynchronizer.__init__(self, idomain, odomain)
o.eq(self.o)
]
+
class _RisingEdge(Module):
def __init__(self, i, o):
i_d = Signal()
self.sync += i_d.eq(i)
self.comb += o.eq(i & ~i_d)
+
class K7LiteSATAPHYTRX(Module):
def __init__(self, pads, revision):
# Common signals
from misoclib.mem.litesata.test.hdd import *
from misoclib.mem.litesata.test.common import *
+
class TB(Module):
def __init__(self):
self.submodules.hdd = HDD(
from misoclib.mem.litesata.test.hdd import *
from misoclib.mem.litesata.test.common import *
+
class CommandTXPacket(list):
def __init__(self, write=0, read=0, sector=0, count=0, data=[]):
self.ongoing = False
for d in data:
self.append(d)
+
class CommandStreamer(PacketStreamer):
def __init__(self):
PacketStreamer.__init__(self, command_tx_description(32), CommandTXPacket)
selfp.source.sector = self.packet.sector
selfp.source.count = self.packet.count
+
class CommandRXPacket(list):
def __init__(self):
self.ongoing = False
self.read = 0
self.failed = 0
+
class CommandLogger(PacketLogger):
def __init__(self):
PacketLogger.__init__(self, command_rx_description(32), CommandRXPacket)
if selfp.sink.stb == 1 and selfp.sink.eop == 1:
self.packet.done = True
+
class TB(Module):
def __init__(self):
self.submodules.hdd = HDD(
from misoclib.mem.litesata.common import *
+
def seed_to_data(seed, random=True):
if random:
return (seed * 0x31415979 + 1) & 0xffffffff
else:
return seed
+
def check(p1, p2):
p1 = copy.deepcopy(p1)
p2 = copy.deepcopy(p2)
errors += 1
return shift, length, errors
+
def randn(max_n):
return random.randint(0, max_n-1)
+
class PacketStreamer(Module):
def __init__(self, description, packet_class):
self.source = Source(description)
self.packet.done = 1
selfp.source.stb = 0
+
class PacketLogger(Module):
def __init__(self, description, packet_class):
self.sink = Sink(description)
if selfp.sink.stb == 1 and selfp.sink.eop == 1:
self.packet.done = True
+
class Randomizer(Module):
def __init__(self, description, level=0):
self.level = level
from misoclib.mem.litesata.test.common import *
+
class ContPacket(list):
def __init__(self, data=[]):
self.ongoing = False
for d in data:
self.append(d)
+
class ContStreamer(PacketStreamer):
def __init__(self):
PacketStreamer.__init__(self, phy_description(32), ContPacket)
except:
pass
+
class ContLogger(PacketLogger):
def __init__(self):
PacketLogger.__init__(self, phy_description(32), ContPacket)
+
class TB(Module):
def __init__(self):
self.submodules.streamer = ContStreamer()
from misoclib.mem.litesata.test.common import *
+
class TB(Module):
def __init__(self, length, random):
self.submodules.crc = LiteSATACRC()
from misoclib.mem.litesata.common import *
from misoclib.mem.litesata.test.common import *
+
def print_with_prefix(s, prefix=""):
if not isinstance(s, str):
s = s.__repr__()
for l in s:
print(prefix + l)
+
# PHY Layer model
class PHYDword:
def __init__(self, dat=0):
self.start = 1
self.done = 0
+
class PHYSource(Module):
def __init__(self):
self.source = Source(phy_description(32))
selfp.source.charisk = 0b0001
selfp.source.data = self.dword.dat
+
class PHYSink(Module):
def __init__(self):
self.sink = Sink(phy_description(32))
self.dword.done = 1
self.dword.dat = selfp.sink.data
+
class PHYLayer(Module):
def __init__(self):
return receiving + sending
+
# Link Layer model
def print_link(s):
print_with_prefix(s, "[LNK]: ")
+
def import_scrambler_datas():
with subprocess.Popen(["./scrambler"], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as process:
process.stdin.write("0x10000".encode("ASCII"))
out, err = process.communicate()
return [int(e, 16) for e in out.decode("utf-8").split("\n")[:-1]]
+
class LinkPacket(list):
def __init__(self, init=[]):
self.ongoing = False
for dword in init:
self.append(dword)
+
class LinkRXPacket(LinkPacket):
def descramble(self):
for i in range(len(self)):
self.descramble()
return self.check_crc()
+
class LinkTXPacket(LinkPacket):
def insert_crc(self):
stdin = ""
self.insert_crc()
self.scramble()
+
class LinkLayer(Module):
def __init__(self, phy, debug=False, random_level=0):
self.phy = phy
self.callback(rx_dword)
self.insert_cont()
+
# Transport Layer model
def print_transport(s):
print_with_prefix(s, "[TRN]: ")
+
def get_field_data(field, packet):
return (packet[field.dword] >> field.offset) & (2**field.width-1)
+
class FIS:
def __init__(self, packet, description, direction="H2D"):
self.packet = packet
r += k + " : 0x%x" %getattr(self,k) + "\n"
return r
+
class FIS_REG_H2D(FIS):
def __init__(self, packet=[0]*fis_reg_h2d_cmd_len):
FIS.__init__(self, packet, fis_reg_h2d_layout)
r += FIS.__repr__(self)
return r
+
class FIS_REG_D2H(FIS):
def __init__(self, packet=[0]*fis_reg_d2h_cmd_len):
FIS.__init__(self, packet, fis_reg_d2h_layout)
r += FIS.__repr__(self)
return r
+
class FIS_DMA_ACTIVATE_D2H(FIS):
def __init__(self, packet=[0]*fis_dma_activate_d2h_cmd_len):
FIS.__init__(self, packet, fis_dma_activate_d2h_layout)
r += FIS.__repr__(self)
return r
+
class FIS_DATA(FIS):
def __init__(self, packet=[0], direction="H2D"):
FIS.__init__(self, packet, fis_data_layout, direction)
r += "%08x\n" %data
return r
+
class FIS_UNKNOWN(FIS):
def __init__(self, packet=[0], direction="H2D"):
FIS.__init__(self, packet, {}, direction)
r += "%08x\n" %dword
return r
+
class TransportLayer(Module):
def __init__(self, link, debug=False, loopback=False):
self.link = link
else:
self.command_callback(fis)
+
# Command Layer model
class CommandLayer(Module):
def __init__(self, transport):
for packet in resp:
self.transport.send(packet)
+
# HDD model
def print_hdd(s):
print_with_prefix(s, "[HDD]: ")
+
class HDDMemRegion:
def __init__(self, base, count, sector_size):
self.base = base
self.count = count
self.data = [0]*(count*sector_size//4)
+
class HDD(Module):
def __init__(self,
link_debug=False, link_random_level=0,
from misoclib.mem.litesata.test.common import *
from misoclib.mem.litesata.test.hdd import *
+
class LinkStreamer(PacketStreamer):
def __init__(self):
PacketStreamer.__init__(self, link_description(32), LinkTXPacket)
+
class LinkLogger(PacketLogger):
def __init__(self):
PacketLogger.__init__(self, link_description(32), LinkRXPacket)
+
class TB(Module):
def __init__(self):
self.submodules.hdd = HDD(
from misoclib.mem.litesata.test.common import *
+
class DataPacket(list):
def __init__(self, data=[]):
self.ongoing = False
for d in data:
self.append(d)
+
class DataStreamer(PacketStreamer):
def __init__(self):
PacketStreamer.__init__(self, phy_description(32), DataPacket)
except:
pass
+
class DataLogger(PacketLogger):
def __init__(self):
PacketLogger.__init__(self, phy_description(32), DataPacket)
+
class TRX(Module):
def __init__(self):
self.sink = Sink(phy_description(32))
self.source = Source(phy_description(32))
self.comb += Record.connect(self.sink, self.source)
+
class CTRL(Module):
def __init__(self):
self.sink = Sink(phy_description(32))
self.source = Source(phy_description(32))
self.ready = Signal(reset=1)
+
class TB(Module):
def __init__(self):
# use sys_clk for each clock_domain
from misoclib.mem.litesata.test.common import *
+
class TB(Module):
def __init__(self, length):
self.submodules.scrambler = InsertReset(Scrambler())