self.append(dword)
class LinkRXPacket(LinkPacket):
- def decode(self):
- self.descramble()
- return self.check_crc()
-
def descramble(self):
for i in range(len(self)):
self[i] = self[i] ^ self.scrambled_datas[i]
self.pop()
return r
-class LinkTXPacket(LinkPacket):
- def encode(self):
- self.insert_crc()
- self.scramble()
-
- def scramble(self):
- for i in range(len(self)):
- self[i] = self[i] ^ self.scrambled_datas[i]
+ def decode(self):
+ self.descramble()
+ return self.check_crc()
+class LinkTXPacket(LinkPacket):
def insert_crc(self):
stdin = ""
for v in self:
crc = int(out.decode("ASCII"), 16)
self.append(crc)
+ def scramble(self):
+ for i in range(len(self)):
+ self[i] = self[i] ^ self.scrambled_datas[i]
+
+ def encode(self):
+ self.insert_crc()
+ self.scramble()
+
class LinkLayer(Module):
def __init__(self, phy, debug=False, random_level=0):
self.phy = phy
def set_transport_callback(self, callback):
self.transport_callback = callback
- def remove_cont(self, dword):
- if dword == primitives["HOLD"]:
- if self.rx_cont:
- self.tx_lasts = [0, 0, 0]
- if dword == primitives["CONT"]:
- self.rx_cont = True
- elif is_primitive(dword):
- self.rx_last = dword
- self.rx_cont = False
- if self.rx_cont:
- dword = self.rx_last
- return dword
-
- def callback(self, dword):
- if dword == primitives["X_RDY"]:
- self.phy.send(primitives["R_RDY"])
- elif dword == primitives["WTRM"]:
- self.phy.send(primitives["R_OK"])
- if self.rx_packet.ongoing:
- self.rx_packet.decode()
- if self.transport_callback is not None:
- self.transport_callback(self.rx_packet)
- self.rx_packet.ongoing = False
- elif dword == primitives["HOLD"]:
- self.phy.send(primitives["HOLDA"])
- elif dword == primitives["EOF"]:
- pass
- elif self.rx_packet.ongoing:
- if dword != primitives["HOLD"]:
- n = randn(100)
- if n < self.random_level:
- self.phy.send(primitives["HOLD"])
- else:
- self.phy.send(primitives["R_IP"])
- if not is_primitive(dword):
- self.rx_packet.append(dword)
- elif dword == primitives["SOF"]:
- self.rx_packet = LinkRXPacket()
- self.rx_packet.ongoing = True
-
def send(self, dword):
if self.send_state == "RDY":
self.phy.send(primitives["X_RDY"])
else:
self.tx_cont_nb = 0
+ def remove_cont(self, dword):
+ if dword == primitives["HOLD"]:
+ if self.rx_cont:
+ self.tx_lasts = [0, 0, 0]
+ if dword == primitives["CONT"]:
+ self.rx_cont = True
+ elif is_primitive(dword):
+ self.rx_last = dword
+ self.rx_cont = False
+ if self.rx_cont:
+ dword = self.rx_last
+ return dword
+
+ def callback(self, dword):
+ if dword == primitives["X_RDY"]:
+ self.phy.send(primitives["R_RDY"])
+ elif dword == primitives["WTRM"]:
+ self.phy.send(primitives["R_OK"])
+ if self.rx_packet.ongoing:
+ self.rx_packet.decode()
+ if self.transport_callback is not None:
+ self.transport_callback(self.rx_packet)
+ self.rx_packet.ongoing = False
+ elif dword == primitives["HOLD"]:
+ self.phy.send(primitives["HOLDA"])
+ elif dword == primitives["EOF"]:
+ pass
+ elif self.rx_packet.ongoing:
+ if dword != primitives["HOLD"]:
+ n = randn(100)
+ if n < self.random_level:
+ self.phy.send(primitives["HOLD"])
+ else:
+ self.phy.send(primitives["R_IP"])
+ if not is_primitive(dword):
+ self.rx_packet.append(dword)
+ elif dword == primitives["SOF"]:
+ self.rx_packet = LinkRXPacket()
+ self.rx_packet.ongoing = True
+
def gen_simulation(self, selfp):
self.tx_packet.done = True
self.phy.send(primitives["SYNC"])
self.wr_length = 0
self.wr_cnt = 0
+ def allocate_mem(self, base, length):
+ if self.mem_debug:
+ print("[HDD] : Allocating {n} bytes at 0x{a}".format(n=length, a=base))
+ self.mem = HDDMemRegion(base, length)
+
+ def write_mem(self, adr, data):
+ if self.mem_debug:
+ print("[HDD] : Writing {n} bytes at 0x{a}".format(n=len(data)*4, a=adr))
+ current_adr = (adr-self.mem.base)//4
+ for i in range(len(data)):
+ self.mem.data[current_adr+i] = data[i]
+
+ def read_mem(self, adr, length=1):
+ if self.mem_debug:
+ print("[HDD] : Reading {n} bytes at 0x{a}".format(n=length, a=adr))
+ current_adr = (adr-self.mem.base)//4
+ data = []
+ for i in range(length//4):
+ data.append(self.mem.data[current_adr+i])
+ return data
+
def write_dma_cmd(self, fis):
self.wr_address = fis.lba_lsb
self.wr_length = fis.count
return [FIS_REG_D2H()]
else:
return None
-
- def allocate_mem(self, base, length):
- if self.mem_debug:
- print("[HDD] : Allocating {n} bytes at 0x{a}".format(n=length, a=base))
- self.mem = HDDMemRegion(base, length)
-
- def write_mem(self, adr, data):
- if self.mem_debug:
- print("[HDD] : Writing {n} bytes at 0x{a}".format(n=len(data)*4, a=adr))
- current_adr = (adr-self.mem.base)//4
- for i in range(len(data)):
- self.mem.data[current_adr+i] = data[i]
-
- def read_mem(self, adr, length=1):
- if self.mem_debug:
- print("[HDD] : Reading {n} bytes at 0x{a}".format(n=length, a=adr))
- current_adr = (adr-self.mem.base)//4
- data = []
- for i in range(length//4):
- data.append(self.mem.data[current_adr+i])
- return data