--- /dev/null
+import csv
+
+# TODO: share reg for all software drivers
+
+class MappedReg:
+ def __init__(self, readfn, writefn, name, addr, length, busword, mode):
+ self.readfn = readfn
+ self.writefn = writefn
+ self.addr = addr
+ self.length = length
+ self.busword = busword
+ self.mode = mode
+
+ def read(self):
+ if self.mode not in ["rw", "ro"]:
+ raise KeyError(name + "register not readable")
+ datas = self.readfn(self.addr, burst_length=self.length)
+ if isinstance(datas, int):
+ return datas
+ else:
+ data = 0
+ for i in range(self.length):
+ data = data << self.busword
+ data |= datas[i]
+ return data
+
+ def write(self, value):
+ if self.mode not in ["rw", "wo"]:
+ raise KeyError(name + "register not writable")
+ datas = []
+ for i in range(self.length):
+ datas.append((value >> ((self.length-1-i)*self.busword)) & (2**self.busword-1))
+ self.writefn(self.addr, datas)
+
+
+class MappedRegs:
+ def __init__(self, d):
+ self.d = d
+
+ def __getattr__(self, attr):
+ try:
+ return self.__dict__['d'][attr]
+ except KeyError:
+ pass
+ raise KeyError("No such register " + attr)
+
+
+def build_map(addrmap, busword, readfn, writefn):
+ csv_reader = csv.reader(open(addrmap), delimiter=',', quotechar='#')
+ d = {}
+ for item in csv_reader:
+ name, addr, length, mode = item
+ addr = int(addr.replace("0x", ""), 16)
+ length = int(length)
+ d[name] = MappedReg(readfn, writefn, name, addr, length, busword, mode)
+ return MappedRegs(d)
--- /dev/null
+import serial
+from struct import *
+
+# TODO: share reg for all software drivers
+from litex.soc.cores.uart.software.reg import *
+
+
+def write_b(uart, data):
+ uart.write(pack('B', data))
+
+
+class UARTWishboneBridgeDriver:
+ cmds = {
+ "write": 0x01,
+ "read": 0x02
+ }
+ def __init__(self, port, baudrate=115200, addrmap=None, busword=8, debug=False):
+ self.port = port
+ self.baudrate = str(baudrate)
+ self.debug = debug
+ self.uart = serial.Serial(port, baudrate, timeout=0.25)
+ if addrmap is not None:
+ self.regs = build_map(addrmap, busword, self.read, self.write)
+
+ def open(self):
+ self.uart.flushOutput()
+ self.uart.close()
+ self.uart.open()
+ self.uart.flushInput()
+
+ def close(self):
+ self.uart.flushOutput()
+ self.uart.close()
+
+ def read(self, addr, burst_length=1):
+ datas = []
+ self.uart.flushInput()
+ write_b(self.uart, self.cmds["read"])
+ write_b(self.uart, burst_length)
+ word_addr = addr//4
+ write_b(self.uart, (word_addr >> 24) & 0xff)
+ write_b(self.uart, (word_addr >> 16) & 0xff)
+ write_b(self.uart, (word_addr >> 8) & 0xff)
+ write_b(self.uart, (word_addr >> 0) & 0xff)
+ for i in range(burst_length):
+ data = 0
+ for k in range(4):
+ data = data << 8
+ data |= ord(self.uart.read())
+ if self.debug:
+ print("RD {:08X} @ {:08X}".format(data, addr + 4*i))
+ datas.append(data)
+ if burst_length == 1:
+ return datas[0]
+ else:
+ return datas
+
+ def write(self, addr, data):
+ if isinstance(data, list):
+ burst_length = len(data)
+ else:
+ burst_length = 1
+ data = [data]
+ write_b(self.uart, self.cmds["write"])
+ write_b(self.uart, burst_length)
+ word_addr = addr//4
+ write_b(self.uart, (word_addr >> 24) & 0xff)
+ write_b(self.uart, (word_addr >> 16) & 0xff)
+ write_b(self.uart, (word_addr >> 8) & 0xff)
+ write_b(self.uart, (word_addr >> 0) & 0xff)
+ for i in range(len(data)):
+ dat = data[i]
+ for j in range(4):
+ write_b(self.uart, (dat >> 24) & 0xff)
+ dat = dat << 8
+ if self.debug:
+ print("WR {:08X} @ {:08X}".format(data[i], addr + 4*i))