if __name__ == "__main__":
args = _get_args()
if args.bridge == "uart":
- from misoclib.tools.litescope.host.driver.uart import LiteScopeUART2WishboneDriver
+ from misoclib.tools.litescope.software.driver.uart import LiteScopeUART2WishboneDriver
port = args.port if not args.port.isdigit() else int(args.port)
wb = LiteScopeUART2WishboneDriver(port, args.baudrate, "./csr.csv", int(args.busword), debug=False)
elif args.bridge == "etherbone":
- from misoclib.tools.litescope.host.driver.etherbone import LiteScopeEtherboneDriver
+ from misoclib.tools.litescope.software.driver.etherbone import LiteScopeEtherboneDriver
wb = LiteScopeEtherboneDriver(args.ip_address, int(args.udp_port), "./csr.csv", int(args.busword), debug=False)
else:
ValueError("Invalid bridge {}".format(args.bridge))
import time
-from misoclib.tools.litescope.host.driver.la import LiteScopeLADriver
+from misoclib.tools.litescope.software.driver.la import LiteScopeLADriver
def main(wb):
if __name__ == "__main__":
args = _get_args()
if args.bridge == "uart":
- from misoclib.tools.litescope.host.driver.uart import LiteScopeUART2WishboneDriver
+ from misoclib.tools.litescope.software.driver.uart import LiteScopeUART2WishboneDriver
port = args.port if not args.port.isdigit() else int(args.port)
wb = LiteScopeUART2WishboneDriver(port, args.baudrate, "./csr.csv", int(args.busword), debug=False)
elif args.bridge == "etherbone":
- from misoclib.tools.litescope.host.driver.etherbone import LiteScopeEtherboneDriver
+ from misoclib.tools.litescope.software.driver.etherbone import LiteScopeEtherboneDriver
wb = LiteScopeEtherboneDriver(args.ip_address, int(args.udp_port), "./csr.csv", int(args.busword), debug=False)
elif args.bridge == "pcie":
- from misoclib.tools.litescope.host.driver.pcie import LiteScopePCIeDriver
+ from misoclib.tools.litescope.software.driver.pcie import LiteScopePCIeDriver
wb = LiteScopePCIeDriver(args.bar, args.bar_size, "./csr.csv", int(args.busword), debug=False)
else:
ValueError("Invalid bridge {}".format(args.bridge))
import argparse
import random as rand
from collections import OrderedDict
-from misoclib.tools.litescope.host.driver.uart import LiteScopeUART2WishboneDriver
+from misoclib.tools.litescope.software.driver.uart import LiteScopeUART2WishboneDriver
KB = 1024
MB = 1024*KB
if __name__ == "__main__":
args = _get_args()
if args.bridge == "uart":
- from misoclib.tools.litescope.host.driver.uart import LiteScopeUART2WishboneDriver
+ from misoclib.tools.litescope.software.driver.uart import LiteScopeUART2WishboneDriver
port = args.port if not args.port.isdigit() else int(args.port)
wb = LiteScopeUART2WishboneDriver(port, args.baudrate, "./csr.csv", int(args.busword), debug=False)
elif args.bridge == "etherbone":
- from misoclib.tools.litescope.host.driver.etherbone import LiteScopeEtherboneDriver
+ from misoclib.tools.litescope.software.driver.etherbone import LiteScopeEtherboneDriver
wb = LiteScopeEtherboneDriver(args.ip_address, int(args.udp_port), "./csr.csv", int(args.busword), debug=False)
else:
ValueError("Invalid bridge {}".format(args.bridge))
import sys
from tools import *
from test_bist import *
-from litescope.host.driver.la import LiteScopeLADriver
+from litescope.software.driver.la import LiteScopeLADriver
def main(wb):
-from litescope.host.dump import *
+from litescope.software.dump import *
primitives = {
"ALIGN": 0x7B4A4ABC,
if __name__ == "__main__":
args = _get_args()
if args.bridge == "uart":
- from misoclib.tools.litescope.host.driver.uart import LiteScopeUART2WishboneDriver
+ from misoclib.tools.litescope.software.driver.uart import LiteScopeUART2WishboneDriver
port = args.port if not args.port.isdigit() else int(args.port)
wb = LiteScopeUART2WishboneDriver(port, args.baudrate, "./csr.csv", int(args.busword), debug=False)
elif args.bridge == "etherbone":
- from misoclib.tools.litescope.host.driver.etherbone import LiteScopeEtherboneDriver
+ from misoclib.tools.litescope.software.driver.etherbone import LiteScopeEtherboneDriver
wb = LiteScopeEtherboneDriver(args.ip_address, int(args.udp_port), "./csr.csv", int(args.busword), debug=False)
else:
ValueError("Invalid bridge {}".format(args.bridge))
import time
-from misoclib.tools.litescope.host.driver.io import LiteScopeIODriver
+from misoclib.tools.litescope.software.driver.io import LiteScopeIODriver
def led_anim0(io):
-from misoclib.tools.litescope.host.driver.la import LiteScopeLADriver
+from misoclib.tools.litescope.software.driver.la import LiteScopeLADriver
def main(wb):
+++ /dev/null
-import socket
-from misoclib.tools.litescope.host.driver.reg import *
-
-from liteeth.test.model.etherbone import *
-
-
-class LiteScopeEtherboneDriver:
- def __init__(self, ip_address, udp_port=20000, addrmap=None, busword=8, debug=False):
- self.ip_address = ip_address
- self.udp_port = udp_port
- self.debug = debug
-
- self.tx_sock = None
- self.rx_sock = None
- if addrmap is not None:
- self.regs = build_map(addrmap, busword, self.read, self.write)
-
- def open(self):
- self.tx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- self.rx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- self.rx_sock.bind(("", self.udp_port))
-
- def close(self):
- pass
-
- def read(self, addr, burst_length=1):
- reads_addrs = [addr+4*j for j in range(burst_length)]
- reads = EtherboneReads(base_ret_addr=0x1000, addrs=reads_addrs)
- record = EtherboneRecord()
- record.writes = None
- record.reads = reads
- record.bca = 0
- record.rca = 0
- record.rff = 0
- record.cyc = 0
- record.wca = 0
- record.wff = 0
- record.byte_enable = 0xf
- record.wcount = 0
- record.rcount = len(reads_addrs)
-
- packet = EtherbonePacket()
- packet.records = [record]
- packet.encode()
- self.tx_sock.sendto(bytes(packet), (self.ip_address, self.udp_port))
-
- datas, addrs = self.rx_sock.recvfrom(8192)
- packet = EtherbonePacket(datas)
- packet.decode()
- datas = packet.records.pop().writes.get_datas()
- if self.debug:
- for i, data in enumerate(datas):
- print("RD {:08X} @ {:08X}".format(data, addr + 4*i))
- if burst_length == 1:
- return datas[0]
- else:
- return datas
-
- def write(self, addr, datas):
- if not isinstance(datas, list):
- datas = [datas]
- writes_datas = [d for d in datas]
- writes = EtherboneWrites(base_addr=addr, datas=writes_datas)
- record = EtherboneRecord()
- record.writes = writes
- record.reads = None
- record.bca = 0
- record.rca = 0
- record.rff = 0
- record.cyc = 0
- record.wca = 0
- record.wff = 0
- record.byte_enable = 0xf
- record.wcount = len(writes_datas)
- record.rcount = 0
-
- packet = EtherbonePacket()
- packet.records = [record]
- packet.encode()
- self.tx_sock.sendto(bytes(packet), (self.ip_address, self.udp_port))
-
- if self.debug:
- for i, data in enumerate(datas):
- print("WR {:08X} @ {:08X}".format(data, addr + 4*i))
+++ /dev/null
-class LiteScopeIODriver():
- def __init__(self, regs, name):
- self.regs = regs
- self.name = name
- self.build()
-
- def build(self):
- for key, value in self.regs.d.items():
- if self.name in key:
- key = key.replace(self.name + "_", "")
- setattr(self, key, value)
-
- def write(self, value):
- self.output.write(value)
-
- def read(self):
- return self.input.read()
+++ /dev/null
-import csv
-from struct import *
-from migen.fhdl.structure import *
-from misoclib.tools.litescope.host.dump import *
-from misoclib.tools.litescope.host.driver.truthtable import *
-
-
-class LiteScopeLADriver():
- def __init__(self, regs, name, config_csv=None, clk_freq=None, debug=False):
- self.regs = regs
- self.name = name
- if config_csv is None:
- self.config_csv = name + ".csv"
- if clk_freq is None:
- try:
- self.clk_freq = regs.identifier_frequency.read()
- except:
- self.clk_freq = None
- self.samplerate = self.clk_freq
- else:
- self.clk_freq = clk_freq
- self.samplerate = clk_freq
- self.debug = debug
- self.get_config()
- self.get_layout()
- self.build()
- self.data = Dat(self.dw)
-
- def get_config(self):
- csv_reader = csv.reader(open(self.config_csv), delimiter=',', quotechar='#')
- for item in csv_reader:
- t, n, v = item
- if t == "config":
- setattr(self, n, int(v))
-
- def get_layout(self):
- self.layout = []
- csv_reader = csv.reader(open(self.config_csv), delimiter=',', quotechar='#')
- for item in csv_reader:
- t, n, v = item
- if t == "layout":
- self.layout.append((n, int(v)))
-
- def build(self):
- for key, value in self.regs.d.items():
- if self.name == key[:len(self.name)]:
- key = key.replace(self.name + "_", "")
- setattr(self, key, value)
- value = 1
- for name, length in self.layout:
- setattr(self, name + "_o", value)
- value = value*(2**length)
- value = 0
- for name, length in self.layout:
- setattr(self, name + "_m", (2**length-1) << value)
- value += length
-
- def configure_term(self, port, trigger=0, mask=0, cond=None):
- if cond is not None:
- for k, v in cond.items():
- trigger |= getattr(self, k + "_o")*v
- mask |= getattr(self, k + "_m")
- t = getattr(self, "trigger_port{d}_trig".format(d=int(port)))
- m = getattr(self, "trigger_port{d}_mask".format(d=int(port)))
- t.write(trigger)
- m.write(mask)
-
- def configure_range_detector(self, port, low, high):
- l = getattr(self, "trigger_port{d}_low".format(d=int(port)))
- h = getattr(self, "trigger_port{d}_high".format(d=int(port)))
- l.write(low)
- h.write(high)
-
- def configure_edge_detector(self, port, rising_mask, falling_mask, both_mask):
- rm = getattr(self, "trigger_port{d}_rising_mask".format(d=int(port)))
- fm = getattr(self, "trigger_port{d}_falling_mask".format(d=int(port)))
- bm = getattr(self, "trigger_port{d}_both_mask".format(d=int(port)))
- rm.write(rising_mask)
- fm.write(falling_mask)
- bm.write(both_mask)
-
- def configure_sum(self, equation):
- datas = gen_truth_table(equation)
- for adr, dat in enumerate(datas):
- self.trigger_sum_prog_adr.write(adr)
- self.trigger_sum_prog_dat.write(dat)
- self.trigger_sum_prog_we.write(1)
-
- def configure_subsampler(self, n):
- self.subsampler_value.write(n-1)
- if self.clk_freq is not None:
- self.samplerate = self.clk_freq//n
- else:
- self.samplerate = None
-
- def configure_qualifier(self, v):
- self.recorder_qualifier.write(v)
-
- def configure_rle(self, v):
- self.rle_enable.write(v)
-
- def done(self):
- return self.recorder_done.read()
-
- def run(self, offset, length):
- if self.debug:
- print("running")
- self.recorder_offset.write(offset)
- self.recorder_length.write(length)
- self.recorder_trigger.write(1)
-
- def upload(self):
- if self.debug:
- print("uploading")
- while self.recorder_source_stb.read():
- self.data.append(self.recorder_source_data.read())
- self.recorder_source_ack.write(1)
- if self.with_rle:
- if self.rle_enable.read():
- self.data = self.data.decode_rle()
- return self.data
-
- def save(self, filename):
- if self.debug:
- print("saving to " + filename)
- name, ext = os.path.splitext(filename)
- if ext == ".vcd":
- from misoclib.tools.litescope.host.dump.vcd import VCDDump
- dump = VCDDump()
- elif ext == ".csv":
- from misoclib.tools.litescope.host.dump.csv import CSVDump
- dump = CSVDump()
- elif ext == ".py":
- from misoclib.tools.litescope.host.dump.python import PythonDump
- dump = PythonDump()
- elif ext == ".sr":
- from misoclib.tools.litescope.host.dump.sigrok import SigrokDump
- if self.samplerate is None:
- raise ValueError("Unable to automatically retrieve clk_freq, clk_freq parameter required")
- dump = SigrokDump(samplerate=self.samplerate)
- else:
- raise NotImplementedError
- dump.add_from_layout(self.layout, self.data)
- dump.write(filename)
+++ /dev/null
-import string
-import mmap
-from misoclib.tools.litescope.host.driver.reg import *
-
-
-class LiteScopePCIeDriver:
- def __init__(self, bar, bar_size, addrmap=None, busword=8, debug=False):
- self.bar = bar
- self.bar_size = bar_size
- self.debug = debug
- self.f = None
- self.mmap = None
- self.regs = build_map(addrmap, busword, self.read, self.write)
-
- def open(self):
- self.f = open(self.bar, "r+b")
- self.f.flush()
- self.mmap = mmap.mmap(self.f.fileno(), self.bar_size)
-
- def close(self):
- self.mmap.close()
- self.f.close()
-
- def read(self, addr, burst_length=1):
- datas = []
- for i in range(burst_length):
- self.mmap.seek(addr + 4*i)
- dat = self.mmap.read(4)
- val = dat[3] << 24
- val |= dat[2] << 16
- val |= dat[1] << 8
- val |= dat[0] << 0
- if self.debug:
- print("RD {:08X} @ {:08X}".format(val, addr + 4*i))
- datas.append(val)
- if burst_length == 1:
- return datas[0]
- else:
- return datas
-
- def write(self, addr, data):
- if isinstance(data, list):
- burst_length = len(data)
- else:
- burst_length = 1
- data = [data]
-
- for i, dat in enumerate(data):
- dat_bytes = [0, 0, 0, 0]
- dat_bytes[3] = (dat >> 24) & 0xff
- dat_bytes[2] = (dat >> 16) & 0xff
- dat_bytes[1] = (dat >> 8) & 0xff
- dat_bytes[0] = (dat >> 0) & 0xff
- self.mmap[addr + 4*i:addr + 4*(i+1)] = bytes(dat_bytes)
- if self.debug:
- print("WR {:08X} @ {:08X}".format(dat, (addr + i)*4))
+++ /dev/null
-import csv
-
-
-class MappedReg:
- def __init__(self, readfn, writefn, name, addr, length, busword, mode):
- self.readfn = readfn
- self.writefn = writefn
- self.addr = addr
- self.length = length
- self.busword = busword
- self.mode = mode
-
- def read(self):
- if self.mode not in ["rw", "ro"]:
- raise KeyError(name + "register not readable")
- datas = self.readfn(self.addr, burst_length=self.length)
- if isinstance(datas, int):
- return datas
- else:
- data = 0
- for i in range(self.length):
- data = data << self.busword
- data |= datas[i]
- return data
-
- def write(self, value):
- if self.mode not in ["rw", "wo"]:
- raise KeyError(name + "register not writable")
- datas = []
- for i in range(self.length):
- datas.append((value >> ((self.length-1-i)*self.busword)) & (2**self.busword-1))
- self.writefn(self.addr, datas)
-
-
-class MappedRegs:
- def __init__(self, d):
- self.d = d
-
- def __getattr__(self, attr):
- try:
- return self.__dict__['d'][attr]
- except KeyError:
- pass
- raise KeyError("No such register " + attr)
-
-
-def build_map(addrmap, busword, readfn, writefn):
- csv_reader = csv.reader(open(addrmap), delimiter=',', quotechar='#')
- d = {}
- for item in csv_reader:
- name, addr, length, mode = item
- addr = int(addr.replace("0x", ""), 16)
- length = int(length)
- d[name] = MappedReg(readfn, writefn, name, addr, length, busword, mode)
- return MappedRegs(d)
+++ /dev/null
-import os
-import re
-import sys
-
-
-def is_number(x):
- try:
- _ = float(x)
- except ValueError:
- return False
- return True
-
-
-def remove_numbers(seq):
- return [x for x in seq if not is_number(x)]
-
-
-def remove_duplicates(seq):
- seen = set()
- seen_add = seen.add
- return [x for x in seq if x not in seen and not seen_add(x)]
-
-
-def get_operands(s):
- operands = re.findall("[A-z0-9_]+", s)
- operands = remove_duplicates(operands)
- operands = remove_numbers(operands)
- return sorted(operands)
-
-
-def gen_truth_table(s):
- operands = get_operands(s)
- width = len(operands)
- stim = []
- for i in range(width):
- stim_op = []
- for j in range(2**width):
- stim_op.append((int(j/(2**i)))%2)
- stim.append(stim_op)
-
- truth_table = []
- for i in range(2**width):
- for j in range(width):
- exec("{} = stim[j][i]".format(operands[j]))
- truth_table.append(eval(s) != 0)
- return truth_table
-
-
-def main():
- print(gen_truth_table("(A&B&C)|D"))
-
-if __name__ == '__main__':
- main()
+++ /dev/null
-import serial
-from struct import *
-from misoclib.tools.litescope.host.driver.reg import *
-
-
-def write_b(uart, data):
- uart.write(pack('B', data))
-
-
-class LiteScopeUART2WishboneDriver:
- cmds = {
- "write": 0x01,
- "read": 0x02
- }
- def __init__(self, port, baudrate=115200, addrmap=None, busword=8, debug=False):
- self.port = port
- self.baudrate = str(baudrate)
- self.debug = debug
- self.uart = serial.Serial(port, baudrate, timeout=0.25)
- if addrmap is not None:
- self.regs = build_map(addrmap, busword, self.read, self.write)
-
- def open(self):
- self.uart.flushOutput()
- self.uart.close()
- self.uart.open()
- self.uart.flushInput()
-
- def close(self):
- self.uart.flushOutput()
- self.uart.close()
-
- def read(self, addr, burst_length=1):
- datas = []
- self.uart.flushInput()
- write_b(self.uart, self.cmds["read"])
- write_b(self.uart, burst_length)
- word_addr = addr//4
- write_b(self.uart, (word_addr >> 24) & 0xff)
- write_b(self.uart, (word_addr >> 16) & 0xff)
- write_b(self.uart, (word_addr >> 8) & 0xff)
- write_b(self.uart, (word_addr >> 0) & 0xff)
- for i in range(burst_length):
- data = 0
- for k in range(4):
- data = data << 8
- data |= ord(self.uart.read())
- if self.debug:
- print("RD {:08X} @ {:08X}".format(data, addr + 4*i))
- datas.append(data)
- if burst_length == 1:
- return datas[0]
- else:
- return datas
-
- def write(self, addr, data):
- if isinstance(data, list):
- burst_length = len(data)
- else:
- burst_length = 1
- data = [data]
- write_b(self.uart, self.cmds["write"])
- write_b(self.uart, burst_length)
- word_addr = addr//4
- write_b(self.uart, (word_addr >> 24) & 0xff)
- write_b(self.uart, (word_addr >> 16) & 0xff)
- write_b(self.uart, (word_addr >> 8) & 0xff)
- write_b(self.uart, (word_addr >> 0) & 0xff)
- for i in range(len(data)):
- dat = data[i]
- for j in range(4):
- write_b(self.uart, (dat >> 24) & 0xff)
- dat = dat << 8
- if self.debug:
- print("WR {:08X} @ {:08X}".format(data[i], addr + 4*i))
+++ /dev/null
-def dec2bin(d, nb=0):
- if d == "x":
- return "x"*nb
- elif d == 0:
- b = "0"
- else:
- b = ""
- while d != 0:
- b = "01"[d&1] + b
- d = d >> 1
- return b.zfill(nb)
-
-
-def get_bits(values, low, high=None):
- r = []
- if high is None:
- high = low+1
- for val in values:
- t = (val >> low) & (2**(high-low)-1)
- r.append(t)
- return r
-
-
-class Dat(list):
- def __init__(self, width):
- self.width = width
-
- def __getitem__(self, key):
- if isinstance(key, int):
- return get_bits(self, key)
- elif isinstance(key, slice):
- if key.start != None:
- start = key.start
- else:
- start = 0
- if key.stop != None:
- stop = key.stop
- else:
- stop = self.width
- if stop > self.width:
- stop = self.width
- if key.step != None:
- raise KeyError
- return get_bits(self, start, stop)
- else:
- raise KeyError
-
- def decode_rle(self):
- datas = Dat(self.width-1)
- last_data = 0
- for data in self:
- rle = data >> (self.width-1)
- data = data & (2**(self.width-1)-1)
- if rle:
- for i in range(data):
- datas.append(last_data)
- else:
- datas.append(data)
- last_data = data
- return datas
-
-
-class Var:
- def __init__(self, name, width, values=[], type="wire", default="x"):
- self.type = type
- self.width = width
- self.name = name
- self.val = default
- self.values = values
- self.vcd_id = None
-
- def set_vcd_id(self, s):
- self.vcd_id = s
-
- def __len__(self):
- return len(self.values)
-
- def change(self, cnt):
- r = ""
- try:
- if self.values[cnt+1] != self.val:
- r += "b"
- r += dec2bin(self.values[cnt+1], self.width)
- r += " "
- r += self.vcd_id
- r += "\n"
- return r
- except:
- return r
- return r
-
-
-class Dump:
- def __init__(self):
- self.vars = []
- self.vcd_id = "!"
-
- def add(self, var):
- var.set_vcd_id(self.vcd_id)
- self.vcd_id = chr(ord(self.vcd_id)+1)
- self.vars.append(var)
-
- def add_from_layout(self, layout, var):
- i = 0
- for s, n in layout:
- self.add(Var(s, n, var[i:i+n]))
- i += n
-
- def __len__(self):
- l = 0
- for var in self.vars:
- l = max(len(var), l)
- return l
+++ /dev/null
-from misoclib.tools.litescope.host.dump import *
-
-
-class CSVDump(Dump):
- def __init__(self, init_dump=None):
- Dump.__init__(self)
- if init_dump:
- self.vars = init_dump.vars
-
- def generate_vars(self):
- r = ""
- for var in self.vars:
- r += var.name
- r += ","
- r += "\n"
- for var in self.vars:
- r += str(var.width)
- r += ","
- r += "\n"
- return r
-
- def generate_dumpvars(self):
- r = ""
- for i in range(len(self)):
- for var in self.vars:
- try:
- var.val = var.values[i]
- except:
- pass
- if var.val == "x":
- r += "x"
- else:
- r += dec2bin(var.val, var.width)
- r += ", "
- r += "\n"
- return r
-
- def write(self, filename):
- f = open(filename, "w")
- f.write(self.generate_vars())
- f.write(self.generate_dumpvars())
- f.close()
-
- def read(self, filename):
- raise NotImplementedError("CSV files can not (yet) be read, please contribute!")
-
-if __name__ == '__main__':
- dump = CSVDump()
- dump.add(Var("foo1", 1, [0, 1, 0, 1, 0, 1]))
- dump.add(Var("foo2", 2, [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0]))
- ramp = [i%128 for i in range(1024)]
- dump.add(Var("ramp", 16, ramp))
- dump.write("dump.csv")
+++ /dev/null
-from misoclib.tools.litescope.host.dump import *
-
-
-class PythonDump(Dump):
- def __init__(self, init_dump=None):
- Dump.__init__(self)
- if init_dump:
- self.vars = init_dump.vars
-
- def generate_data(self):
- r = "dump = {\n"
- for var in self.vars:
- r += "\"" + var.name + "\""
- r += " : "
- r += str(var.values)
- r += ",\n"
- r += "}"
- return r
-
- def write(self, filename):
- f = open(filename, "w")
- f.write(self.generate_data())
- f.close()
-
- def read(self, filename):
- raise NotImplementedError("Python files can not (yet) be read, please contribute!")
-
-if __name__ == '__main__':
- dump = PythonDump()
- dump.add(Var("foo1", 1, [0, 1, 0, 1, 0, 1]))
- dump.add(Var("foo2", 2, [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0]))
- ramp = [i%128 for i in range(1024)]
- dump.add(Var("ramp", 16, ramp))
- dump.write("dump.py")
+++ /dev/null
-import os
-import math
-import shutil
-import zipfile
-import re
-from collections import OrderedDict
-
-from misoclib.tools.litescope.host.dump import *
-
-
-class SigrokDump(Dump):
- def __init__(self, init_dump=None, samplerate=50000000):
- Dump.__init__(self)
- if init_dump:
- self.vars = init_dump.vars
- self.samplerate = samplerate
-
- def write_version(self):
- f = open("version", "w")
- f.write("1")
- f.close()
-
- def write_metadata(self, name):
- f = open("metadata", "w")
- r = """
-[global]
-sigrok version = 0.2.0
-[device 1]
-driver = litescope
-capturefile = logic-1
-unitsize = 1
-total probes = {}
-samplerate = {} KHz
-""".format(
- len(self.vars),
- self.samplerate//1000,
- )
- for i, var in enumerate(self.vars):
- r += "probe{} = {}\n".format(i+1, var.name)
- f.write(r)
- f.close()
-
- def write_data(self):
- # XXX are probes limited to 1 bit?
- data_bits = math.ceil(len(self.vars)/8)*8
- data_len = 0
- for var in self.vars:
- data_len = max(data_len, len(var))
- datas = []
- for i in range(data_len):
- data = 0
- for j, var in enumerate(reversed(self.vars)):
- data = data << 1
- try:
- data |= var.values[i] % 2
- except:
- pass
- datas.append(data)
- f = open("logic-1", "wb")
- for data in datas:
- f.write(data.to_bytes(data_bits//8, "big"))
- f.close()
-
- def zip(self, name):
- f = zipfile.ZipFile(name + ".sr", "w")
- os.chdir(name)
- f.write("version")
- f.write("metadata")
- f.write("logic-1")
- os.chdir("..")
- f.close()
-
- def write(self, filename):
- name, ext = os.path.splitext(filename)
- if os.path.exists(name):
- shutil.rmtree(name)
- os.makedirs(name)
- os.chdir(name)
- self.write_version()
- self.write_metadata(name)
- self.write_data()
- os.chdir("..")
- self.zip(name)
- shutil.rmtree(name)
-
- def unzip(self, filename, name):
- f = open(filename, "rb")
- z = zipfile.ZipFile(f)
- if os.path.exists(name):
- shutil.rmtree(name)
- os.makedirs(name)
- for file in z.namelist():
- z.extract(file, name)
- f.close()
-
- def read_metadata(self):
- probes = OrderedDict()
- f = open("metadata", "r")
- for l in f:
- m = re.search("probe([0-9]+) = (\w+)", l, re.I)
- if m is not None:
- index = int(m.group(1))
- name = m.group(2)
- probes[name] = index
- m = re.search("samplerate = ([0-9]+) kHz", l, re.I)
- if m is not None:
- self.samplerate = int(m.group(1))*1000
- m = re.search("samplerate = ([0-9]+) mHz", l, re.I)
- if m is not None:
- self.samplerate = int(m.group(1))*1000000
- f.close()
- return probes
-
- def read_data(self, name, nprobes):
- datas = []
- f = open("logic-1", "rb")
- while True:
- data = f.read(math.ceil(nprobes/8))
- if data == bytes('', "utf-8"):
- break
- data = int.from_bytes(data, "big")
- datas.append(data)
- f.close()
- return datas
-
- def read(self, filename):
- self.vars = []
- name, ext = os.path.splitext(filename)
- self.unzip(filename, name)
- os.chdir(name)
- probes = self.read_metadata()
- datas = self.read_data(name, len(probes.keys()))
- os.chdir("..")
- shutil.rmtree(name)
-
- for k, v in probes.items():
- probe_data = []
- for data in datas:
- probe_data.append((data >> (v-1)) & 0x1)
- self.add(Var(k, 1, probe_data))
-
-if __name__ == '__main__':
- dump = SigrokDump()
- dump.add(Var("foo1", 1, [0, 1, 0, 1, 0, 1]))
- dump.add(Var("foo2", 2, [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0]))
- ramp = [i%128 for i in range(1024)]
- dump.add(Var("ramp", 16, ramp))
- dump.write("dump.sr")
- dump.read("dump.sr")
- dump.write("dump_copy.sr")
+++ /dev/null
-import datetime
-from misoclib.tools.litescope.host.dump import *
-
-
-class VCDDump(Dump):
- def __init__(self, init_dump=None, timescale="1ps", comment=""):
- Dump.__init__(self)
- if init_dump:
- self.vars = init_dump.vars
- self.timescale = timescale
- self.comment = comment
- self.cnt = -1
-
- def change(self):
- r = ""
- c = ""
- for var in self.vars:
- c += var.change(self.cnt)
- if c != "":
- r += "#"
- r += str(self.cnt+1)
- r += "\n"
- r += c
- return r
-
- def generate_date(self):
- now = datetime.datetime.now()
- r = "$date\n"
- r += "\t"
- r += now.strftime("%Y-%m-%d %H:%M")
- r += "\n"
- r += "$end\n"
- return r
-
- def generate_version(self):
- r = "$version\n"
- r += "\tmiscope VCD dump\n"
- r += "$end\n"
- return r
-
- def generate_comment(self):
- r = "$comment\n"
- r += self.comment
- r += "\n$end\n"
- return r
-
- def generate_timescale(self):
- r = "$timescale "
- r += self.timescale
- r += " $end\n"
- return r
-
- def generate_scope(self):
- r = "$scope "
- r += self.timescale
- r += " $end\n"
- return r
-
- def generate_vars(self):
- r = ""
- for var in self.vars:
- r += "$var "
- r += var.type
- r += " "
- r += str(var.width)
- r += " "
- r += var.vcd_id
- r += " "
- r += var.name
- r += " $end\n"
- return r
-
- def generate_unscope(self):
- r = "$unscope "
- r += " $end\n"
- return r
-
- def generate_enddefinitions(self):
- r = "$enddefinitions "
- r += " $end\n"
- return r
-
- def generate_dumpvars(self):
- r = "$dumpvars\n"
- for var in self.vars:
- r += "b"
- r += dec2bin(var.val, var.width)
- r += " "
- r += var.vcd_id
- r += "\n"
- r += "$end\n"
- return r
-
- def generate_valuechange(self):
- r = ""
- for i in range(len(self)):
- r += self.change()
- self.cnt += 1
- return r
-
- def __repr__(self):
- r = ""
-
- return r
-
- def write(self, filename):
- f = open(filename, "w")
- f.write(self.generate_date())
- f.write(self.generate_comment())
- f.write(self.generate_timescale())
- f.write(self.generate_scope())
- f.write(self.generate_vars())
- f.write(self.generate_unscope())
- f.write(self.generate_enddefinitions())
- f.write(self.generate_dumpvars())
- f.write(self.generate_valuechange())
- f.close()
-
- def read(self, filename):
- raise NotImplementedError("VCD files can not (yet) be read, please contribute!")
-
-if __name__ == '__main__':
- dump = VCDDump()
- dump.add(Var("foo1", 1, [0, 1, 0, 1, 0, 1]))
- dump.add(Var("foo2", 2, [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0]))
- ramp = [i%128 for i in range(1024)]
- dump.add(Var("ramp", 16, ramp))
- dump.write("dump.vcd")
--- /dev/null
+import socket
+from misoclib.tools.litescope.software.driver.reg import *
+
+from liteeth.test.model.etherbone import *
+
+
+class LiteScopeEtherboneDriver:
+ def __init__(self, ip_address, udp_port=20000, addrmap=None, busword=8, debug=False):
+ self.ip_address = ip_address
+ self.udp_port = udp_port
+ self.debug = debug
+
+ self.tx_sock = None
+ self.rx_sock = None
+ if addrmap is not None:
+ self.regs = build_map(addrmap, busword, self.read, self.write)
+
+ def open(self):
+ self.tx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ self.rx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ self.rx_sock.bind(("", self.udp_port))
+
+ def close(self):
+ pass
+
+ def read(self, addr, burst_length=1):
+ reads_addrs = [addr+4*j for j in range(burst_length)]
+ reads = EtherboneReads(base_ret_addr=0x1000, addrs=reads_addrs)
+ record = EtherboneRecord()
+ record.writes = None
+ record.reads = reads
+ record.bca = 0
+ record.rca = 0
+ record.rff = 0
+ record.cyc = 0
+ record.wca = 0
+ record.wff = 0
+ record.byte_enable = 0xf
+ record.wcount = 0
+ record.rcount = len(reads_addrs)
+
+ packet = EtherbonePacket()
+ packet.records = [record]
+ packet.encode()
+ self.tx_sock.sendto(bytes(packet), (self.ip_address, self.udp_port))
+
+ datas, addrs = self.rx_sock.recvfrom(8192)
+ packet = EtherbonePacket(datas)
+ packet.decode()
+ datas = packet.records.pop().writes.get_datas()
+ if self.debug:
+ for i, data in enumerate(datas):
+ print("RD {:08X} @ {:08X}".format(data, addr + 4*i))
+ if burst_length == 1:
+ return datas[0]
+ else:
+ return datas
+
+ def write(self, addr, datas):
+ if not isinstance(datas, list):
+ datas = [datas]
+ writes_datas = [d for d in datas]
+ writes = EtherboneWrites(base_addr=addr, datas=writes_datas)
+ record = EtherboneRecord()
+ record.writes = writes
+ record.reads = None
+ record.bca = 0
+ record.rca = 0
+ record.rff = 0
+ record.cyc = 0
+ record.wca = 0
+ record.wff = 0
+ record.byte_enable = 0xf
+ record.wcount = len(writes_datas)
+ record.rcount = 0
+
+ packet = EtherbonePacket()
+ packet.records = [record]
+ packet.encode()
+ self.tx_sock.sendto(bytes(packet), (self.ip_address, self.udp_port))
+
+ if self.debug:
+ for i, data in enumerate(datas):
+ print("WR {:08X} @ {:08X}".format(data, addr + 4*i))
--- /dev/null
+class LiteScopeIODriver():
+ def __init__(self, regs, name):
+ self.regs = regs
+ self.name = name
+ self.build()
+
+ def build(self):
+ for key, value in self.regs.d.items():
+ if self.name in key:
+ key = key.replace(self.name + "_", "")
+ setattr(self, key, value)
+
+ def write(self, value):
+ self.output.write(value)
+
+ def read(self):
+ return self.input.read()
--- /dev/null
+import csv
+from struct import *
+from migen.fhdl.structure import *
+from misoclib.tools.litescope.software.dump import *
+from misoclib.tools.litescope.software.driver.truthtable import *
+
+
+class LiteScopeLADriver():
+ def __init__(self, regs, name, config_csv=None, clk_freq=None, debug=False):
+ self.regs = regs
+ self.name = name
+ if config_csv is None:
+ self.config_csv = name + ".csv"
+ if clk_freq is None:
+ try:
+ self.clk_freq = regs.identifier_frequency.read()
+ except:
+ self.clk_freq = None
+ self.samplerate = self.clk_freq
+ else:
+ self.clk_freq = clk_freq
+ self.samplerate = clk_freq
+ self.debug = debug
+ self.get_config()
+ self.get_layout()
+ self.build()
+ self.data = Dat(self.dw)
+
+ def get_config(self):
+ csv_reader = csv.reader(open(self.config_csv), delimiter=',', quotechar='#')
+ for item in csv_reader:
+ t, n, v = item
+ if t == "config":
+ setattr(self, n, int(v))
+
+ def get_layout(self):
+ self.layout = []
+ csv_reader = csv.reader(open(self.config_csv), delimiter=',', quotechar='#')
+ for item in csv_reader:
+ t, n, v = item
+ if t == "layout":
+ self.layout.append((n, int(v)))
+
+ def build(self):
+ for key, value in self.regs.d.items():
+ if self.name == key[:len(self.name)]:
+ key = key.replace(self.name + "_", "")
+ setattr(self, key, value)
+ value = 1
+ for name, length in self.layout:
+ setattr(self, name + "_o", value)
+ value = value*(2**length)
+ value = 0
+ for name, length in self.layout:
+ setattr(self, name + "_m", (2**length-1) << value)
+ value += length
+
+ def configure_term(self, port, trigger=0, mask=0, cond=None):
+ if cond is not None:
+ for k, v in cond.items():
+ trigger |= getattr(self, k + "_o")*v
+ mask |= getattr(self, k + "_m")
+ t = getattr(self, "trigger_port{d}_trig".format(d=int(port)))
+ m = getattr(self, "trigger_port{d}_mask".format(d=int(port)))
+ t.write(trigger)
+ m.write(mask)
+
+ def configure_range_detector(self, port, low, high):
+ l = getattr(self, "trigger_port{d}_low".format(d=int(port)))
+ h = getattr(self, "trigger_port{d}_high".format(d=int(port)))
+ l.write(low)
+ h.write(high)
+
+ def configure_edge_detector(self, port, rising_mask, falling_mask, both_mask):
+ rm = getattr(self, "trigger_port{d}_rising_mask".format(d=int(port)))
+ fm = getattr(self, "trigger_port{d}_falling_mask".format(d=int(port)))
+ bm = getattr(self, "trigger_port{d}_both_mask".format(d=int(port)))
+ rm.write(rising_mask)
+ fm.write(falling_mask)
+ bm.write(both_mask)
+
+ def configure_sum(self, equation):
+ datas = gen_truth_table(equation)
+ for adr, dat in enumerate(datas):
+ self.trigger_sum_prog_adr.write(adr)
+ self.trigger_sum_prog_dat.write(dat)
+ self.trigger_sum_prog_we.write(1)
+
+ def configure_subsampler(self, n):
+ self.subsampler_value.write(n-1)
+ if self.clk_freq is not None:
+ self.samplerate = self.clk_freq//n
+ else:
+ self.samplerate = None
+
+ def configure_qualifier(self, v):
+ self.recorder_qualifier.write(v)
+
+ def configure_rle(self, v):
+ self.rle_enable.write(v)
+
+ def done(self):
+ return self.recorder_done.read()
+
+ def run(self, offset, length):
+ if self.debug:
+ print("running")
+ self.recorder_offset.write(offset)
+ self.recorder_length.write(length)
+ self.recorder_trigger.write(1)
+
+ def upload(self):
+ if self.debug:
+ print("uploading")
+ while self.recorder_source_stb.read():
+ self.data.append(self.recorder_source_data.read())
+ self.recorder_source_ack.write(1)
+ if self.with_rle:
+ if self.rle_enable.read():
+ self.data = self.data.decode_rle()
+ return self.data
+
+ def save(self, filename):
+ if self.debug:
+ print("saving to " + filename)
+ name, ext = os.path.splitext(filename)
+ if ext == ".vcd":
+ from misoclib.tools.litescope.software.dump.vcd import VCDDump
+ dump = VCDDump()
+ elif ext == ".csv":
+ from misoclib.tools.litescope.software.dump.csv import CSVDump
+ dump = CSVDump()
+ elif ext == ".py":
+ from misoclib.tools.litescope.software.dump.python import PythonDump
+ dump = PythonDump()
+ elif ext == ".sr":
+ from misoclib.tools.litescope.software.dump.sigrok import SigrokDump
+ if self.samplerate is None:
+ raise ValueError("Unable to automatically retrieve clk_freq, clk_freq parameter required")
+ dump = SigrokDump(samplerate=self.samplerate)
+ else:
+ raise NotImplementedError
+ dump.add_from_layout(self.layout, self.data)
+ dump.write(filename)
--- /dev/null
+import string
+import mmap
+from misoclib.tools.litescope.software.driver.reg import *
+
+
+class LiteScopePCIeDriver:
+ def __init__(self, bar, bar_size, addrmap=None, busword=8, debug=False):
+ self.bar = bar
+ self.bar_size = bar_size
+ self.debug = debug
+ self.f = None
+ self.mmap = None
+ self.regs = build_map(addrmap, busword, self.read, self.write)
+
+ def open(self):
+ self.f = open(self.bar, "r+b")
+ self.f.flush()
+ self.mmap = mmap.mmap(self.f.fileno(), self.bar_size)
+
+ def close(self):
+ self.mmap.close()
+ self.f.close()
+
+ def read(self, addr, burst_length=1):
+ datas = []
+ for i in range(burst_length):
+ self.mmap.seek(addr + 4*i)
+ dat = self.mmap.read(4)
+ val = dat[3] << 24
+ val |= dat[2] << 16
+ val |= dat[1] << 8
+ val |= dat[0] << 0
+ if self.debug:
+ print("RD {:08X} @ {:08X}".format(val, addr + 4*i))
+ datas.append(val)
+ if burst_length == 1:
+ return datas[0]
+ else:
+ return datas
+
+ def write(self, addr, data):
+ if isinstance(data, list):
+ burst_length = len(data)
+ else:
+ burst_length = 1
+ data = [data]
+
+ for i, dat in enumerate(data):
+ dat_bytes = [0, 0, 0, 0]
+ dat_bytes[3] = (dat >> 24) & 0xff
+ dat_bytes[2] = (dat >> 16) & 0xff
+ dat_bytes[1] = (dat >> 8) & 0xff
+ dat_bytes[0] = (dat >> 0) & 0xff
+ self.mmap[addr + 4*i:addr + 4*(i+1)] = bytes(dat_bytes)
+ if self.debug:
+ print("WR {:08X} @ {:08X}".format(dat, (addr + i)*4))
--- /dev/null
+import csv
+
+
+class MappedReg:
+ def __init__(self, readfn, writefn, name, addr, length, busword, mode):
+ self.readfn = readfn
+ self.writefn = writefn
+ self.addr = addr
+ self.length = length
+ self.busword = busword
+ self.mode = mode
+
+ def read(self):
+ if self.mode not in ["rw", "ro"]:
+ raise KeyError(name + "register not readable")
+ datas = self.readfn(self.addr, burst_length=self.length)
+ if isinstance(datas, int):
+ return datas
+ else:
+ data = 0
+ for i in range(self.length):
+ data = data << self.busword
+ data |= datas[i]
+ return data
+
+ def write(self, value):
+ if self.mode not in ["rw", "wo"]:
+ raise KeyError(name + "register not writable")
+ datas = []
+ for i in range(self.length):
+ datas.append((value >> ((self.length-1-i)*self.busword)) & (2**self.busword-1))
+ self.writefn(self.addr, datas)
+
+
+class MappedRegs:
+ def __init__(self, d):
+ self.d = d
+
+ def __getattr__(self, attr):
+ try:
+ return self.__dict__['d'][attr]
+ except KeyError:
+ pass
+ raise KeyError("No such register " + attr)
+
+
+def build_map(addrmap, busword, readfn, writefn):
+ csv_reader = csv.reader(open(addrmap), delimiter=',', quotechar='#')
+ d = {}
+ for item in csv_reader:
+ name, addr, length, mode = item
+ addr = int(addr.replace("0x", ""), 16)
+ length = int(length)
+ d[name] = MappedReg(readfn, writefn, name, addr, length, busword, mode)
+ return MappedRegs(d)
--- /dev/null
+import os
+import re
+import sys
+
+
+def is_number(x):
+ try:
+ _ = float(x)
+ except ValueError:
+ return False
+ return True
+
+
+def remove_numbers(seq):
+ return [x for x in seq if not is_number(x)]
+
+
+def remove_duplicates(seq):
+ seen = set()
+ seen_add = seen.add
+ return [x for x in seq if x not in seen and not seen_add(x)]
+
+
+def get_operands(s):
+ operands = re.findall("[A-z0-9_]+", s)
+ operands = remove_duplicates(operands)
+ operands = remove_numbers(operands)
+ return sorted(operands)
+
+
+def gen_truth_table(s):
+ operands = get_operands(s)
+ width = len(operands)
+ stim = []
+ for i in range(width):
+ stim_op = []
+ for j in range(2**width):
+ stim_op.append((int(j/(2**i)))%2)
+ stim.append(stim_op)
+
+ truth_table = []
+ for i in range(2**width):
+ for j in range(width):
+ exec("{} = stim[j][i]".format(operands[j]))
+ truth_table.append(eval(s) != 0)
+ return truth_table
+
+
+def main():
+ print(gen_truth_table("(A&B&C)|D"))
+
+if __name__ == '__main__':
+ main()
--- /dev/null
+import serial
+from struct import *
+from misoclib.tools.litescope.software.driver.reg import *
+
+
+def write_b(uart, data):
+ uart.write(pack('B', data))
+
+
+class LiteScopeUART2WishboneDriver:
+ cmds = {
+ "write": 0x01,
+ "read": 0x02
+ }
+ def __init__(self, port, baudrate=115200, addrmap=None, busword=8, debug=False):
+ self.port = port
+ self.baudrate = str(baudrate)
+ self.debug = debug
+ self.uart = serial.Serial(port, baudrate, timeout=0.25)
+ if addrmap is not None:
+ self.regs = build_map(addrmap, busword, self.read, self.write)
+
+ def open(self):
+ self.uart.flushOutput()
+ self.uart.close()
+ self.uart.open()
+ self.uart.flushInput()
+
+ def close(self):
+ self.uart.flushOutput()
+ self.uart.close()
+
+ def read(self, addr, burst_length=1):
+ datas = []
+ self.uart.flushInput()
+ write_b(self.uart, self.cmds["read"])
+ write_b(self.uart, burst_length)
+ word_addr = addr//4
+ write_b(self.uart, (word_addr >> 24) & 0xff)
+ write_b(self.uart, (word_addr >> 16) & 0xff)
+ write_b(self.uart, (word_addr >> 8) & 0xff)
+ write_b(self.uart, (word_addr >> 0) & 0xff)
+ for i in range(burst_length):
+ data = 0
+ for k in range(4):
+ data = data << 8
+ data |= ord(self.uart.read())
+ if self.debug:
+ print("RD {:08X} @ {:08X}".format(data, addr + 4*i))
+ datas.append(data)
+ if burst_length == 1:
+ return datas[0]
+ else:
+ return datas
+
+ def write(self, addr, data):
+ if isinstance(data, list):
+ burst_length = len(data)
+ else:
+ burst_length = 1
+ data = [data]
+ write_b(self.uart, self.cmds["write"])
+ write_b(self.uart, burst_length)
+ word_addr = addr//4
+ write_b(self.uart, (word_addr >> 24) & 0xff)
+ write_b(self.uart, (word_addr >> 16) & 0xff)
+ write_b(self.uart, (word_addr >> 8) & 0xff)
+ write_b(self.uart, (word_addr >> 0) & 0xff)
+ for i in range(len(data)):
+ dat = data[i]
+ for j in range(4):
+ write_b(self.uart, (dat >> 24) & 0xff)
+ dat = dat << 8
+ if self.debug:
+ print("WR {:08X} @ {:08X}".format(data[i], addr + 4*i))
--- /dev/null
+def dec2bin(d, nb=0):
+ if d == "x":
+ return "x"*nb
+ elif d == 0:
+ b = "0"
+ else:
+ b = ""
+ while d != 0:
+ b = "01"[d&1] + b
+ d = d >> 1
+ return b.zfill(nb)
+
+
+def get_bits(values, low, high=None):
+ r = []
+ if high is None:
+ high = low+1
+ for val in values:
+ t = (val >> low) & (2**(high-low)-1)
+ r.append(t)
+ return r
+
+
+class Dat(list):
+ def __init__(self, width):
+ self.width = width
+
+ def __getitem__(self, key):
+ if isinstance(key, int):
+ return get_bits(self, key)
+ elif isinstance(key, slice):
+ if key.start != None:
+ start = key.start
+ else:
+ start = 0
+ if key.stop != None:
+ stop = key.stop
+ else:
+ stop = self.width
+ if stop > self.width:
+ stop = self.width
+ if key.step != None:
+ raise KeyError
+ return get_bits(self, start, stop)
+ else:
+ raise KeyError
+
+ def decode_rle(self):
+ datas = Dat(self.width-1)
+ last_data = 0
+ for data in self:
+ rle = data >> (self.width-1)
+ data = data & (2**(self.width-1)-1)
+ if rle:
+ for i in range(data):
+ datas.append(last_data)
+ else:
+ datas.append(data)
+ last_data = data
+ return datas
+
+
+class Var:
+ def __init__(self, name, width, values=[], type="wire", default="x"):
+ self.type = type
+ self.width = width
+ self.name = name
+ self.val = default
+ self.values = values
+ self.vcd_id = None
+
+ def set_vcd_id(self, s):
+ self.vcd_id = s
+
+ def __len__(self):
+ return len(self.values)
+
+ def change(self, cnt):
+ r = ""
+ try:
+ if self.values[cnt+1] != self.val:
+ r += "b"
+ r += dec2bin(self.values[cnt+1], self.width)
+ r += " "
+ r += self.vcd_id
+ r += "\n"
+ return r
+ except:
+ return r
+ return r
+
+
+class Dump:
+ def __init__(self):
+ self.vars = []
+ self.vcd_id = "!"
+
+ def add(self, var):
+ var.set_vcd_id(self.vcd_id)
+ self.vcd_id = chr(ord(self.vcd_id)+1)
+ self.vars.append(var)
+
+ def add_from_layout(self, layout, var):
+ i = 0
+ for s, n in layout:
+ self.add(Var(s, n, var[i:i+n]))
+ i += n
+
+ def __len__(self):
+ l = 0
+ for var in self.vars:
+ l = max(len(var), l)
+ return l
--- /dev/null
+from misoclib.tools.litescope.software.dump import *
+
+
+class CSVDump(Dump):
+ def __init__(self, init_dump=None):
+ Dump.__init__(self)
+ if init_dump:
+ self.vars = init_dump.vars
+
+ def generate_vars(self):
+ r = ""
+ for var in self.vars:
+ r += var.name
+ r += ","
+ r += "\n"
+ for var in self.vars:
+ r += str(var.width)
+ r += ","
+ r += "\n"
+ return r
+
+ def generate_dumpvars(self):
+ r = ""
+ for i in range(len(self)):
+ for var in self.vars:
+ try:
+ var.val = var.values[i]
+ except:
+ pass
+ if var.val == "x":
+ r += "x"
+ else:
+ r += dec2bin(var.val, var.width)
+ r += ", "
+ r += "\n"
+ return r
+
+ def write(self, filename):
+ f = open(filename, "w")
+ f.write(self.generate_vars())
+ f.write(self.generate_dumpvars())
+ f.close()
+
+ def read(self, filename):
+ raise NotImplementedError("CSV files can not (yet) be read, please contribute!")
+
+if __name__ == '__main__':
+ dump = CSVDump()
+ dump.add(Var("foo1", 1, [0, 1, 0, 1, 0, 1]))
+ dump.add(Var("foo2", 2, [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0]))
+ ramp = [i%128 for i in range(1024)]
+ dump.add(Var("ramp", 16, ramp))
+ dump.write("dump.csv")
--- /dev/null
+from misoclib.tools.litescope.software.dump import *
+
+
+class PythonDump(Dump):
+ def __init__(self, init_dump=None):
+ Dump.__init__(self)
+ if init_dump:
+ self.vars = init_dump.vars
+
+ def generate_data(self):
+ r = "dump = {\n"
+ for var in self.vars:
+ r += "\"" + var.name + "\""
+ r += " : "
+ r += str(var.values)
+ r += ",\n"
+ r += "}"
+ return r
+
+ def write(self, filename):
+ f = open(filename, "w")
+ f.write(self.generate_data())
+ f.close()
+
+ def read(self, filename):
+ raise NotImplementedError("Python files can not (yet) be read, please contribute!")
+
+if __name__ == '__main__':
+ dump = PythonDump()
+ dump.add(Var("foo1", 1, [0, 1, 0, 1, 0, 1]))
+ dump.add(Var("foo2", 2, [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0]))
+ ramp = [i%128 for i in range(1024)]
+ dump.add(Var("ramp", 16, ramp))
+ dump.write("dump.py")
--- /dev/null
+import os
+import math
+import shutil
+import zipfile
+import re
+from collections import OrderedDict
+
+from misoclib.tools.litescope.software.dump import *
+
+
+class SigrokDump(Dump):
+ def __init__(self, init_dump=None, samplerate=50000000):
+ Dump.__init__(self)
+ if init_dump:
+ self.vars = init_dump.vars
+ self.samplerate = samplerate
+
+ def write_version(self):
+ f = open("version", "w")
+ f.write("1")
+ f.close()
+
+ def write_metadata(self, name):
+ f = open("metadata", "w")
+ r = """
+[global]
+sigrok version = 0.2.0
+[device 1]
+driver = litescope
+capturefile = logic-1
+unitsize = 1
+total probes = {}
+samplerate = {} KHz
+""".format(
+ len(self.vars),
+ self.samplerate//1000,
+ )
+ for i, var in enumerate(self.vars):
+ r += "probe{} = {}\n".format(i+1, var.name)
+ f.write(r)
+ f.close()
+
+ def write_data(self):
+ # XXX are probes limited to 1 bit?
+ data_bits = math.ceil(len(self.vars)/8)*8
+ data_len = 0
+ for var in self.vars:
+ data_len = max(data_len, len(var))
+ datas = []
+ for i in range(data_len):
+ data = 0
+ for j, var in enumerate(reversed(self.vars)):
+ data = data << 1
+ try:
+ data |= var.values[i] % 2
+ except:
+ pass
+ datas.append(data)
+ f = open("logic-1", "wb")
+ for data in datas:
+ f.write(data.to_bytes(data_bits//8, "big"))
+ f.close()
+
+ def zip(self, name):
+ f = zipfile.ZipFile(name + ".sr", "w")
+ os.chdir(name)
+ f.write("version")
+ f.write("metadata")
+ f.write("logic-1")
+ os.chdir("..")
+ f.close()
+
+ def write(self, filename):
+ name, ext = os.path.splitext(filename)
+ if os.path.exists(name):
+ shutil.rmtree(name)
+ os.makedirs(name)
+ os.chdir(name)
+ self.write_version()
+ self.write_metadata(name)
+ self.write_data()
+ os.chdir("..")
+ self.zip(name)
+ shutil.rmtree(name)
+
+ def unzip(self, filename, name):
+ f = open(filename, "rb")
+ z = zipfile.ZipFile(f)
+ if os.path.exists(name):
+ shutil.rmtree(name)
+ os.makedirs(name)
+ for file in z.namelist():
+ z.extract(file, name)
+ f.close()
+
+ def read_metadata(self):
+ probes = OrderedDict()
+ f = open("metadata", "r")
+ for l in f:
+ m = re.search("probe([0-9]+) = (\w+)", l, re.I)
+ if m is not None:
+ index = int(m.group(1))
+ name = m.group(2)
+ probes[name] = index
+ m = re.search("samplerate = ([0-9]+) kHz", l, re.I)
+ if m is not None:
+ self.samplerate = int(m.group(1))*1000
+ m = re.search("samplerate = ([0-9]+) mHz", l, re.I)
+ if m is not None:
+ self.samplerate = int(m.group(1))*1000000
+ f.close()
+ return probes
+
+ def read_data(self, name, nprobes):
+ datas = []
+ f = open("logic-1", "rb")
+ while True:
+ data = f.read(math.ceil(nprobes/8))
+ if data == bytes('', "utf-8"):
+ break
+ data = int.from_bytes(data, "big")
+ datas.append(data)
+ f.close()
+ return datas
+
+ def read(self, filename):
+ self.vars = []
+ name, ext = os.path.splitext(filename)
+ self.unzip(filename, name)
+ os.chdir(name)
+ probes = self.read_metadata()
+ datas = self.read_data(name, len(probes.keys()))
+ os.chdir("..")
+ shutil.rmtree(name)
+
+ for k, v in probes.items():
+ probe_data = []
+ for data in datas:
+ probe_data.append((data >> (v-1)) & 0x1)
+ self.add(Var(k, 1, probe_data))
+
+if __name__ == '__main__':
+ dump = SigrokDump()
+ dump.add(Var("foo1", 1, [0, 1, 0, 1, 0, 1]))
+ dump.add(Var("foo2", 2, [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0]))
+ ramp = [i%128 for i in range(1024)]
+ dump.add(Var("ramp", 16, ramp))
+ dump.write("dump.sr")
+ dump.read("dump.sr")
+ dump.write("dump_copy.sr")
--- /dev/null
+import datetime
+from misoclib.tools.litescope.software.dump import *
+
+
+class VCDDump(Dump):
+ def __init__(self, init_dump=None, timescale="1ps", comment=""):
+ Dump.__init__(self)
+ if init_dump:
+ self.vars = init_dump.vars
+ self.timescale = timescale
+ self.comment = comment
+ self.cnt = -1
+
+ def change(self):
+ r = ""
+ c = ""
+ for var in self.vars:
+ c += var.change(self.cnt)
+ if c != "":
+ r += "#"
+ r += str(self.cnt+1)
+ r += "\n"
+ r += c
+ return r
+
+ def generate_date(self):
+ now = datetime.datetime.now()
+ r = "$date\n"
+ r += "\t"
+ r += now.strftime("%Y-%m-%d %H:%M")
+ r += "\n"
+ r += "$end\n"
+ return r
+
+ def generate_version(self):
+ r = "$version\n"
+ r += "\tmiscope VCD dump\n"
+ r += "$end\n"
+ return r
+
+ def generate_comment(self):
+ r = "$comment\n"
+ r += self.comment
+ r += "\n$end\n"
+ return r
+
+ def generate_timescale(self):
+ r = "$timescale "
+ r += self.timescale
+ r += " $end\n"
+ return r
+
+ def generate_scope(self):
+ r = "$scope "
+ r += self.timescale
+ r += " $end\n"
+ return r
+
+ def generate_vars(self):
+ r = ""
+ for var in self.vars:
+ r += "$var "
+ r += var.type
+ r += " "
+ r += str(var.width)
+ r += " "
+ r += var.vcd_id
+ r += " "
+ r += var.name
+ r += " $end\n"
+ return r
+
+ def generate_unscope(self):
+ r = "$unscope "
+ r += " $end\n"
+ return r
+
+ def generate_enddefinitions(self):
+ r = "$enddefinitions "
+ r += " $end\n"
+ return r
+
+ def generate_dumpvars(self):
+ r = "$dumpvars\n"
+ for var in self.vars:
+ r += "b"
+ r += dec2bin(var.val, var.width)
+ r += " "
+ r += var.vcd_id
+ r += "\n"
+ r += "$end\n"
+ return r
+
+ def generate_valuechange(self):
+ r = ""
+ for i in range(len(self)):
+ r += self.change()
+ self.cnt += 1
+ return r
+
+ def __repr__(self):
+ r = ""
+
+ return r
+
+ def write(self, filename):
+ f = open(filename, "w")
+ f.write(self.generate_date())
+ f.write(self.generate_comment())
+ f.write(self.generate_timescale())
+ f.write(self.generate_scope())
+ f.write(self.generate_vars())
+ f.write(self.generate_unscope())
+ f.write(self.generate_enddefinitions())
+ f.write(self.generate_dumpvars())
+ f.write(self.generate_valuechange())
+ f.close()
+
+ def read(self, filename):
+ raise NotImplementedError("VCD files can not (yet) be read, please contribute!")
+
+if __name__ == '__main__':
+ dump = VCDDump()
+ dump.add(Var("foo1", 1, [0, 1, 0, 1, 0, 1]))
+ dump.add(Var("foo2", 2, [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0]))
+ ramp = [i%128 for i in range(1024)]
+ dump.add(Var("ramp", 16, ramp))
+ dump.write("dump.vcd")