+++ /dev/null
-import subprocess
-
-from migen.fhdl.std import *
-
-from lib.sata.common import *
-from lib.sata.test.common import *
-
-# PHY Layer model
-class PHYDword:
- def __init__(self, dat=0):
- self.dat = dat
- self.start = 1
- self.done = 0
-
-class PHYSource(Module):
- def __init__(self):
- self.source = Source(phy_description(32))
- ###
- self.dword = PHYDword()
-
- def send(self, dword):
- self.dword = dword
-
- def do_simulation(self, selfp):
- selfp.source.stb = 1
- selfp.source.charisk = 0b0000
- for k, v in primitives.items():
- if v == self.dword.dat:
- selfp.source.charisk = 0b0001
- selfp.source.data = self.dword.dat
-
-class PHYSink(Module):
- def __init__(self):
- self.sink = Sink(phy_description(32))
- ###
- self.dword = PHYDword()
-
- def receive(self):
- self.dword.done = 0
- while self.dword.done == 0:
- yield
-
- def do_simulation(self, selfp):
- self.dword.done = 0
- selfp.sink.ack = 1
- if selfp.sink.stb == 1:
- self.dword.done = 1
- self.dword.dat = selfp.sink.data
-
-class PHYLayer(Module):
- def __init__(self, debug=False):
- self.debug = debug
-
- self.submodules.rx = PHYSink()
- self.submodules.tx = PHYSource()
-
- self.source = self.tx.source
- self.sink = self.rx.sink
-
- def send(self, dword):
- packet = PHYDword(dword)
- self.tx.send(packet)
-
- def receive(self):
- if self.debug:
- print(self)
- yield from self.rx.receive()
-
- def __repr__(self):
- receiving = "%08x " %self.rx.dword.dat
- receiving += decode_primitive(self.rx.dword.dat)
- receiving += " "*(16-len(receiving))
-
- sending = "%08x " %self.tx.dword.dat
- sending += decode_primitive(self.tx.dword.dat)
- sending += " "*(16-len(sending))
-
- return receiving + sending
-
-# Link Layer model
-def import_scrambler_datas():
- with subprocess.Popen(["./scrambler"], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as process:
- process.stdin.write("0x10000".encode("ASCII"))
- out, err = process.communicate()
- return [int(e, 16) for e in out.decode("utf-8").split("\n")[:-1]]
-
-class LinkPacket(list):
- def __init__(self, init=[]):
- self.ongoing = False
- self.done = False
- self.scrambled_datas = import_scrambler_datas()
- for dword in init:
- self.append(dword)
-
-class LinkRXPacket(LinkPacket):
- def decode(self):
- self.descramble()
- return self.check_crc()
-
- def descramble(self):
- for i in range(len(self)):
- self[i] = self[i] ^ self.scrambled_datas[i]
-
- def check_crc(self):
- stdin = ""
- for v in self[:-1]:
- stdin += "0x%08x " %v
- stdin += "exit"
- with subprocess.Popen("./crc", stdin=subprocess.PIPE, stdout=subprocess.PIPE) as process:
- process.stdin.write(stdin.encode("ASCII"))
- out, err = process.communicate()
- crc = int(out.decode("ASCII"), 16)
- r = (self[-1] == crc)
- self.pop()
- return r
-
-class LinkTXPacket(LinkPacket):
- def encode(self):
- self.insert_crc()
- self.scramble()
-
- def scramble(self):
- for i in range(len(self)):
- self[i] = self[i] ^ self.scrambled_datas[i]
-
- def insert_crc(self):
- stdin = ""
- for v in self:
- stdin += "0x%08x " %v
- stdin += "exit"
- with subprocess.Popen("./crc", stdin=subprocess.PIPE, stdout=subprocess.PIPE) as process:
- process.stdin.write(stdin.encode("ASCII"))
- out, err = process.communicate()
- crc = int(out.decode("ASCII"), 16)
- self.append(crc)
-
-class LinkLayer(Module):
- def __init__(self, phy, debug=False, random_level=0):
- self.phy = phy
- self.debug = debug
- self.random_level = random_level
- self.tx_packets = []
- self.tx_packet = LinkTXPacket()
- self.rx_packet = LinkRXPacket()
-
- self.rx_cont = False
- self.rx_last = 0
- self.tx_cont = False
- self.tx_cont_nb = -1
- self.tx_lasts = [0, 0, 0]
-
- self.scrambled_datas = import_scrambler_datas()
-
- self.transport_callback = None
-
- self.send_state = ""
- self.send_states = ["RDY", "SOF", "DATA", "EOF", "WTRM"]
-
- def set_transport_callback(self, callback):
- self.transport_callback = callback
-
- def remove_cont(self, dword):
- if dword == primitives["HOLD"]:
- if self.rx_cont:
- self.tx_lasts = [0, 0, 0]
- if dword == primitives["CONT"]:
- self.rx_cont = True
- elif is_primitive(dword):
- self.rx_last = dword
- self.rx_cont = False
- if self.rx_cont:
- dword = self.rx_last
- return dword
-
- def callback(self, dword):
- if dword == primitives["X_RDY"]:
- self.phy.send(primitives["R_RDY"])
- elif dword == primitives["WTRM"]:
- self.phy.send(primitives["R_OK"])
- if self.rx_packet.ongoing:
- self.rx_packet.decode()
- if self.transport_callback is not None:
- self.transport_callback(self.rx_packet)
- self.rx_packet.ongoing = False
- elif dword == primitives["HOLD"]:
- self.phy.send(primitives["HOLDA"])
- elif dword == primitives["EOF"]:
- pass
- elif self.rx_packet.ongoing:
- if dword != primitives["HOLD"]:
- n = randn(100)
- if n < self.random_level:
- self.phy.send(primitives["HOLD"])
- else:
- self.phy.send(primitives["R_IP"])
- if not is_primitive(dword):
- self.rx_packet.append(dword)
- elif dword == primitives["SOF"]:
- self.rx_packet = LinkRXPacket()
- self.rx_packet.ongoing = True
-
- def send(self, dword):
- if self.send_state == "RDY":
- self.phy.send(primitives["X_RDY"])
- if dword == primitives["R_RDY"]:
- self.send_state = "SOF"
- elif self.send_state == "SOF":
- self.phy.send(primitives["SOF"])
- self.send_state = "DATA"
- elif self.send_state == "DATA":
- if dword == primitives["HOLD"]:
- self.phy.send(primitives["HOLDA"])
- else:
- self.phy.send(self.tx_packet.pop(0))
- if len(self.tx_packet) == 0:
- self.send_state = "EOF"
- elif self.send_state == "EOF":
- self.phy.send(primitives["EOF"])
- self.send_state = "WTRM"
- elif self.send_state == "WTRM":
- self.phy.send(primitives["WTRM"])
- if dword == primitives["R_OK"]:
- self.tx_packet.done = True
- elif dword == primitives["R_ERR"]:
- self.tx_packet.done = True
- self.phy.send(primitives["SYNC"])
-
- def insert_cont(self):
- self.tx_lasts.pop(0)
- self.tx_lasts.append(self.phy.tx.dword.dat)
- self.tx_cont = True
- for i in range(3):
- if not is_primitive(self.tx_lasts[i]):
- self.tx_cont = False
- if self.tx_lasts[i] != self.tx_lasts[0]:
- self.tx_cont = False
- if self.tx_cont:
- if self.tx_cont_nb == 0:
- self.phy.send(primitives["CONT"])
- else:
- self.phy.send(self.scrambled_datas[self.tx_cont_nb])
- self.tx_cont_nb += 1
- else:
- self.tx_cont_nb = 0
-
- def gen_simulation(self, selfp):
- self.tx_packet.done = True
- self.phy.send(primitives["SYNC"])
- while True:
- yield from self.phy.receive()
- self.phy.send(primitives["SYNC"])
- rx_dword = self.phy.rx.dword.dat
- rx_dword = self.remove_cont(rx_dword)
- if len(self.tx_packets) != 0:
- if self.tx_packet.done:
- self.tx_packet = self.tx_packets.pop(0)
- self.tx_packet.encode()
- self.send_state = "RDY"
- if not self.tx_packet.done:
- self.send(rx_dword)
- else:
- self.callback(rx_dword)
- self.insert_cont()
-
-# Transport Layer model
-def get_field_data(field, packet):
- return (packet[field.dword] >> field.offset) & (2**field.width-1)
-
-class FIS:
- def __init__(self, packet, description):
- self.packet = packet
- self.description = description
- self.decode()
-
- def decode(self):
- for k, v in self.description.items():
- setattr(self, k, get_field_data(v, self.packet))
-
- def encode(self):
- for k, v in self.description.items():
- self.packet[v.dword] |= (getattr(self, k) << v.offset)
-
- def __repr__(self):
- r = "--------\n"
- for k in sorted(self.description.keys()):
- r += k + " : 0x%x" %getattr(self,k) + "\n"
- return r
-
-class FIS_REG_H2D(FIS):
- def __init__(self, packet=[0]*fis_reg_h2d_cmd_len):
- FIS.__init__(self, packet, fis_reg_h2d_layout)
- self.type = fis_types["REG_H2D"]
-
- def __repr__(self):
- r = "FIS_REG_H2D\n"
- r += FIS.__repr__(self)
- return r
-
-class FIS_REG_D2H(FIS):
- def __init__(self, packet=[0]*fis_reg_d2h_cmd_len):
- FIS.__init__(self, packet, fis_reg_d2h_layout)
- self.type = fis_types["REG_D2H"]
-
- def __repr__(self):
- r = "FIS_REG_D2H\n"
- r += FIS.__repr__(self)
- return r
-
-class FIS_DMA_ACTIVATE_D2H(FIS):
- def __init__(self, packet=[0]*fis_dma_activate_d2h_cmd_len):
- FIS.__init__(self, packet, fis_dma_activate_d2h_layout)
- self.type = fis_types["DMA_ACTIVATE_D2H"]
-
- def __repr__(self):
- r = "FIS_DMA_ACTIVATE_D2H\n"
- r += FIS.__repr__(self)
- return r
-
-class FIS_DATA(FIS):
- def __init__(self, packet=[0]):
- FIS.__init__(self, packet, fis_data_layout)
- self.type = fis_types["DATA"]
-
- def __repr__(self):
- r = "FIS_DATA\n"
- r += FIS.__repr__(self)
- for data in self.packet[1:]:
- r += "%08x\n" %data
- return r
-
-class FIS_UNKNOWN(FIS):
- def __init__(self, packet=[0]):
- FIS.__init__(self, packet, {})
-
- def __repr__(self):
- r = "UNKNOWN\n"
- r += "--------\n"
- for dword in self.packet:
- r += "%08x\n" %dword
- return r
-
-class TransportLayer(Module):
- def __init__(self, link, debug=False, loopback=False):
- self.link = link
- self.debug = debug
- self.loopback = loopback
- self.link.set_transport_callback(self.callback)
-
- def set_command_callback(self, callback):
- self.command_callback = callback
-
- def send(self, fis):
- fis.encode()
- packet = LinkTXPacket(fis.packet)
- self.link.tx_packets.append(packet)
- if self.debug and not self.loopback:
- print(fis)
-
- def callback(self, packet):
- fis_type = packet[0] & 0xff
- if fis_type == fis_types["REG_H2D"]:
- fis = FIS_REG_H2D(packet)
- elif fis_type == fis_types["REG_D2H"]:
- fis = FIS_REG_D2H(packet)
- elif fis_type == fis_types["DMA_ACTIVATE_D2H"]:
- fis = FIS_DMA_ACTIVATE_D2H(packet)
- elif fis_type == fis_types["DATA"]:
- fis = FIS_DATA(packet)
- else:
- fis = FIS_UNKNOWN(packet)
- if self.debug:
- print(fis)
- if self.loopback:
- self.send(fis)
- else:
- self.command_callback(fis)
-
-# Command Layer model
-class CommandLayer(Module):
- def __init__(self, transport, debug=False):
- self.transport = transport
- self.debug = debug
- self.transport.set_command_callback(self.callback)
-
- self.hdd = None
-
- def set_hdd(self, hdd):
- self.hdd = hdd
-
- def callback(self, fis):
- # XXX manage maximum of 2048 DWORDS per DMA
- resp = None
- if isinstance(fis, FIS_REG_H2D):
- if fis.command == regs["WRITE_DMA_EXT"]:
- resp = self.hdd.write_dma_cmd(fis)
- elif fis.command == regs["READ_DMA_EXT"]:
- resp = self.hdd.read_dma_cmd(fis)
- elif fis.command == regs["IDENTIFY_DEVICE_DMA"]:
- resp = self.hdd.identify_device_dma_cmd(fis)
- elif isinstance(fis, FIS_DATA):
- resp = self.hdd.data_cmd(fis)
-
- if resp is not None:
- for packet in resp:
- self.transport.send(packet)
-
-# HDD model
-class HDDMemRegion:
- def __init__(self, base, length):
- self.base = base
- self.length = length
- self.data = [0]*(length//4)
-
-class HDD(Module):
- def __init__(self, command, debug=False):
- self.command = command
- command.set_hdd(self)
-
- self.mem = None
- self.wr_address = 0
- self.wr_length = 0
- self.wr_cnt = 0
-
- def write_dma_cmd(self, fis):
- self.wr_address = fis.lba_lsb
- self.wr_length = fis.count
- self.wr_cnt = 0
- return [FIS_DMA_ACTIVATE_D2H()]
-
- def read_dma_cmd(self, fis):
- packet = self.read_mem(fis.lba_lsb, fis.count*4)
- packet.insert(0, 0)
- return [FIS_DATA(packet), FIS_REG_D2H()]
-
- def identify_dma_cmd(self, fis):
- packet = [i for i in range(256)]
- packet.insert(0, 0)
- return [FIS_DATA(packet), FIS_REG_D2H()]
-
- def data_cmd(self, fis):
- self.write_mem(self.wr_address, fis.packet[1:])
- self.wr_cnt += len(fis.packet[1:])
- if self.wr_length == self.wr_cnt:
- return [FIS_REG_D2H()]
- else:
- return None
-
- def allocate_mem(self, base, length):
- # XXX add support for multiple memory regions
- self.mem = HDDMemRegion(base, length)
-
- def write_mem(self, adr, data):
- # XXX test if adr allocated in one memory region
- current_adr = (adr-self.mem.base)//4
- for i in range(len(data)):
- self.mem.data[current_adr+i] = data[i]
-
- def read_mem(self, adr, length=1):
- # XXX test if adr allocated in one memory region
- current_adr = (adr-self.mem.base)//4
- data = []
- for i in range(length//4):
- data.append(self.mem.data[current_adr+i])
- return data
-
-class BFM(Module):
- def __init__(self,
- phy_debug=False,
- link_debug=False, link_random_level=0,
- transport_debug=False, transport_loopback=False,
- command_debug=False,
- hdd_debug=False
- ):
- ###
- self.submodules.phy = PHYLayer(phy_debug)
- self.submodules.link = LinkLayer(self.phy, link_debug, link_random_level)
- self.submodules.transport = TransportLayer(self.link, transport_debug, transport_loopback)
- self.submodules.command = CommandLayer(self.transport, command_debug)
- self.submodules.hdd = HDD(self.command, hdd_debug)
-import random
+import random, copy
from migen.fhdl.std import *
from migen.genlib.record import *
from lib.sata.transport import SATATransport
from lib.sata.command import SATACommand
-from lib.sata.test.bfm import *
+from lib.sata.test.hdd import *
from lib.sata.test.common import *
class CommandTXPacket(list):
self.length = 0
def send(self, packet, blocking=True):
+ packet = copy.deepcopy(packet)
self.packets.append(packet)
if blocking:
while packet.done == 0:
class TB(Module):
def __init__(self):
- self.submodules.bfm = BFM(phy_debug=False,
+ self.submodules.hdd = HDD(
+ phy_debug=False,
link_random_level=0, link_debug=False,
- transport_debug=True, transport_loopback=False,
+ transport_debug=False, transport_loopback=False,
command_debug=False,
- hdd_debug=False)
- self.submodules.link = SATALink(self.bfm.phy)
+ mem_debug=True)
+ self.submodules.link = SATALink(self.hdd.phy)
self.submodules.transport = SATATransport(self.link)
self.submodules.command = SATACommand(self.transport)
]
def gen_simulation(self, selfp):
- self.bfm.hdd.allocate_mem(0x00000000, 64*1024*1024)
+ self.hdd.allocate_mem(0x00000000, 64*1024*1024)
selfp.command.source.ack = 1
for i in range(100):
yield
- streamer_packet = CommandTXPacket(write=1, address=1024, length=32, data=[i for i in range(32)])
- yield from self.streamer.send(streamer_packet)
+ write_data = [i for i in range(128)]
+ write_packet = CommandTXPacket(write=1, address=1024, length=len(write_data), data=write_data)
+ yield from self.streamer.send(write_packet)
yield from self.logger.receive()
- for d in self.logger.packet:
- print("%08x" %d)
- for i in range(32):
- yield
- streamer_packet = CommandTXPacket(read=1, address=1024, length=32)
- yield from self.streamer.send(streamer_packet)
+ read_packet = CommandTXPacket(read=1, address=1024, length=len(write_data))
+ yield from self.streamer.send(read_packet)
yield from self.logger.receive()
- for d in self.logger.packet:
- print("%08x" %d)
+ read_data = self.logger.packet
yield from self.logger.receive()
- for d in self.logger.packet:
- print("%08x" %d)
+
+ # check results
+ s, l, e = check(write_data, read_data)
+ print("shift "+ str(s) + " / length " + str(l) + " / errors " + str(e))
if __name__ == "__main__":
run_simulation(TB(), ncycles=512, vcd_name="my.vcd", keep_files=True)
--- /dev/null
+import subprocess
+
+from migen.fhdl.std import *
+
+from lib.sata.common import *
+from lib.sata.test.common import *
+
+# PHY Layer model
+class PHYDword:
+ def __init__(self, dat=0):
+ self.dat = dat
+ self.start = 1
+ self.done = 0
+
+class PHYSource(Module):
+ def __init__(self):
+ self.source = Source(phy_description(32))
+ ###
+ self.dword = PHYDword()
+
+ def send(self, dword):
+ self.dword = dword
+
+ def do_simulation(self, selfp):
+ selfp.source.stb = 1
+ selfp.source.charisk = 0b0000
+ for k, v in primitives.items():
+ if v == self.dword.dat:
+ selfp.source.charisk = 0b0001
+ selfp.source.data = self.dword.dat
+
+class PHYSink(Module):
+ def __init__(self):
+ self.sink = Sink(phy_description(32))
+ ###
+ self.dword = PHYDword()
+
+ def receive(self):
+ self.dword.done = 0
+ while self.dword.done == 0:
+ yield
+
+ def do_simulation(self, selfp):
+ self.dword.done = 0
+ selfp.sink.ack = 1
+ if selfp.sink.stb == 1:
+ self.dword.done = 1
+ self.dword.dat = selfp.sink.data
+
+class PHYLayer(Module):
+ def __init__(self, debug=False):
+ self.debug = debug
+
+ self.submodules.rx = PHYSink()
+ self.submodules.tx = PHYSource()
+
+ self.source = self.tx.source
+ self.sink = self.rx.sink
+
+ def send(self, dword):
+ packet = PHYDword(dword)
+ self.tx.send(packet)
+
+ def receive(self):
+ if self.debug:
+ print(self)
+ yield from self.rx.receive()
+
+ def __repr__(self):
+ receiving = "%08x " %self.rx.dword.dat
+ receiving += decode_primitive(self.rx.dword.dat)
+ receiving += " "*(16-len(receiving))
+
+ sending = "%08x " %self.tx.dword.dat
+ sending += decode_primitive(self.tx.dword.dat)
+ sending += " "*(16-len(sending))
+
+ return receiving + sending
+
+# Link Layer model
+def import_scrambler_datas():
+ with subprocess.Popen(["./scrambler"], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as process:
+ process.stdin.write("0x10000".encode("ASCII"))
+ out, err = process.communicate()
+ return [int(e, 16) for e in out.decode("utf-8").split("\n")[:-1]]
+
+class LinkPacket(list):
+ def __init__(self, init=[]):
+ self.ongoing = False
+ self.done = False
+ self.scrambled_datas = import_scrambler_datas()
+ for dword in init:
+ self.append(dword)
+
+class LinkRXPacket(LinkPacket):
+ def decode(self):
+ self.descramble()
+ return self.check_crc()
+
+ def descramble(self):
+ for i in range(len(self)):
+ self[i] = self[i] ^ self.scrambled_datas[i]
+
+ def check_crc(self):
+ stdin = ""
+ for v in self[:-1]:
+ stdin += "0x%08x " %v
+ stdin += "exit"
+ with subprocess.Popen("./crc", stdin=subprocess.PIPE, stdout=subprocess.PIPE) as process:
+ process.stdin.write(stdin.encode("ASCII"))
+ out, err = process.communicate()
+ crc = int(out.decode("ASCII"), 16)
+ r = (self[-1] == crc)
+ self.pop()
+ return r
+
+class LinkTXPacket(LinkPacket):
+ def encode(self):
+ self.insert_crc()
+ self.scramble()
+
+ def scramble(self):
+ for i in range(len(self)):
+ self[i] = self[i] ^ self.scrambled_datas[i]
+
+ def insert_crc(self):
+ stdin = ""
+ for v in self:
+ stdin += "0x%08x " %v
+ stdin += "exit"
+ with subprocess.Popen("./crc", stdin=subprocess.PIPE, stdout=subprocess.PIPE) as process:
+ process.stdin.write(stdin.encode("ASCII"))
+ out, err = process.communicate()
+ crc = int(out.decode("ASCII"), 16)
+ self.append(crc)
+
+class LinkLayer(Module):
+ def __init__(self, phy, debug=False, random_level=0):
+ self.phy = phy
+ self.debug = debug
+ self.random_level = random_level
+ self.tx_packets = []
+ self.tx_packet = LinkTXPacket()
+ self.rx_packet = LinkRXPacket()
+
+ self.rx_cont = False
+ self.rx_last = 0
+ self.tx_cont = False
+ self.tx_cont_nb = -1
+ self.tx_lasts = [0, 0, 0]
+
+ self.scrambled_datas = import_scrambler_datas()
+
+ self.transport_callback = None
+
+ self.send_state = ""
+ self.send_states = ["RDY", "SOF", "DATA", "EOF", "WTRM"]
+
+ def set_transport_callback(self, callback):
+ self.transport_callback = callback
+
+ def remove_cont(self, dword):
+ if dword == primitives["HOLD"]:
+ if self.rx_cont:
+ self.tx_lasts = [0, 0, 0]
+ if dword == primitives["CONT"]:
+ self.rx_cont = True
+ elif is_primitive(dword):
+ self.rx_last = dword
+ self.rx_cont = False
+ if self.rx_cont:
+ dword = self.rx_last
+ return dword
+
+ def callback(self, dword):
+ if dword == primitives["X_RDY"]:
+ self.phy.send(primitives["R_RDY"])
+ elif dword == primitives["WTRM"]:
+ self.phy.send(primitives["R_OK"])
+ if self.rx_packet.ongoing:
+ self.rx_packet.decode()
+ if self.transport_callback is not None:
+ self.transport_callback(self.rx_packet)
+ self.rx_packet.ongoing = False
+ elif dword == primitives["HOLD"]:
+ self.phy.send(primitives["HOLDA"])
+ elif dword == primitives["EOF"]:
+ pass
+ elif self.rx_packet.ongoing:
+ if dword != primitives["HOLD"]:
+ n = randn(100)
+ if n < self.random_level:
+ self.phy.send(primitives["HOLD"])
+ else:
+ self.phy.send(primitives["R_IP"])
+ if not is_primitive(dword):
+ self.rx_packet.append(dword)
+ elif dword == primitives["SOF"]:
+ self.rx_packet = LinkRXPacket()
+ self.rx_packet.ongoing = True
+
+ def send(self, dword):
+ if self.send_state == "RDY":
+ self.phy.send(primitives["X_RDY"])
+ if dword == primitives["R_RDY"]:
+ self.send_state = "SOF"
+ elif self.send_state == "SOF":
+ self.phy.send(primitives["SOF"])
+ self.send_state = "DATA"
+ elif self.send_state == "DATA":
+ if dword == primitives["HOLD"]:
+ self.phy.send(primitives["HOLDA"])
+ else:
+ self.phy.send(self.tx_packet.pop(0))
+ if len(self.tx_packet) == 0:
+ self.send_state = "EOF"
+ elif self.send_state == "EOF":
+ self.phy.send(primitives["EOF"])
+ self.send_state = "WTRM"
+ elif self.send_state == "WTRM":
+ self.phy.send(primitives["WTRM"])
+ if dword == primitives["R_OK"]:
+ self.tx_packet.done = True
+ elif dword == primitives["R_ERR"]:
+ self.tx_packet.done = True
+ self.phy.send(primitives["SYNC"])
+
+ def insert_cont(self):
+ self.tx_lasts.pop(0)
+ self.tx_lasts.append(self.phy.tx.dword.dat)
+ self.tx_cont = True
+ for i in range(3):
+ if not is_primitive(self.tx_lasts[i]):
+ self.tx_cont = False
+ if self.tx_lasts[i] != self.tx_lasts[0]:
+ self.tx_cont = False
+ if self.tx_cont:
+ if self.tx_cont_nb == 0:
+ self.phy.send(primitives["CONT"])
+ else:
+ self.phy.send(self.scrambled_datas[self.tx_cont_nb])
+ self.tx_cont_nb += 1
+ else:
+ self.tx_cont_nb = 0
+
+ def gen_simulation(self, selfp):
+ self.tx_packet.done = True
+ self.phy.send(primitives["SYNC"])
+ while True:
+ yield from self.phy.receive()
+ self.phy.send(primitives["SYNC"])
+ rx_dword = self.phy.rx.dword.dat
+ rx_dword = self.remove_cont(rx_dword)
+ if len(self.tx_packets) != 0:
+ if self.tx_packet.done:
+ self.tx_packet = self.tx_packets.pop(0)
+ self.tx_packet.encode()
+ self.send_state = "RDY"
+ if not self.tx_packet.done:
+ self.send(rx_dword)
+ else:
+ self.callback(rx_dword)
+ self.insert_cont()
+
+# Transport Layer model
+def get_field_data(field, packet):
+ return (packet[field.dword] >> field.offset) & (2**field.width-1)
+
+class FIS:
+ def __init__(self, packet, description, direction="H2D"):
+ self.packet = packet
+ self.description = description
+ self.direction = direction
+ self.decode()
+
+ def decode(self):
+ for k, v in self.description.items():
+ setattr(self, k, get_field_data(v, self.packet))
+
+ def encode(self):
+ for k, v in self.description.items():
+ self.packet[v.dword] |= (getattr(self, k) << v.offset)
+
+ def __repr__(self):
+ if self.direction == "H2D":
+ r = ">>>>>>>>\n"
+ else:
+ r = "<<<<<<<<\n"
+ for k in sorted(self.description.keys()):
+ r += k + " : 0x%x" %getattr(self,k) + "\n"
+ return r
+
+class FIS_REG_H2D(FIS):
+ def __init__(self, packet=[0]*fis_reg_h2d_cmd_len):
+ FIS.__init__(self, packet, fis_reg_h2d_layout)
+ self.type = fis_types["REG_H2D"]
+ self.direction = "H2D"
+
+ def __repr__(self):
+ r = "FIS_REG_H2D\n"
+ r += FIS.__repr__(self)
+ return r
+
+class FIS_REG_D2H(FIS):
+ def __init__(self, packet=[0]*fis_reg_d2h_cmd_len):
+ FIS.__init__(self, packet, fis_reg_d2h_layout)
+ self.type = fis_types["REG_D2H"]
+ self.direction = "D2H"
+
+ def __repr__(self):
+ r = "FIS_REG_D2H\n"
+ r += FIS.__repr__(self)
+ return r
+
+class FIS_DMA_ACTIVATE_D2H(FIS):
+ def __init__(self, packet=[0]*fis_dma_activate_d2h_cmd_len):
+ FIS.__init__(self, packet, fis_dma_activate_d2h_layout)
+ self.type = fis_types["DMA_ACTIVATE_D2H"]
+ self.direction = "D2H"
+
+ def __repr__(self):
+ r = "FIS_DMA_ACTIVATE_D2H\n"
+ r += FIS.__repr__(self)
+ return r
+
+class FIS_DATA(FIS):
+ def __init__(self, packet=[0], direction="H2D"):
+ FIS.__init__(self, packet, fis_data_layout, direction)
+ self.type = fis_types["DATA"]
+
+ def __repr__(self):
+ r = "FIS_DATA\n"
+ r += FIS.__repr__(self)
+ for data in self.packet[1:]:
+ r += "%08x\n" %data
+ return r
+
+class FIS_UNKNOWN(FIS):
+ def __init__(self, packet=[0], direction="H2D"):
+ FIS.__init__(self, packet, {}, direction)
+
+ def __repr__(self):
+ r = "UNKNOWN\n"
+ if self.direction == "H2D":
+ r += ">>>>>>>>\\n"
+ else:
+ r += "<<<<<<<<\n"
+ for dword in self.packet:
+ r += "%08x\n" %dword
+ return r
+
+class TransportLayer(Module):
+ def __init__(self, link, debug=False, loopback=False):
+ self.link = link
+ self.debug = debug
+ self.loopback = loopback
+ self.link.set_transport_callback(self.callback)
+
+ def set_command_callback(self, callback):
+ self.command_callback = callback
+
+ def send(self, fis):
+ fis.encode()
+ packet = LinkTXPacket(fis.packet)
+ self.link.tx_packets.append(packet)
+ if self.debug and not self.loopback:
+ print(fis)
+
+ def callback(self, packet):
+ fis_type = packet[0] & 0xff
+ if fis_type == fis_types["REG_H2D"]:
+ fis = FIS_REG_H2D(packet)
+ elif fis_type == fis_types["REG_D2H"]:
+ fis = FIS_REG_D2H(packet)
+ elif fis_type == fis_types["DMA_ACTIVATE_D2H"]:
+ fis = FIS_DMA_ACTIVATE_D2H(packet)
+ elif fis_type == fis_types["DATA"]:
+ fis = FIS_DATA(packet, direction="H2D")
+ else:
+ fis = FIS_UNKNOWN(packet, direction="H2D")
+ if self.debug:
+ print(fis)
+ if self.loopback:
+ self.send(fis)
+ else:
+ self.command_callback(fis)
+
+# Command Layer model
+class CommandLayer(Module):
+ def __init__(self, transport, debug=False):
+ self.transport = transport
+ self.debug = debug
+ self.transport.set_command_callback(self.callback)
+
+ self.hdd = None
+
+ def set_hdd(self, hdd):
+ self.hdd = hdd
+
+ def callback(self, fis):
+ # XXX manage maximum of 2048 DWORDS per DMA
+ resp = None
+ if isinstance(fis, FIS_REG_H2D):
+ if fis.command == regs["WRITE_DMA_EXT"]:
+ resp = self.hdd.write_dma_cmd(fis)
+ elif fis.command == regs["READ_DMA_EXT"]:
+ resp = self.hdd.read_dma_cmd(fis)
+ elif fis.command == regs["IDENTIFY_DEVICE_DMA"]:
+ resp = self.hdd.identify_device_dma_cmd(fis)
+ elif isinstance(fis, FIS_DATA):
+ resp = self.hdd.data_cmd(fis)
+
+ if resp is not None:
+ for packet in resp:
+ self.transport.send(packet)
+
+# HDD model
+class HDDMemRegion:
+ def __init__(self, base, length):
+ self.base = base
+ self.length = length
+ self.data = [0]*(length//4)
+
+class HDD(Module):
+ def __init__(self,
+ phy_debug=False,
+ link_debug=False, link_random_level=0,
+ transport_debug=False, transport_loopback=False,
+ command_debug=False,
+ mem_debug=False
+ ):
+ ###
+ self.submodules.phy = PHYLayer(phy_debug)
+ self.submodules.link = LinkLayer(self.phy, link_debug, link_random_level)
+ self.submodules.transport = TransportLayer(self.link, transport_debug, transport_loopback)
+ self.submodules.command = CommandLayer(self.transport, command_debug)
+
+ self.command.set_hdd(self)
+
+ self.mem_debug = mem_debug
+ self.mem = None
+ self.wr_address = 0
+ self.wr_length = 0
+ self.wr_cnt = 0
+
+ def write_dma_cmd(self, fis):
+ self.wr_address = fis.lba_lsb
+ self.wr_length = fis.count
+ self.wr_cnt = 0
+ return [FIS_DMA_ACTIVATE_D2H()]
+
+ def read_dma_cmd(self, fis):
+ packet = self.read_mem(fis.lba_lsb, fis.count*4)
+ packet.insert(0, 0)
+ return [FIS_DATA(packet, direction="H2D"), FIS_REG_D2H()]
+
+ def identify_dma_cmd(self, fis):
+ packet = [i for i in range(256)]
+ packet.insert(0, 0)
+ return [FIS_DATA(packet, direction="H2D"), FIS_REG_D2H()]
+
+ def data_cmd(self, fis):
+ self.write_mem(self.wr_address, fis.packet[1:])
+ self.wr_cnt += len(fis.packet[1:])
+ if self.wr_length == self.wr_cnt:
+ return [FIS_REG_D2H()]
+ else:
+ return None
+
+ def allocate_mem(self, base, length):
+ if self.mem_debug:
+ print("[HDD] : Allocating {n} bytes at 0x{a}".format(n=length, a=base))
+ self.mem = HDDMemRegion(base, length)
+
+ def write_mem(self, adr, data):
+ if self.mem_debug:
+ print("[HDD] : Writing {n} bytes at 0x{a}".format(n=len(data)*4, a=adr))
+ current_adr = (adr-self.mem.base)//4
+ for i in range(len(data)):
+ self.mem.data[current_adr+i] = data[i]
+
+ def read_mem(self, adr, length=1):
+ if self.mem_debug:
+ print("[HDD] : Reading {n} bytes at 0x{a}".format(n=length, a=adr))
+ current_adr = (adr-self.mem.base)//4
+ data = []
+ for i in range(length//4):
+ data.append(self.mem.data[current_adr+i])
+ return data
-import random
+import random, copy
from migen.fhdl.std import *
from migen.genlib.record import *
from lib.sata.common import *
from lib.sata.link import SATALink
-from lib.sata.test.bfm import *
+from lib.sata.test.hdd import *
from lib.sata.test.common import *
class LinkStreamer(Module):
self.packet.done = 1
def send(self, packet, blocking=True):
+ packet = copy.deepcopy(packet)
self.packets.append(packet)
if blocking:
while packet.done == 0:
class TB(Module):
def __init__(self):
- self.submodules.bfm = BFM(phy_debug=False,
- link_random_level=50, transport_debug=False, transport_loopback=True)
- self.submodules.link = SATALink(self.bfm.phy)
+ self.submodules.hdd = HDD(
+ phy_debug=False,
+ link_random_level=50,
+ transport_debug=False, transport_loopback=True)
+ self.submodules.link = SATALink(self.hdd.phy)
self.submodules.streamer = LinkStreamer()
streamer_ack_randomizer = AckRandomizer(link_description(32), level=50)
]
def gen_simulation(self, selfp):
- for i in range(24):
- yield
for i in range(8):
streamer_packet = LinkTXPacket([i for i in range(64)])
- yield from self.streamer.send(LinkTXPacket([i for i in range(64)]))
+ yield from self.streamer.send(streamer_packet)
yield from self.logger.receive()
# check results