from misoclib.mem.litesata import LiteSATA
from misoclib.mem.litesata.frontend.bist import LiteSATABISTGenerator, LiteSATABISTChecker
-from misoclib.mem.litesata.test.hdd import *
from misoclib.mem.litesata.test.common import *
+from misoclib.mem.litesata.test.model.hdd import *
class TB(Module):
from misoclib.mem.litesata.common import *
from misoclib.mem.litesata.core import LiteSATACore
-from misoclib.mem.litesata.test.hdd import *
from misoclib.mem.litesata.test.common import *
+from misoclib.mem.litesata.test.model.hdd import *
class CommandTXPacket(list):
from misoclib.mem.litesata.common import *
+def print_with_prefix(s, prefix=""):
+ if not isinstance(s, str):
+ s = s.__repr__()
+ s = s.split("\n")
+ for l in s:
+ print(prefix + l)
+
def seed_to_data(seed, random=True):
if random:
+++ /dev/null
-import subprocess
-import math
-
-from misoclib.mem.litesata.common import *
-from misoclib.mem.litesata.test.common import *
-
-
-def print_with_prefix(s, prefix=""):
- if not isinstance(s, str):
- s = s.__repr__()
- s = s.split("\n")
- for l in s:
- print(prefix + l)
-
-
-# PHY Layer model
-class PHYDword:
- def __init__(self, dat=0):
- self.dat = dat
- self.start = 1
- self.done = 0
-
-
-class PHYSource(Module):
- def __init__(self):
- self.source = Source(phy_description(32))
-
- # # #
-
- self.dword = PHYDword()
-
- def send(self, dword):
- self.dword = dword
-
- def do_simulation(self, selfp):
- selfp.source.stb = 1
- selfp.source.charisk = 0b0000
- for k, v in primitives.items():
- if v == self.dword.dat:
- selfp.source.charisk = 0b0001
- selfp.source.data = self.dword.dat
-
-
-class PHYSink(Module):
- def __init__(self):
- self.sink = Sink(phy_description(32))
-
- # # #
-
- self.dword = PHYDword()
-
- def receive(self):
- self.dword.done = 0
- while self.dword.done == 0:
- yield
-
- def do_simulation(self, selfp):
- self.dword.done = 0
- selfp.sink.ack = 1
- if selfp.sink.stb == 1:
- self.dword.done = 1
- self.dword.dat = selfp.sink.data
-
-
-class PHYLayer(Module):
- def __init__(self):
-
- self.submodules.rx = PHYSink()
- self.submodules.tx = PHYSource()
-
- self.source = self.tx.source
- self.sink = self.rx.sink
-
- def send(self, dword):
- packet = PHYDword(dword)
- self.tx.send(packet)
-
- def receive(self):
- yield from self.rx.receive()
-
- def __repr__(self):
- receiving = "{:08x} ".format(self.rx.dword.dat)
- receiving += decode_primitive(self.rx.dword.dat)
- receiving += " "*(16-len(receiving))
-
- sending = "{:08x} ".format(self.tx.dword.dat)
- sending += decode_primitive(self.tx.dword.dat)
- sending += " "*(16-len(sending))
-
- return receiving + sending
-
-
-# Link Layer model
-def print_link(s):
- print_with_prefix(s, "[LNK]: ")
-
-
-def import_scrambler_datas():
- with subprocess.Popen(["./scrambler"],
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE) as process:
- process.stdin.write("0x10000".encode("ASCII"))
- out, err = process.communicate()
- return [int(e, 16) for e in out.decode("utf-8").split("\n")[:-1]]
-
-
-class LinkPacket(list):
- def __init__(self, init=[]):
- self.ongoing = False
- self.done = False
- self.scrambled_datas = import_scrambler_datas()
- for dword in init:
- self.append(dword)
-
-
-class LinkRXPacket(LinkPacket):
- def descramble(self):
- for i in range(len(self)):
- self[i] = self[i] ^ self.scrambled_datas[i]
-
- def check_crc(self):
- stdin = ""
- for v in self[:-1]:
- stdin += "0x{:08x} ".format(v)
- stdin += "exit"
- with subprocess.Popen("./crc",
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE) as process:
- process.stdin.write(stdin.encode("ASCII"))
- out, err = process.communicate()
- crc = int(out.decode("ASCII"), 16)
- r = (self[-1] == crc)
- self.pop()
- return r
-
- def decode(self):
- self.descramble()
- return self.check_crc()
-
-
-class LinkTXPacket(LinkPacket):
- def insert_crc(self):
- stdin = ""
- for v in self:
- stdin += "0x{:08x} ".foramt(v)
- stdin += "exit"
- with subprocess.Popen("./crc",
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE) as process:
- process.stdin.write(stdin.encode("ASCII"))
- out, err = process.communicate()
- crc = int(out.decode("ASCII"), 16)
- self.append(crc)
-
- def scramble(self):
- for i in range(len(self)):
- self[i] = self[i] ^ self.scrambled_datas[i]
-
- def encode(self):
- self.insert_crc()
- self.scramble()
-
-
-class LinkLayer(Module):
- def __init__(self, phy, debug=False, random_level=0):
- self.phy = phy
- self.debug = debug
- self.random_level = random_level
- self.tx_packets = []
- self.tx_packet = LinkTXPacket()
- self.rx_packet = LinkRXPacket()
-
- self.rx_cont = False
- self.rx_last = 0
- self.tx_cont = False
- self.tx_cont_nb = -1
- self.tx_lasts = [0, 0, 0]
-
- self.scrambled_datas = import_scrambler_datas()
-
- self.transport_callback = None
-
- self.send_state = ""
- self.send_states = ["RDY", "SOF", "DATA", "EOF", "WTRM"]
-
- def set_transport_callback(self, callback):
- self.transport_callback = callback
-
- def send(self, dword):
- if self.send_state == "RDY":
- self.phy.send(primitives["X_RDY"])
- if dword == primitives["R_RDY"]:
- self.send_state = "SOF"
- elif self.send_state == "SOF":
- self.phy.send(primitives["SOF"])
- self.send_state = "DATA"
- elif self.send_state == "DATA":
- if dword == primitives["HOLD"]:
- self.phy.send(primitives["HOLDA"])
- else:
- self.phy.send(self.tx_packet.pop(0))
- if len(self.tx_packet) == 0:
- self.send_state = "EOF"
- elif self.send_state == "EOF":
- self.phy.send(primitives["EOF"])
- self.send_state = "WTRM"
- elif self.send_state == "WTRM":
- self.phy.send(primitives["WTRM"])
- if dword == primitives["R_OK"]:
- self.tx_packet.done = True
- elif dword == primitives["R_ERR"]:
- self.tx_packet.done = True
- if self.tx_packet.done:
- self.phy.send(primitives["SYNC"])
-
- def insert_cont(self):
- self.tx_lasts.pop(0)
- self.tx_lasts.append(self.phy.tx.dword.dat)
- self.tx_cont = True
- for i in range(3):
- if not is_primitive(self.tx_lasts[i]):
- self.tx_cont = False
- if self.tx_lasts[i] != self.tx_lasts[0]:
- self.tx_cont = False
- if self.tx_cont:
- if self.tx_cont_nb == 0:
- self.phy.send(primitives["CONT"])
- else:
- self.phy.send(self.scrambled_datas[self.tx_cont_nb])
- self.tx_cont_nb += 1
- else:
- self.tx_cont_nb = 0
-
- def remove_cont(self, dword):
- if dword == primitives["HOLD"]:
- if self.rx_cont:
- self.tx_lasts = [0, 0, 0]
- if dword == primitives["CONT"]:
- self.rx_cont = True
- elif is_primitive(dword):
- self.rx_last = dword
- self.rx_cont = False
- if self.rx_cont:
- dword = self.rx_last
- return dword
-
- def callback(self, dword):
- if dword == primitives["X_RDY"]:
- self.phy.send(primitives["R_RDY"])
- elif dword == primitives["WTRM"]:
- self.phy.send(primitives["R_OK"])
- if self.rx_packet.ongoing:
- self.rx_packet.decode()
- if self.transport_callback is not None:
- self.transport_callback(self.rx_packet)
- self.rx_packet.ongoing = False
- elif dword == primitives["HOLD"]:
- self.phy.send(primitives["HOLDA"])
- elif dword == primitives["EOF"]:
- pass
- elif self.rx_packet.ongoing:
- if dword != primitives["HOLD"]:
- n = randn(100)
- if n < self.random_level:
- self.phy.send(primitives["HOLD"])
- else:
- self.phy.send(primitives["R_IP"])
- if not is_primitive(dword):
- self.rx_packet.append(dword)
- elif dword == primitives["SOF"]:
- self.rx_packet = LinkRXPacket()
- self.rx_packet.ongoing = True
-
- def gen_simulation(self, selfp):
- self.tx_packet.done = True
- self.phy.send(primitives["SYNC"])
- while True:
- yield from self.phy.receive()
- if self.debug:
- print_link(self.phy)
- self.phy.send(primitives["SYNC"])
- rx_dword = self.phy.rx.dword.dat
- rx_dword = self.remove_cont(rx_dword)
- if len(self.tx_packets) != 0:
- if self.tx_packet.done:
- self.tx_packet = self.tx_packets.pop(0)
- self.tx_packet.encode()
- self.send_state = "RDY"
- if not self.tx_packet.done:
- self.send(rx_dword)
- else:
- self.callback(rx_dword)
- self.insert_cont()
-
-
-# Transport Layer model
-def print_transport(s):
- print_with_prefix(s, "[TRN]: ")
-
-
-def get_field_data(field, packet):
- return (packet[field.dword] >> field.offset) & (2**field.width-1)
-
-
-class FIS:
- def __init__(self, packet, description, direction="H2D"):
- self.packet = packet
- self.description = description
- self.direction = direction
- self.decode()
-
- def decode(self):
- for k, v in self.description.items():
- setattr(self, k, get_field_data(v, self.packet))
-
- def encode(self):
- for k, v in self.description.items():
- self.packet[v.dword] |= (getattr(self, k) << v.offset)
-
- def __repr__(self):
- if self.direction == "H2D":
- r = ">>>>>>>>\n"
- else:
- r = "<<<<<<<<\n"
- for k in sorted(self.description.keys()):
- r += k + " : 0x{:x}".format(getattr(self, k)) + "\n"
- return r
-
-
-class FIS_REG_H2D(FIS):
- def __init__(self, packet=[0]*fis_reg_h2d_cmd_len):
- FIS.__init__(self, packet, fis_reg_h2d_layout)
- self.type = fis_types["REG_H2D"]
- self.direction = "H2D"
-
- def __repr__(self):
- r = "FIS_REG_H2D\n"
- r += FIS.__repr__(self)
- return r
-
-
-class FIS_REG_D2H(FIS):
- def __init__(self, packet=[0]*fis_reg_d2h_cmd_len):
- FIS.__init__(self, packet, fis_reg_d2h_layout)
- self.type = fis_types["REG_D2H"]
- self.direction = "D2H"
-
- def __repr__(self):
- r = "FIS_REG_D2H\n"
- r += FIS.__repr__(self)
- return r
-
-
-class FIS_DMA_ACTIVATE_D2H(FIS):
- def __init__(self, packet=[0]*fis_dma_activate_d2h_cmd_len):
- FIS.__init__(self, packet, fis_dma_activate_d2h_layout)
- self.type = fis_types["DMA_ACTIVATE_D2H"]
- self.direction = "D2H"
-
- def __repr__(self):
- r = "FIS_DMA_ACTIVATE_D2H\n"
- r += FIS.__repr__(self)
- return r
-
-
-class FIS_DATA(FIS):
- def __init__(self, packet=[0], direction="H2D"):
- FIS.__init__(self, packet, fis_data_layout, direction)
- self.type = fis_types["DATA"]
-
- def __repr__(self):
- r = "FIS_DATA\n"
- r += FIS.__repr__(self)
- for data in self.packet[1:]:
- r += "{:08x}\n".format(data)
- return r
-
-
-class FIS_UNKNOWN(FIS):
- def __init__(self, packet=[0], direction="H2D"):
- FIS.__init__(self, packet, {}, direction)
-
- def __repr__(self):
- r = "UNKNOWN\n"
- if self.direction == "H2D":
- r += ">>>>>>>>\n"
- else:
- r += "<<<<<<<<\n"
- for dword in self.packet:
- r += "{:08x}\n".format(dword)
- return r
-
-
-class TransportLayer(Module):
- def __init__(self, link, debug=False, loopback=False):
- self.link = link
- self.debug = debug
- self.loopback = loopback
- self.link.set_transport_callback(self.callback)
-
- def set_command_callback(self, callback):
- self.command_callback = callback
-
- def send(self, fis):
- fis.encode()
- packet = LinkTXPacket(fis.packet)
- self.link.tx_packets.append(packet)
- if self.debug and not self.loopback:
- print_transport(fis)
-
- def callback(self, packet):
- fis_type = packet[0] & 0xff
- if fis_type == fis_types["REG_H2D"]:
- fis = FIS_REG_H2D(packet)
- elif fis_type == fis_types["REG_D2H"]:
- fis = FIS_REG_D2H(packet)
- elif fis_type == fis_types["DMA_ACTIVATE_D2H"]:
- fis = FIS_DMA_ACTIVATE_D2H(packet)
- elif fis_type == fis_types["DATA"]:
- fis = FIS_DATA(packet, direction="H2D")
- else:
- fis = FIS_UNKNOWN(packet, direction="H2D")
- if self.debug:
- print_transport(fis)
- if self.loopback:
- self.send(fis)
- else:
- self.command_callback(fis)
-
-
-# Command Layer model
-class CommandLayer(Module):
- def __init__(self, transport):
- self.transport = transport
- self.transport.set_command_callback(self.callback)
-
- self.hdd = None
-
- def set_hdd(self, hdd):
- self.hdd = hdd
-
- def callback(self, fis):
- resp = None
- if isinstance(fis, FIS_REG_H2D):
- if fis.command == regs["WRITE_DMA_EXT"]:
- resp = self.hdd.write_dma_callback(fis)
- elif fis.command == regs["READ_DMA_EXT"]:
- resp = self.hdd.read_dma_callback(fis)
- elif isinstance(fis, FIS_DATA):
- resp = self.hdd.data_callback(fis)
-
- if resp is not None:
- for packet in resp:
- self.transport.send(packet)
-
-
-# HDD model
-def print_hdd(s):
- print_with_prefix(s, "[HDD]: ")
-
-
-class HDDMemRegion:
- def __init__(self, base, count, sector_size):
- self.base = base
- self.count = count
- self.data = [0]*(count*sector_size//4)
-
-
-class HDD(Module):
- def __init__(self,
- link_debug=False, link_random_level=0,
- transport_debug=False, transport_loopback=False,
- hdd_debug=False,
- ):
- self.submodules.phy = PHYLayer()
- self.submodules.link = LinkLayer(self.phy, link_debug, link_random_level)
- self.submodules.transport = TransportLayer(self.link, transport_debug, transport_loopback)
- self.submodules.command = CommandLayer(self.transport)
-
- self.command.set_hdd(self)
-
- self.debug = hdd_debug
- self.mem = None
- self.wr_sector = 0
- self.wr_end_sector = 0
- self.rd_sector = 0
- self.rx_end_sector = 0
-
- def malloc(self, sector, count):
- if self.debug:
- s = "Allocating {n} sectors: {s} to {e}".format(n=count, s=sector, e=sector+count)
- s += " ({} KB)".format(count*logical_sector_size//1024)
- print_hdd(s)
- self.mem = HDDMemRegion(sector, count, logical_sector_size)
-
- def write(self, sector, data):
- n = math.ceil(dwords2sectors(len(data)))
- if self.debug:
- if n == 1:
- s = "{}".format(sector)
- else:
- s = "{s} to {e}".format(s=sector, e=sector+n-1)
- print_hdd("Writing sector " + s)
- for i in range(len(data)):
- offset = sectors2dwords(sector)
- self.mem.data[offset+i] = data[i]
-
- def read(self, sector, count):
- if self.debug:
- if count == 1:
- s = "{}".format(sector)
- else:
- s = "{s} to {e}".format(s=sector, e=sector+count-1)
- print_hdd("Reading sector " + s)
- data = []
- for i in range(sectors2dwords(count)):
- data.append(self.mem.data[sectors2dwords(sector)+i])
- return data
-
- def write_dma_callback(self, fis):
- self.wr_sector = fis.lba_lsb + (fis.lba_msb << 32)
- self.wr_end_sector = self.wr_sector + fis.count
- return [FIS_DMA_ACTIVATE_D2H()]
-
- def read_dma_callback(self, fis):
- self.rd_sector = fis.lba_lsb + (fis.lba_msb << 32)
- self.rd_end_sector = self.rd_sector + fis.count
- packets = []
- while self.rd_sector != self.rd_end_sector:
- count = min(self.rd_end_sector-self.rd_sector, (fis_max_dwords*4)//logical_sector_size)
- packet = self.read(self.rd_sector, count)
- packet.insert(0, 0)
- packets.append(FIS_DATA(packet, direction="D2H"))
- self.rd_sector += count
- packets.append(FIS_REG_D2H())
- return packets
-
- def data_callback(self, fis):
- self.write(self.wr_sector, fis.packet[1:])
- self.wr_sector += dwords2sectors(len(fis.packet[1:]))
- if self.wr_sector == self.wr_end_sector:
- return [FIS_REG_D2H()]
- else:
- return [FIS_DMA_ACTIVATE_D2H()]
from misoclib.mem.litesata.common import *
from misoclib.mem.litesata.core.link import LiteSATALink
+from misoclib.mem.litesata.test.model.hdd import *
from misoclib.mem.litesata.test.common import *
-from misoclib.mem.litesata.test.hdd import *
class LinkStreamer(PacketStreamer):
--- /dev/null
+from misoclib.mem.litesata.common import *
+from misoclib.mem.litesata.test.common import *
+
+from misoclib.mem.litesata.test.model.transport import FIS_REG_H2D, FIS_DATA
+
+class CommandLayer(Module):
+ def __init__(self, transport):
+ self.transport = transport
+ self.transport.set_command_callback(self.callback)
+
+ self.hdd = None
+
+ def set_hdd(self, hdd):
+ self.hdd = hdd
+
+ def callback(self, fis):
+ resp = None
+ if isinstance(fis, FIS_REG_H2D):
+ if fis.command == regs["WRITE_DMA_EXT"]:
+ resp = self.hdd.write_dma_callback(fis)
+ elif fis.command == regs["READ_DMA_EXT"]:
+ resp = self.hdd.read_dma_callback(fis)
+ elif isinstance(fis, FIS_DATA):
+ resp = self.hdd.data_callback(fis)
+
+ if resp is not None:
+ for packet in resp:
+ self.transport.send(packet)
--- /dev/null
+import math
+
+from misoclib.mem.litesata.common import *
+from misoclib.mem.litesata.test.common import *
+
+from misoclib.mem.litesata.test.model.phy import *
+from misoclib.mem.litesata.test.model.link import *
+from misoclib.mem.litesata.test.model.transport import *
+from misoclib.mem.litesata.test.model.command import *
+
+def print_hdd(s):
+ print_with_prefix(s, "[HDD]: ")
+
+
+class HDDMemRegion:
+ def __init__(self, base, count, sector_size):
+ self.base = base
+ self.count = count
+ self.data = [0]*(count*sector_size//4)
+
+
+class HDD(Module):
+ def __init__(self,
+ link_debug=False, link_random_level=0,
+ transport_debug=False, transport_loopback=False,
+ hdd_debug=False,
+ ):
+ self.submodules.phy = PHYLayer()
+ self.submodules.link = LinkLayer(self.phy, link_debug, link_random_level)
+ self.submodules.transport = TransportLayer(self.link, transport_debug, transport_loopback)
+ self.submodules.command = CommandLayer(self.transport)
+
+ self.command.set_hdd(self)
+
+ self.debug = hdd_debug
+ self.mem = None
+ self.wr_sector = 0
+ self.wr_end_sector = 0
+ self.rd_sector = 0
+ self.rx_end_sector = 0
+
+ def malloc(self, sector, count):
+ if self.debug:
+ s = "Allocating {n} sectors: {s} to {e}".format(n=count, s=sector, e=sector+count-1)
+ s += " ({} KB)".format(count*logical_sector_size//1024)
+ print_hdd(s)
+ self.mem = HDDMemRegion(sector, count, logical_sector_size)
+
+ def write(self, sector, data):
+ n = math.ceil(dwords2sectors(len(data)))
+ if self.debug:
+ if n == 1:
+ s = "{}".format(sector)
+ else:
+ s = "{s} to {e}".format(s=sector, e=sector+n-1)
+ print_hdd("Writing sector " + s)
+ for i in range(len(data)):
+ offset = sectors2dwords(sector)
+ self.mem.data[offset+i] = data[i]
+
+ def read(self, sector, count):
+ if self.debug:
+ if count == 1:
+ s = "{}".format(sector)
+ else:
+ s = "{s} to {e}".format(s=sector, e=sector+count-1)
+ print_hdd("Reading sector " + s)
+ data = []
+ for i in range(sectors2dwords(count)):
+ data.append(self.mem.data[sectors2dwords(sector)+i])
+ return data
+
+ def write_dma_callback(self, fis):
+ self.wr_sector = fis.lba_lsb + (fis.lba_msb << 32)
+ self.wr_end_sector = self.wr_sector + fis.count
+ return [FIS_DMA_ACTIVATE_D2H()]
+
+ def read_dma_callback(self, fis):
+ self.rd_sector = fis.lba_lsb + (fis.lba_msb << 32)
+ self.rd_end_sector = self.rd_sector + fis.count
+ packets = []
+ while self.rd_sector != self.rd_end_sector:
+ count = min(self.rd_end_sector-self.rd_sector, (fis_max_dwords*4)//logical_sector_size)
+ packet = self.read(self.rd_sector, count)
+ packet.insert(0, 0)
+ packets.append(FIS_DATA(packet, direction="D2H"))
+ self.rd_sector += count
+ packets.append(FIS_REG_D2H())
+ return packets
+
+ def data_callback(self, fis):
+ self.write(self.wr_sector, fis.packet[1:])
+ self.wr_sector += dwords2sectors(len(fis.packet[1:]))
+ if self.wr_sector == self.wr_end_sector:
+ return [FIS_REG_D2H()]
+ else:
+ return [FIS_DMA_ACTIVATE_D2H()]
--- /dev/null
+import subprocess
+import math
+
+from misoclib.mem.litesata.common import *
+from misoclib.mem.litesata.test.common import *
+
+def print_link(s):
+ print_with_prefix(s, "[LNK]: ")
+
+
+def import_scrambler_datas():
+ with subprocess.Popen(["./scrambler"],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE) as process:
+ process.stdin.write("0x10000".encode("ASCII"))
+ out, err = process.communicate()
+ return [int(e, 16) for e in out.decode("utf-8").split("\n")[:-1]]
+
+
+class LinkPacket(list):
+ def __init__(self, init=[]):
+ self.ongoing = False
+ self.done = False
+ self.scrambled_datas = import_scrambler_datas()
+ for dword in init:
+ self.append(dword)
+
+
+class LinkRXPacket(LinkPacket):
+ def descramble(self):
+ for i in range(len(self)):
+ self[i] = self[i] ^ self.scrambled_datas[i]
+
+ def check_crc(self):
+ stdin = ""
+ for v in self[:-1]:
+ stdin += "0x{:08x} ".format(v)
+ stdin += "exit"
+ with subprocess.Popen("./crc",
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE) as process:
+ process.stdin.write(stdin.encode("ASCII"))
+ out, err = process.communicate()
+ crc = int(out.decode("ASCII"), 16)
+ r = (self[-1] == crc)
+ self.pop()
+ return r
+
+ def decode(self):
+ self.descramble()
+ return self.check_crc()
+
+
+class LinkTXPacket(LinkPacket):
+ def insert_crc(self):
+ stdin = ""
+ for v in self:
+ stdin += "0x{:08x} ".format(v)
+ stdin += "exit"
+ with subprocess.Popen("./crc",
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE) as process:
+ process.stdin.write(stdin.encode("ASCII"))
+ out, err = process.communicate()
+ crc = int(out.decode("ASCII"), 16)
+ self.append(crc)
+
+ def scramble(self):
+ for i in range(len(self)):
+ self[i] = self[i] ^ self.scrambled_datas[i]
+
+ def encode(self):
+ self.insert_crc()
+ self.scramble()
+
+
+class LinkLayer(Module):
+ def __init__(self, phy, debug=False, random_level=0):
+ self.phy = phy
+ self.debug = debug
+ self.random_level = random_level
+ self.tx_packets = []
+ self.tx_packet = LinkTXPacket()
+ self.rx_packet = LinkRXPacket()
+
+ self.rx_cont = False
+ self.rx_last = 0
+ self.tx_cont = False
+ self.tx_cont_nb = -1
+ self.tx_lasts = [0, 0, 0]
+
+ self.scrambled_datas = import_scrambler_datas()
+
+ self.transport_callback = None
+
+ self.send_state = ""
+ self.send_states = ["RDY", "SOF", "DATA", "EOF", "WTRM"]
+
+ def set_transport_callback(self, callback):
+ self.transport_callback = callback
+
+ def send(self, dword):
+ if self.send_state == "RDY":
+ self.phy.send(primitives["X_RDY"])
+ if dword == primitives["R_RDY"]:
+ self.send_state = "SOF"
+ elif self.send_state == "SOF":
+ self.phy.send(primitives["SOF"])
+ self.send_state = "DATA"
+ elif self.send_state == "DATA":
+ if dword == primitives["HOLD"]:
+ self.phy.send(primitives["HOLDA"])
+ else:
+ self.phy.send(self.tx_packet.pop(0))
+ if len(self.tx_packet) == 0:
+ self.send_state = "EOF"
+ elif self.send_state == "EOF":
+ self.phy.send(primitives["EOF"])
+ self.send_state = "WTRM"
+ elif self.send_state == "WTRM":
+ self.phy.send(primitives["WTRM"])
+ if dword == primitives["R_OK"]:
+ self.tx_packet.done = True
+ elif dword == primitives["R_ERR"]:
+ self.tx_packet.done = True
+ if self.tx_packet.done:
+ self.phy.send(primitives["SYNC"])
+
+ def insert_cont(self):
+ self.tx_lasts.pop(0)
+ self.tx_lasts.append(self.phy.tx.dword.dat)
+ self.tx_cont = True
+ for i in range(3):
+ if not is_primitive(self.tx_lasts[i]):
+ self.tx_cont = False
+ if self.tx_lasts[i] != self.tx_lasts[0]:
+ self.tx_cont = False
+ if self.tx_cont:
+ if self.tx_cont_nb == 0:
+ self.phy.send(primitives["CONT"])
+ else:
+ self.phy.send(self.scrambled_datas[self.tx_cont_nb])
+ self.tx_cont_nb += 1
+ else:
+ self.tx_cont_nb = 0
+
+ def remove_cont(self, dword):
+ if dword == primitives["HOLD"]:
+ if self.rx_cont:
+ self.tx_lasts = [0, 0, 0]
+ if dword == primitives["CONT"]:
+ self.rx_cont = True
+ elif is_primitive(dword):
+ self.rx_last = dword
+ self.rx_cont = False
+ if self.rx_cont:
+ dword = self.rx_last
+ return dword
+
+ def callback(self, dword):
+ if dword == primitives["X_RDY"]:
+ self.phy.send(primitives["R_RDY"])
+ elif dword == primitives["WTRM"]:
+ self.phy.send(primitives["R_OK"])
+ if self.rx_packet.ongoing:
+ self.rx_packet.decode()
+ if self.transport_callback is not None:
+ self.transport_callback(self.rx_packet)
+ self.rx_packet.ongoing = False
+ elif dword == primitives["HOLD"]:
+ self.phy.send(primitives["HOLDA"])
+ elif dword == primitives["EOF"]:
+ pass
+ elif self.rx_packet.ongoing:
+ if dword != primitives["HOLD"]:
+ n = randn(100)
+ if n < self.random_level:
+ self.phy.send(primitives["HOLD"])
+ else:
+ self.phy.send(primitives["R_IP"])
+ if not is_primitive(dword):
+ self.rx_packet.append(dword)
+ elif dword == primitives["SOF"]:
+ self.rx_packet = LinkRXPacket()
+ self.rx_packet.ongoing = True
+
+ def gen_simulation(self, selfp):
+ self.tx_packet.done = True
+ self.phy.send(primitives["SYNC"])
+ while True:
+ yield from self.phy.receive()
+ if self.debug:
+ print_link(self.phy)
+ self.phy.send(primitives["SYNC"])
+ rx_dword = self.phy.rx.dword.dat
+ rx_dword = self.remove_cont(rx_dword)
+ if len(self.tx_packets) != 0:
+ if self.tx_packet.done:
+ self.tx_packet = self.tx_packets.pop(0)
+ self.tx_packet.encode()
+ self.send_state = "RDY"
+ if not self.tx_packet.done:
+ self.send(rx_dword)
+ else:
+ self.callback(rx_dword)
+ self.insert_cont()
+
+
+# Transport Layer model
+def print_transport(s):
+ print_with_prefix(s, "[TRN]: ")
+
+
+def get_field_data(field, packet):
+ return (packet[field.byte//4] >> field.offset) & (2**field.width-1)
+
+
+class FIS:
+ def __init__(self, packet, description, direction="H2D"):
+ self.packet = packet
+ self.description = description
+ self.direction = direction
+ self.decode()
+
+ def decode(self):
+ for k, v in self.description.items():
+ setattr(self, k, get_field_data(v, self.packet))
+
+ def encode(self):
+ for k, v in self.description.items():
+ self.packet[v.byte//4] |= (getattr(self, k) << v.offset)
+
+ def __repr__(self):
+ if self.direction == "H2D":
+ r = ">>>>>>>>\n"
+ else:
+ r = "<<<<<<<<\n"
+ for k in sorted(self.description.keys()):
+ r += k + " : 0x{:x}".format(getattr(self, k)) + "\n"
+ return r
+
+
+class FIS_REG_H2D(FIS):
+ def __init__(self, packet=[0]*fis_reg_h2d_header.length):
+ FIS.__init__(self, packet, fis_reg_h2d_header.fields)
+ self.type = fis_types["REG_H2D"]
+ self.direction = "H2D"
+
+ def __repr__(self):
+ r = "FIS_REG_H2D\n"
+ r += FIS.__repr__(self)
+ return r
+
+
+class FIS_REG_D2H(FIS):
+ def __init__(self, packet=[0]*fis_reg_d2h_header.length):
+ FIS.__init__(self, packet, fis_reg_d2h_header.fields)
+ self.type = fis_types["REG_D2H"]
+ self.direction = "D2H"
+
+ def __repr__(self):
+ r = "FIS_REG_D2H\n"
+ r += FIS.__repr__(self)
+ return r
+
+
+class FIS_DMA_ACTIVATE_D2H(FIS):
+ def __init__(self, packet=[0]*fis_dma_activate_d2h_header.length):
+ FIS.__init__(self, packet, fis_dma_activate_d2h_header.fields)
+ self.type = fis_types["DMA_ACTIVATE_D2H"]
+ self.direction = "D2H"
+
+ def __repr__(self):
+ r = "FIS_DMA_ACTIVATE_D2H\n"
+ r += FIS.__repr__(self)
+ return r
+
+
+class FIS_DATA(FIS):
+ def __init__(self, packet=[0], direction="H2D"):
+ FIS.__init__(self, packet, fis_data_header.fields, direction)
+ self.type = fis_types["DATA"]
+
+ def __repr__(self):
+ r = "FIS_DATA\n"
+ r += FIS.__repr__(self)
+ for data in self.packet[1:]:
+ r += "{:08x}\n".format(data)
+ return r
+
+
+class FIS_UNKNOWN(FIS):
+ def __init__(self, packet=[0], direction="H2D"):
+ FIS.__init__(self, packet, {}, direction)
+
+ def __repr__(self):
+ r = "UNKNOWN\n"
+ if self.direction == "H2D":
+ r += ">>>>>>>>\n"
+ else:
+ r += "<<<<<<<<\n"
+ for dword in self.packet:
+ r += "{:08x}\n".format(dword)
+ return r
+
+
+class TransportLayer(Module):
+ def __init__(self, link, debug=False, loopback=False):
+ self.link = link
+ self.debug = debug
+ self.loopback = loopback
+ self.link.set_transport_callback(self.callback)
+
+ def set_command_callback(self, callback):
+ self.command_callback = callback
+
+ def send(self, fis):
+ fis.encode()
+ packet = LinkTXPacket(fis.packet)
+ self.link.tx_packets.append(packet)
+ if self.debug and not self.loopback:
+ print_transport(fis)
+
+ def callback(self, packet):
+ fis_type = packet[0] & 0xff
+ if fis_type == fis_types["REG_H2D"]:
+ fis = FIS_REG_H2D(packet)
+ elif fis_type == fis_types["REG_D2H"]:
+ fis = FIS_REG_D2H(packet)
+ elif fis_type == fis_types["DMA_ACTIVATE_D2H"]:
+ fis = FIS_DMA_ACTIVATE_D2H(packet)
+ elif fis_type == fis_types["DATA"]:
+ fis = FIS_DATA(packet, direction="H2D")
+ else:
+ fis = FIS_UNKNOWN(packet, direction="H2D")
+ if self.debug:
+ print_transport(fis)
+ if self.loopback:
+ self.send(fis)
+ else:
+ self.command_callback(fis)
+
+
+# Command Layer model
+class CommandLayer(Module):
+ def __init__(self, transport):
+ self.transport = transport
+ self.transport.set_command_callback(self.callback)
+
+ self.hdd = None
+
+ def set_hdd(self, hdd):
+ self.hdd = hdd
+
+ def callback(self, fis):
+ resp = None
+ if isinstance(fis, FIS_REG_H2D):
+ if fis.command == regs["WRITE_DMA_EXT"]:
+ resp = self.hdd.write_dma_callback(fis)
+ elif fis.command == regs["READ_DMA_EXT"]:
+ resp = self.hdd.read_dma_callback(fis)
+ elif isinstance(fis, FIS_DATA):
+ resp = self.hdd.data_callback(fis)
+
+ if resp is not None:
+ for packet in resp:
+ self.transport.send(packet)
+
+
+# HDD model
+def print_hdd(s):
+ print_with_prefix(s, "[HDD]: ")
+
+
+class HDDMemRegion:
+ def __init__(self, base, count, sector_size):
+ self.base = base
+ self.count = count
+ self.data = [0]*(count*sector_size//4)
+
+
+class HDD(Module):
+ def __init__(self,
+ link_debug=False, link_random_level=0,
+ transport_debug=False, transport_loopback=False,
+ hdd_debug=False,
+ ):
+ self.submodules.phy = PHYLayer()
+ self.submodules.link = LinkLayer(self.phy, link_debug, link_random_level)
+ self.submodules.transport = TransportLayer(self.link, transport_debug, transport_loopback)
+ self.submodules.command = CommandLayer(self.transport)
+
+ self.command.set_hdd(self)
+
+ self.debug = hdd_debug
+ self.mem = None
+ self.wr_sector = 0
+ self.wr_end_sector = 0
+ self.rd_sector = 0
+ self.rx_end_sector = 0
+
+ def malloc(self, sector, count):
+ if self.debug:
+ s = "Allocating {n} sectors: {s} to {e}".format(n=count, s=sector, e=sector+count-1)
+ s += " ({} KB)".format(count*logical_sector_size//1024)
+ print_hdd(s)
+ self.mem = HDDMemRegion(sector, count, logical_sector_size)
+
+ def write(self, sector, data):
+ n = math.ceil(dwords2sectors(len(data)))
+ if self.debug:
+ if n == 1:
+ s = "{}".format(sector)
+ else:
+ s = "{s} to {e}".format(s=sector, e=sector+n-1)
+ print_hdd("Writing sector " + s)
+ for i in range(len(data)):
+ offset = sectors2dwords(sector)
+ self.mem.data[offset+i] = data[i]
+
+ def read(self, sector, count):
+ if self.debug:
+ if count == 1:
+ s = "{}".format(sector)
+ else:
+ s = "{s} to {e}".format(s=sector, e=sector+count-1)
+ print_hdd("Reading sector " + s)
+ data = []
+ for i in range(sectors2dwords(count)):
+ data.append(self.mem.data[sectors2dwords(sector)+i])
+ return data
+
+ def write_dma_callback(self, fis):
+ self.wr_sector = fis.lba_lsb + (fis.lba_msb << 32)
+ self.wr_end_sector = self.wr_sector + fis.count
+ return [FIS_DMA_ACTIVATE_D2H()]
+
+ def read_dma_callback(self, fis):
+ self.rd_sector = fis.lba_lsb + (fis.lba_msb << 32)
+ self.rd_end_sector = self.rd_sector + fis.count
+ packets = []
+ while self.rd_sector != self.rd_end_sector:
+ count = min(self.rd_end_sector-self.rd_sector, (fis_max_dwords*4)//logical_sector_size)
+ packet = self.read(self.rd_sector, count)
+ packet.insert(0, 0)
+ packets.append(FIS_DATA(packet, direction="D2H"))
+ self.rd_sector += count
+ packets.append(FIS_REG_D2H())
+ return packets
+
+ def data_callback(self, fis):
+ self.write(self.wr_sector, fis.packet[1:])
+ self.wr_sector += dwords2sectors(len(fis.packet[1:]))
+ if self.wr_sector == self.wr_end_sector:
+ return [FIS_REG_D2H()]
+ else:
+ return [FIS_DMA_ACTIVATE_D2H()]
--- /dev/null
+from misoclib.mem.litesata.common import *
+from misoclib.mem.litesata.test.common import *
+
+class PHYDword:
+ def __init__(self, dat=0):
+ self.dat = dat
+ self.start = 1
+ self.done = 0
+
+
+class PHYSource(Module):
+ def __init__(self):
+ self.source = Source(phy_description(32))
+
+ # # #
+
+ self.dword = PHYDword()
+
+ def send(self, dword):
+ self.dword = dword
+
+ def do_simulation(self, selfp):
+ selfp.source.stb = 1
+ selfp.source.charisk = 0b0000
+ for k, v in primitives.items():
+ if v == self.dword.dat:
+ selfp.source.charisk = 0b0001
+ selfp.source.data = self.dword.dat
+
+
+class PHYSink(Module):
+ def __init__(self):
+ self.sink = Sink(phy_description(32))
+
+ # # #
+
+ self.dword = PHYDword()
+
+ def receive(self):
+ self.dword.done = 0
+ while self.dword.done == 0:
+ yield
+
+ def do_simulation(self, selfp):
+ self.dword.done = 0
+ selfp.sink.ack = 1
+ if selfp.sink.stb == 1:
+ self.dword.done = 1
+ self.dword.dat = selfp.sink.data
+
+
+class PHYLayer(Module):
+ def __init__(self):
+
+ self.submodules.rx = PHYSink()
+ self.submodules.tx = PHYSource()
+
+ self.source = self.tx.source
+ self.sink = self.rx.sink
+
+ def send(self, dword):
+ packet = PHYDword(dword)
+ self.tx.send(packet)
+
+ def receive(self):
+ yield from self.rx.receive()
+
+ def __repr__(self):
+ receiving = "{:08x} ".format(self.rx.dword.dat)
+ receiving += decode_primitive(self.rx.dword.dat)
+ receiving += " "*(16-len(receiving))
+
+ sending = "{:08x} ".format(self.tx.dword.dat)
+ sending += decode_primitive(self.tx.dword.dat)
+ sending += " "*(16-len(sending))
+
+ return receiving + sending
--- /dev/null
+from misoclib.mem.litesata.common import *
+from misoclib.mem.litesata.test.common import *
+
+from misoclib.mem.litesata.test.model.link import LinkTXPacket
+
+def print_transport(s):
+ print_with_prefix(s, "[TRN]: ")
+
+
+def get_field_data(field, packet):
+ return (packet[field.byte//4] >> field.offset) & (2**field.width-1)
+
+
+class FIS:
+ def __init__(self, packet, description, direction="H2D"):
+ self.packet = packet
+ self.description = description
+ self.direction = direction
+ self.decode()
+
+ def decode(self):
+ for k, v in self.description.items():
+ setattr(self, k, get_field_data(v, self.packet))
+
+ def encode(self):
+ for k, v in self.description.items():
+ self.packet[v.byte//4] |= (getattr(self, k) << v.offset)
+
+ def __repr__(self):
+ if self.direction == "H2D":
+ r = ">>>>>>>>\n"
+ else:
+ r = "<<<<<<<<\n"
+ for k in sorted(self.description.keys()):
+ r += k + " : 0x{:x}".format(getattr(self, k)) + "\n"
+ return r
+
+
+class FIS_REG_H2D(FIS):
+ def __init__(self, packet=[0]*fis_reg_h2d_header.length):
+ FIS.__init__(self, packet, fis_reg_h2d_header.fields)
+ self.type = fis_types["REG_H2D"]
+ self.direction = "H2D"
+
+ def __repr__(self):
+ r = "FIS_REG_H2D\n"
+ r += FIS.__repr__(self)
+ return r
+
+
+class FIS_REG_D2H(FIS):
+ def __init__(self, packet=[0]*fis_reg_d2h_header.length):
+ FIS.__init__(self, packet, fis_reg_d2h_header.fields)
+ self.type = fis_types["REG_D2H"]
+ self.direction = "D2H"
+
+ def __repr__(self):
+ r = "FIS_REG_D2H\n"
+ r += FIS.__repr__(self)
+ return r
+
+
+class FIS_DMA_ACTIVATE_D2H(FIS):
+ def __init__(self, packet=[0]*fis_dma_activate_d2h_header.length):
+ FIS.__init__(self, packet, fis_dma_activate_d2h_header.fields)
+ self.type = fis_types["DMA_ACTIVATE_D2H"]
+ self.direction = "D2H"
+
+ def __repr__(self):
+ r = "FIS_DMA_ACTIVATE_D2H\n"
+ r += FIS.__repr__(self)
+ return r
+
+
+class FIS_DATA(FIS):
+ def __init__(self, packet=[0], direction="H2D"):
+ FIS.__init__(self, packet, fis_data_header.fields, direction)
+ self.type = fis_types["DATA"]
+
+ def __repr__(self):
+ r = "FIS_DATA\n"
+ r += FIS.__repr__(self)
+ for data in self.packet[1:]:
+ r += "{:08x}\n".format(data)
+ return r
+
+
+class FIS_UNKNOWN(FIS):
+ def __init__(self, packet=[0], direction="H2D"):
+ FIS.__init__(self, packet, {}, direction)
+
+ def __repr__(self):
+ r = "UNKNOWN\n"
+ if self.direction == "H2D":
+ r += ">>>>>>>>\n"
+ else:
+ r += "<<<<<<<<\n"
+ for dword in self.packet:
+ r += "{:08x}\n".format(dword)
+ return r
+
+
+class TransportLayer(Module):
+ def __init__(self, link, debug=False, loopback=False):
+ self.link = link
+ self.debug = debug
+ self.loopback = loopback
+ self.link.set_transport_callback(self.callback)
+
+ def set_command_callback(self, callback):
+ self.command_callback = callback
+
+ def send(self, fis):
+ fis.encode()
+ packet = LinkTXPacket(fis.packet)
+ self.link.tx_packets.append(packet)
+ if self.debug and not self.loopback:
+ print_transport(fis)
+
+ def callback(self, packet):
+ fis_type = packet[0] & 0xff
+ if fis_type == fis_types["REG_H2D"]:
+ fis = FIS_REG_H2D(packet)
+ elif fis_type == fis_types["REG_D2H"]:
+ fis = FIS_REG_D2H(packet)
+ elif fis_type == fis_types["DMA_ACTIVATE_D2H"]:
+ fis = FIS_DMA_ACTIVATE_D2H(packet)
+ elif fis_type == fis_types["DATA"]:
+ fis = FIS_DATA(packet, direction="H2D")
+ else:
+ fis = FIS_UNKNOWN(packet, direction="H2D")
+ if self.debug:
+ print_transport(fis)
+ if self.loopback:
+ self.send(fis)
+ else:
+ self.command_callback(fis)