add LiteScopeLA example
[litex.git] / litescope / host / driver.py
1 import csv
2 import time
3 import sys
4 import string
5 import serial
6 from struct import *
7 from migen.fhdl.structure import *
8 from litescope.host.reg import *
9 from litescope.host.dump import *
10 from litescope.host.truthtable import *
11
12 def write_b(uart, data):
13 uart.write(pack('B',data))
14
15 class LiteScopeUART2WBDriver:
16 WRITE_CMD = 0x01
17 READ_CMD = 0x02
18 def __init__(self, port, baudrate=115200, addrmap=None, busword=8, debug=False):
19 self.port = port
20 self.baudrate = str(baudrate)
21 self.debug = debug
22 self.uart = serial.Serial(port, baudrate, timeout=0.25)
23 self.regs = build_map(addrmap, busword, self.read, self.write)
24
25 def open(self):
26 self.uart.flushOutput()
27 self.uart.close()
28 self.uart.open()
29 self.uart.flushInput()
30 try:
31 self.regs.uart2wb_sel.write(1)
32 except:
33 pass
34
35 def close(self):
36 try:
37 self.regs.uart2wb_sel.write(0)
38 except:
39 pass
40 self.uart.flushOutput()
41 self.uart.close()
42
43 def read(self, addr, burst_length=1):
44 self.uart.flushInput()
45 write_b(self.uart, self.READ_CMD)
46 write_b(self.uart, burst_length)
47 addr = addr//4
48 write_b(self.uart, (addr & 0xff000000) >> 24)
49 write_b(self.uart, (addr & 0x00ff0000) >> 16)
50 write_b(self.uart, (addr & 0x0000ff00) >> 8)
51 write_b(self.uart, (addr & 0x000000ff))
52 values = []
53 for i in range(burst_length):
54 val = 0
55 for j in range(4):
56 val = val << 8
57 val |= ord(self.uart.read())
58 if self.debug:
59 print("RD %08X @ %08X" %(val, (addr+i)*4))
60 values.append(val)
61 if burst_length == 1:
62 return values[0]
63 else:
64 return values
65
66 def write(self, addr, data):
67 if isinstance(data, list):
68 burst_length = len(data)
69 else:
70 burst_length = 1
71 write_b(self.uart, self.WRITE_CMD)
72 write_b(self.uart, burst_length)
73 addr = addr//4
74 write_b(self.uart, (addr & 0xff000000) >> 24)
75 write_b(self.uart, (addr & 0x00ff0000) >> 16)
76 write_b(self.uart, (addr & 0x0000ff00) >> 8)
77 write_b(self.uart, (addr & 0x000000ff))
78 if isinstance(data, list):
79 for i in range(len(data)):
80 dat = data[i]
81 for j in range(4):
82 write_b(self.uart, (dat & 0xff000000) >> 24)
83 dat = dat << 8
84 if self.debug:
85 print("WR %08X @ %08X" %(data[i], (addr + i)*4))
86 else:
87 dat = data
88 for j in range(4):
89 write_b(self.uart, (dat & 0xff000000) >> 24)
90 dat = dat << 8
91 if self.debug:
92 print("WR %08X @ %08X" %(data, (addr * 4)))
93
94 class LiteScopeIODriver():
95 def __init__(self, regs, name):
96 self.regs = regs
97 self.name = name
98 self.build()
99
100 def build(self):
101 for key, value in self.regs.d.items():
102 if self.name in key:
103 key = key.replace(self.name +"_", "")
104 setattr(self, key, value)
105
106 def write(self, value):
107 self.o.write(value)
108
109 def read(self):
110 return self.i.read()
111
112 class LiteScopeLADriver():
113 def __init__(self, regs, name, config_csv=None, use_rle=False):
114 self.regs = regs
115 self.name = name
116 self.use_rle = use_rle
117 if config_csv is None:
118 self.config_csv = name + ".csv"
119 self.get_config()
120 self.get_layout()
121 self.build()
122 self.dat = Dat(self.width)
123
124 def get_config(self):
125 csv_reader = csv.reader(open(self.config_csv), delimiter=',', quotechar='#')
126 for item in csv_reader:
127 t, n, v = item
128 if t == "config":
129 setattr(self, n, int(v))
130
131 def get_layout(self):
132 self.layout = []
133 csv_reader = csv.reader(open(self.config_csv), delimiter=',', quotechar='#')
134 for item in csv_reader:
135 t, n, v = item
136 if t == "layout":
137 self.layout.append((n, int(v)))
138
139 def build(self):
140 for key, value in self.regs.d.items():
141 if self.name == key[:len(self.name)]:
142 key = key.replace(self.name + "_", "")
143 setattr(self, key, value)
144 value = 1
145 for name, length in self.layout:
146 setattr(self, name + "_o", value)
147 value = value*(2**length)
148 value = 0
149 for name, length in self.layout:
150 setattr(self, name + "_m", (2**length-1) << value)
151 value += length
152
153 def show_state(self, s):
154 print(s, end="|")
155 sys.stdout.flush()
156
157 def prog_term(self, port, trigger=0, mask=0, cond=None):
158 if cond is not None:
159 for k, v in cond.items():
160 trigger |= getattr(self, k + "_o")*v
161 mask |= getattr(self, k + "_m")
162 t = getattr(self, "trigger_port{d}_trig".format(d=int(port)))
163 m = getattr(self, "trigger_port{d}_mask".format(d=int(port)))
164 t.write(trigger)
165 m.write(mask)
166
167 def prog_range_detector(self, port, low, high):
168 l = getattr(self, "trigger_port{d}_low".format(d=int(port)))
169 h = getattr(self, "trigger_port{d}_high".format(d=int(port)))
170 l.write(low)
171 h.write(high)
172
173 def prog_edge_detector(self, port, rising_mask, falling_mask, both_mask):
174 rm = getattr(self, "trigger_port{d}_rising_mask".format(d=int(port)))
175 fm = getattr(self, "trigger_port{d}_falling_mask".format(d=int(port)))
176 bm = getattr(self, "trigger_port{d}_both_mask".format(d=int(port)))
177 rm.write(rising_mask)
178 fm.write(falling_mask)
179 bm.write(both_mask)
180
181 def prog_sum(self, equation):
182 datas = gen_truth_table(equation)
183 for adr, dat in enumerate(datas):
184 self.trigger_sum_prog_adr.write(adr)
185 self.trigger_sum_prog_dat.write(dat)
186 self.trigger_sum_prog_we.write(1)
187
188 def config_rle(self, v):
189 self.rle_enable.write(v)
190
191 def is_done(self):
192 return self.recorder_done.read()
193
194 def wait_done(self):
195 self.show_state("WAIT HIT")
196 while(not self.is_done()):
197 time.sleep(0.1)
198
199 def trigger(self, offset, length):
200 self.show_state("TRIG")
201 if self.with_rle:
202 self.config_rle(self.use_rle)
203 self.recorder_offset.write(offset)
204 self.recorder_length.write(length)
205 self.recorder_trigger.write(1)
206
207 def read(self):
208 self.show_state("READ")
209 empty = self.recorder_read_empty.read()
210 while(not empty):
211 self.dat.append(self.recorder_read_dat.read())
212 empty = self.recorder_read_empty.read()
213 self.recorder_read_en.write(1)
214 if self.with_rle:
215 if self.use_rle:
216 self.dat = self.dat.decode_rle()
217 return self.dat
218
219 def export(self, export_fn=None):
220 self.show_state("EXPORT")
221 dump = Dump()
222 dump.add_from_layout(self.layout, self.dat)
223 if ".vcd" in export_fn:
224 VCDExport(dump).write(export_fn)
225 elif ".csv" in export_fn:
226 CSVExport(dump).write(export_fn)
227 elif ".py" in export_fn:
228 PYExport(dump).write(export_fn)
229 else:
230 raise NotImplementedError