from lib.sata.common import *
from lib.sata.test.common import *
+# PHY Layer model
class PHYDword:
def __init__(self, dat=0):
self.dat = dat
return receiving + sending
+# Link Layer model
def import_scrambler_datas():
with subprocess.Popen(["./scrambler"], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as process:
process.stdin.write("0x10000".encode("ASCII"))
self.callback(rx_dword)
self.insert_cont()
+# Transport Layer model
def get_field_data(field, packet):
return (packet[field.dword] >> field.offset) & (2**field.width-1)
else:
self.command_callback(fis)
+# Command Layer model
class CommandLayer(Module):
def __init__(self, transport, debug=False):
self.transport = transport
self.debug = debug
self.transport.set_command_callback(self.callback)
- self.dma_enable = 0
- self.dma_address = 0
+ self.hdd = None
- def allocate_dma(self, base, length):
- self.dma_base = base
- self.dma_buffer = [0]*(length//4)
+ def set_hdd(self, hdd):
+ self.hdd = hdd
- def enable_dma(self):
- self.dma_enable = 1
+ def callback(self, fis):
+ # XXX manage maximum of 2048 DWORDS per DMA
+ if isinstance(fis, FIS_REG_H2D):
+ if fis.command == regs["WRITE_DMA_EXT"]:
+ self.transport.send(self.hdd.write_dma_cmd(fis))
+ elif fis.command == regs["READ_DMA_EXT"]:
+ self.transport.send(self.hdd.read_dma_cmd(fis))
+ elif fis.command == regs["IDENTIFY_DEVICE_DMA"]:
+ self.transport.send(self.hdd.identify_device_dma_cmd(fis))
+ elif isinstance(fis, FIS_DATA):
+ self.hdd.data_cmd(fis)
+
+# HDD model
+class HDDMemRegion:
+ def __init__(self, base, length):
+ self.base = base
+ self.length = length
+ self.data = [0]*(length//4)
+
+class HDD(Module):
+ def __init__(self, command, debug=False):
+ self.command = command
+ command.set_hdd(self)
+
+ self.mem = None
+ self.wr_address = 0
+
+ def write_dma_cmd(self, fis):
+ self.wr_address = fis.lba_lsb
+ return FIS_DMA_ACTIVATE_D2H()
- def disable_dma(self):
- self.dma_enable = 0
+ def read_dma_cmd(self, fis):
+ return FIS_DATA(self.read_mem(fis.lba_lsb, fis.count*4))
- def dma_write(self, adr, data):
- current_adr = (adr-self.dma_base)//4
+ def identify_dma_cmd(self, fis):
+ return FIS_DATA([i for i in range(256)])
+
+ def data_cmd(self, fis):
+ self.write_mem(self.wr_address, fis.packet[1:])
+
+ def allocate_mem(self, base, length):
+ # XXX add support for multiple memory regions
+ self.mem = HDDMemRegion(base, length)
+
+ def write_mem(self, adr, data):
+ # XXX test if adr allocate in one memory region
+ current_adr = (adr-self.mem.base)//4
for i in range(len(data)):
- self.dma_buffer[current_adr+i] = data[i]
+ self.mem.data[current_adr+i] = data[i]
- def dma_read(self, adr, length=1):
- current_adr = (adr-self.dma_base)//4
+ def read_mem(self, adr, length=1):
+ # XXX test if adr allocate in one memory region
+ current_adr = (adr-self.mem.base)//4
data = []
for i in range(length//4):
- data.append(self.dma_buffer[current_adr+i])
+ data.append(self.mem.data[current_adr+i])
return data
- def callback(self, fis):
- # XXX maximum of 2048 DWORDS per DMA
- if isinstance(fis, FIS_REG_H2D):
- if fis.command == regs["WRITE_DMA_EXT"]:
- self.dma_address = fis.lba_lsb
- dma_activate = FIS_DMA_ACTIVATE_D2H()
- self.transport.send(dma_activate)
- elif fis.command == regs["READ_DMA_EXT"]:
- self.dma_address = fis.lba_lsb
- data = FIS_DATA(self.dma_read(fis.lba_lsb, fis.count*4))
- self.transport.send(data)
- elif fis.command == regs["IDENTIFY_DEVICE_DMA"]:
- self.dma_address = fis.lba_lsb
- data = FIS_DATA(self.dma_read(fis.lba_lsb, fis.count*4))
- self.transport.send(data)
- elif isinstance(fis, FIS_DATA):
- if self.dma_enable:
- self.dma_write(self.dma_address, fis.packet[1:])
- self.dma_address += len(fis.packet[1:])
-
class BFM(Module):
def __init__(self,
phy_debug=False,
link_debug=False, link_random_level=0,
transport_debug=False, transport_loopback=False,
- command_debug=False
+ command_debug=False,
+ hdd_debug=False
):
###
self.submodules.phy = PHYLayer(phy_debug)
self.submodules.link = LinkLayer(self.phy, link_debug, link_random_level)
self.submodules.transport = TransportLayer(self.link, transport_debug, transport_loopback)
self.submodules.command = CommandLayer(self.transport, command_debug)
+ self.submodules.hdd = HDD(self.command, hdd_debug)