self.socket.close()
del self.socket
- def read(self, addr, length=1):
+ def read(self, addr, length=None):
+ length_int = 1 if length is None else length
# prepare packet
record = EtherboneRecord()
- record.reads = EtherboneReads(addrs=[addr + 4*j for j in range(length)])
+ record.reads = EtherboneReads(addrs=[addr + 4*j for j in range(length_int)])
record.rcount = len(record.reads)
# send packet
if self.debug:
for i, data in enumerate(datas):
print("read {:08x} @ {:08x}".format(data, addr + 4*i))
- if length == 1:
- return datas[0]
- else:
- return datas
+ return datas[0] if length is None else datas
def write(self, addr, datas):
- if not isinstance(datas, list):
- datas = [datas]
+ datas = datas if isinstance(datas, list) else [datas]
record = EtherboneRecord()
record.writes = EtherboneWrites(base_addr=addr, datas=[d for d in datas])
record.wcount = len(record.writes)
def __init__(self, port, baudrate=115200, debug=False):
self.port = port
self.baudrate = str(baudrate)
- self.csr_data_width = None
self.debug = debug
self.port = serial.serial_for_url(port, baudrate)
- def open(self, csr_data_width):
- self.csr_data_width = csr_data_width
+ def open(self):
if hasattr(self, "port"):
return
self.port.open()
pos += written
def read(self, addr, length=None):
- r = []
+ data = []
length_int = 1 if length is None else length
self._write([self.msg_type["read"], length_int])
self._write(list((addr//4).to_bytes(4, byteorder="big")))
for i in range(length_int):
- data = int.from_bytes(self._read(4), "big")
+ value = int.from_bytes(self._read(4), "big")
if self.debug:
print("read {:08x} @ {:08x}".format(data, addr + 4*i))
if length is None:
- return data
- r.append(data)
- return r
+ return value
+ data.append(value)
+ return data
def write(self, addr, data):
data = data if isinstance(data, list) else [data]
length = len(data)
self._write([self.msg_type["write"], length])
self._write(list((addr//4).to_bytes(4, byteorder="big")))
- for i in range(len(data)):
- self._write(list(data[i].to_bytes(4, byteorder="big")))
+ for i, value in enumerate(range(data)):
+ self._write(list(value.to_bytes(4, byteorder="big")))
if self.debug:
- print("write {:08x} @ {:08x}".format(data[i], addr + 4*i))
+ print("write {:08x} @ {:08x}".format(value, addr + 4*i))
class RemoteServer(EtherboneIPC):
- def __init__(self, comm, port=1234, csr_data_width=32):
+ def __init__(self, comm, port=1234):
self.comm = comm
self.port = port
- self.csr_data_width = 32
def open(self):
if hasattr(self, "socket"):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.bind(("localhost", self.port))
self.socket.listen(1)
- self.comm.open(self.csr_data_width)
+ self.comm.open()
def close(self):
self.comm.close()
parser.add_argument("--comm", default="uart", help="comm interface")
parser.add_argument("--port", default="2", help="UART port")
parser.add_argument("--baudrate", default=115200, help="UART baudrate")
- parser.add_argument("--csr_data_width", default=32, help="CSR data_width")
return parser.parse_args()
def main():
else:
raise NotImplementedError
- server = RemoteServer(comm, csr_data_width=args.csr_data_width)
+ server = RemoteServer(comm)
server.open()
server.start()
try: