3698c1f20f355dd5b0592788e379b170fac7bf59
[c4m-jtag.git] / c4m / nmigen / jtag / tap.py
1 #!/bin/env python3
2 import os
3
4 from nmigen import *
5 from nmigen.build import *
6 from nmigen.lib.io import *
7 from nmigen.hdl.rec import Direction
8 from nmigen.tracer import get_var_name
9
10 from c4m_repo.nmigen.lib import Wishbone
11
12 from .bus import Interface
13
14 __all__ = [
15 "TAP", "ShiftReg",
16 ]
17
18
19 class ShiftReg(Record):
20 """Object with interface for extra shift registers on a TAP.
21
22 Parameters
23 ----------
24 sr_length : int
25 cmds : int, default=1
26 The number of corresponding JTAG instructions
27
28 This object is normally only allocated and returned from ``TAP.add_shiftreg``
29 It is a Record subclass.
30
31 Attributes
32 ----------
33 i: length=sr_length, FANIN
34 The input data sampled during capture state of the TAP
35 ie: length=cmds, FANOUT
36 Indicates that data is to be sampled by the JTAG TAP and
37 should be held stable. The bit indicates the corresponding
38 instruction for which data is asked.
39 This signal is kept high for a whole JTAG TAP clock cycle
40 and may thus be kept higher for more than one clock cycle
41 on the domain where ShiftReg is used.
42 The JTAG protocol does not allow insertion of wait states
43 so data need to be provided before ie goes down. The speed
44 of the response will determine the max. frequency for the
45 JTAG interface.
46 o: length=sr_length, FANOUT
47 The value of the shift register.
48 oe: length=cmds, FANOUT
49 Indicates that output needs to be sampled downstream because
50 JTAG TAP in in the Update state. The bit indicated the corresponding
51 instruction. The bit is only kept high for one clock cycle.
52 """
53 def __init__(self, *, sr_length, cmds=1, name=None, src_loc_at=0):
54 layout = [
55 ("i", sr_length, Direction.FANIN),
56 ("ie", cmds, Direction.FANOUT),
57 ("o", sr_length, Direction.FANOUT),
58 ("oe", cmds, Direction.FANOUT),
59 ]
60 super().__init__(layout, name=name, src_loc_at=src_loc_at+1)
61
62
63 class TAP(Elaboratable):
64 #TODO: Document TAP
65 @staticmethod
66 def _add_files(platform, prefix):
67 d = os.path.realpath("{dir}{sep}{par}{sep}{par}{sep}vhdl{sep}jtag".format(
68 dir=os.path.dirname(__file__), sep=os.path.sep, par=os.path.pardir
69 )) + os.path.sep
70 for fname in [
71 "c4m_jtag_pkg.vhdl",
72 "c4m_jtag_idblock.vhdl",
73 "c4m_jtag_iocell.vhdl",
74 "c4m_jtag_ioblock.vhdl",
75 "c4m_jtag_irblock.vhdl",
76 "c4m_jtag_tap_fsm.vhdl",
77 "c4m_jtag_tap_controller.vhdl",
78 ]:
79 f = open(d + fname, "r")
80 platform.add_file(prefix + fname, f)
81 f.close()
82
83
84 def __init__(
85 self, io_count, *, with_reset=False, ir_width=None,
86 manufacturer_id=Const(0b10001111111, 11), part_number=Const(1, 16),
87 version=Const(0, 4),
88 name=None, src_loc_at=0
89 ):
90 assert(isinstance(io_count, int) and io_count > 0)
91 assert((ir_width is None) or (isinstance(ir_width, int) and ir_width >= 2))
92 assert(len(version) == 4)
93
94 self.name = name if name is not None else get_var_name(depth=src_loc_at+2, default="TAP")
95 self.bus = Interface(with_reset=with_reset, name=self.name+"_bus",
96 src_loc_at=src_loc_at+1)
97
98 # TODO: Handle IOs with different directions
99 self.core = Array(Pin(1, "io") for _ in range(io_count)) # Signals to use for core
100 self.pad = Array(Pin(1, "io") for _ in range(io_count)) # Signals going to IO pads
101
102 ##
103
104 self._io_count = io_count
105 self._ir_width = ir_width
106 self._manufacturer_id = manufacturer_id
107 self._part_number = part_number
108 self._version = version
109
110 self._ircodes = [0, 1, 2] # Already taken codes, all ones added at the end
111 self._srs = []
112
113 self._wbs = []
114
115
116 def elaborate(self, platform):
117 TAP._add_files(platform, "jtag" + os.path.sep)
118
119 m = Module()
120
121 # Determine ir_width if not fixed.
122 ir_max = max(self._ircodes) + 1 # One extra code needed with all ones
123 ir_width = len("{:b}".format(ir_max))
124 if self._ir_width is not None:
125 assert self._ir_width >= ir_width, "Specified JTAG IR width not big enough for allocated shiift registers"
126 ir_width = self._ir_width
127
128 sigs = Record([
129 ("capture", 1),
130 ("shift", 1),
131 ("update", 1),
132 ("ir", ir_width),
133 ("tdo_jtag", 1),
134 ])
135
136 reset = Signal()
137
138 trst_n = Signal()
139 m.d.comb += trst_n.eq(~self.bus.trst if hasattr(self.bus, "trst") else Const(1))
140
141 core_i = Cat(pin.i for pin in self.core)
142 core_o = Cat(pin.o for pin in self.core)
143 core_oe = Cat(pin.oe for pin in self.core)
144 pad_i = Cat(pin.i for pin in self.pad)
145 pad_o = Cat(pin.o for pin in self.pad)
146 pad_oe = Cat(pin.oe for pin in self.pad)
147
148 params = {
149 "p_IOS": self._io_count,
150 "p_IR_WIDTH": ir_width,
151 "p_MANUFACTURER": self._manufacturer_id,
152 "p_PART_NUMBER": self._part_number,
153 "p_VERSION": self._version,
154 "i_TCK": self.bus.tck,
155 "i_TMS": self.bus.tms,
156 "i_TDI": self.bus.tdi,
157 "o_TDO": sigs.tdo_jtag,
158 "i_TRST_N": trst_n,
159 "o_RESET": reset,
160 "o_DRCAPTURE": sigs.capture,
161 "o_DRSHIFT": sigs.shift,
162 "o_DRUPDATE": sigs.update,
163 "o_IR": sigs.ir,
164 "o_CORE_IN": core_i,
165 "i_CORE_OUT": core_o,
166 "i_CORE_EN": core_oe,
167 "i_PAD_IN": pad_i,
168 "o_PAD_OUT": pad_o,
169 "o_PAD_EN": pad_oe,
170 }
171 m.submodules.tap = Instance("c4m_jtag_tap_controller", **params)
172
173 # Own clock domain using TCK as clock signal
174 m.domains.jtag = jtag_cd = ClockDomain(name="jtag", local=True)
175 m.d.comb += [
176 jtag_cd.clk.eq(self.bus.tck),
177 jtag_cd.rst.eq(reset),
178 ]
179
180 self._elaborate_shiftregs(m, sigs)
181 self._elaborate_wishbones(m)
182
183 return m
184
185
186 def add_shiftreg(self, ircode, length, domain="sync", name=None, src_loc_at=0):
187 """Add a shift register to the JTAG interface
188
189 Parameters:
190 - ircode: code(s) for the IR; int or sequence of ints. In the latter case this
191 shiftreg is shared between different IR codes.
192 - length: the length of the shift register
193 - domain: the domain on which the signal will be used"""
194
195 try:
196 ir_it = iter(ircode)
197 ircodes = ircode
198 except TypeError:
199 ir_it = ircodes = (ircode,)
200 for _ircode in ir_it:
201 if not isinstance(_ircode, int) or _ircode <= 0:
202 raise ValueError("IR code '{}' is not an int greater than 0".format(_ircode))
203 if _ircode in self._ircodes:
204 raise ValueError("IR code '{}' already taken".format(_ircode))
205
206 self._ircodes.extend(ircodes)
207
208 if name is None:
209 name = self.name + "_sr{}".format(len(self._srs))
210 sr = ShiftReg(sr_length=length, cmds=len(ircodes), name=name, src_loc_at=src_loc_at+1)
211 self._srs.append((ircodes, domain, sr))
212
213 return sr
214
215 def _elaborate_shiftregs(self, m, sigs):
216 # tdos is tuple of (tdo, tdo_en) for each shiftreg
217 tdos = []
218 for ircodes, domain, sr in self._srs:
219 reg = Signal(len(sr.o), name=sr.name+"_reg")
220 m.d.comb += sr.o.eq(reg)
221
222 isir = Signal(len(ircodes), name=sr.name+"_isir")
223 capture = Signal(name=sr.name+"_capture")
224 shift = Signal(name=sr.name+"_shift")
225 update = Signal(name=sr.name+"_update")
226 m.d.comb += [
227 isir.eq(Cat(sigs.ir == ircode for ircode in ircodes)),
228 capture.eq((isir != 0) & sigs.capture),
229 shift.eq((isir != 0) & sigs.shift),
230 update.eq((isir != 0) & sigs.update),
231 ]
232
233 # update signal is on the JTAG clockdomain, sr.oe is on `domain` clockdomain
234 # latch update in `domain` clockdomain and see when it has falling edge.
235 # At that edge put isir in sr.oe for one `domain` clockdomain
236 update_core = Signal(name=sr.name+"_update_core")
237 update_core_prev = Signal(name=sr.name+"_update_core_prev")
238 m.d[domain] += [
239 update_core.eq(update), # This is CDC from JTAG domain to given domain
240 update_core_prev.eq(update_core)
241 ]
242 with m.If(update_core_prev & ~update_core == 0):
243 # Falling edge of update
244 m.d[domain] += sr.oe.eq(isir)
245 with m.Else():
246 m.d[domain] += sr.oe.eq(0)
247
248 with m.If(shift):
249 m.d.jtag += reg.eq(Cat(reg[1:], self.bus.tdi))
250 with m.If(capture):
251 m.d.jtag += reg.eq(sr.i)
252
253 # tdo = reg[0], tdo_en = shift
254 tdos.append((reg[0], shift))
255
256 for i, (tdo, tdo_en) in enumerate(tdos):
257 if i == 0:
258 with m.If(shift):
259 m.d.comb += self.bus.tdo.eq(tdo)
260 else:
261 with m.Elif(shift):
262 m.d.comb += self.bus.tdo.eq(tdo)
263
264 if len(tdos) > 0:
265 with m.Else():
266 m.d.comb += self.bus.tdo.eq(sigs.tdo_jtag)
267 else:
268 # Always connect tdo_jtag to
269 m.d.comb += self.bus.tdo.eq(sigs.tdo_jtag)
270
271
272 def add_wishbone(self, *, ircodes, address_width, data_width, sel_width=None, domain="sync"):
273 """Add a wishbone interface
274
275 In order to allow high JTAG clock speed, data will be cached. This means that if data is
276 output the value of the next address will be read automatically.
277
278 Parameters:
279 ircodes: sequence of three integer for the JTAG IR codes;
280 they represent resp. WBADDR, WBREAD and WBREADWRITE. First code
281 has a shift register of length 'address_width', the two other codes
282 share a shift register of length data_width.
283 address_width: width of the address
284 data_width: width of the data
285 """
286 if len(ircodes) != 3:
287 raise ValueError("3 IR Codes have to be provided")
288
289 sr_addr = self.add_shiftreg(ircodes[0], address_width, domain=domain)
290 sr_data = self.add_shiftreg(ircodes[1:], data_width, domain=domain)
291
292 wb = Wishbone(data_width=data_width, address_width=address_width, sel_width=sel_width, master=True)
293
294 self._wbs.append((sr_addr, sr_data, wb, domain))
295
296 return wb
297
298 def _elaborate_wishbones(self, m):
299 for sr_addr, sr_data, wb, domain in self._wbs:
300 if hasattr(wb, "sel"):
301 # Always selected
302 m.d.comb += [s.eq(1) for s in wb.sel]
303
304 with m.FSM(domain=domain) as fsm:
305 with m.State("IDLE"):
306 m.d.comb += [
307 wb.cyc.eq(0),
308 wb.stb.eq(0),
309 wb.we.eq(0),
310 ]
311 with m.If(sr_addr.oe): # WBADDR code
312 m.d[domain] += wb.addr.eq(sr_addr.o)
313 m.next = "READ"
314 with m.Elif(sr_data.oe[0]): # WBREAD code
315 # If data is
316 m.d[domain] += wb.addr.eq(wb.addr + 1)
317 m.next = "READ"
318
319 with m.If(sr_data.oe[1]): # WBWRITE code
320 m.d[domain] += wb.dat_w.eq(sr_data.o)
321 m.next = "WRITEREAD"
322 with m.State("READ"):
323 m.d.comb += [
324 wb.cyc.eq(1),
325 wb.stb.eq(1),
326 wb.we.eq(0),
327 ]
328 with m.If(~wb.stall):
329 m.next = "READACK"
330 with m.State("READACK"):
331 m.d.comb += [
332 wb.cyc.eq(1),
333 wb.stb.eq(0),
334 wb.we.eq(0),
335 ]
336 with m.If(wb.ack):
337 # Store read data in sr_data.i and keep it there til next read
338 m.d[domain] += sr_data.i.eq(wb.dat_r)
339 m.next = "IDLE"
340 with m.State("WRITEREAD"):
341 m.d.comb += [
342 wb.cyc.eq(1),
343 wb.stb.eq(1),
344 wb.we.eq(1),
345 ]
346 with m.If(~wb.stall):
347 m.next = "WRITEREADACK"
348 with m.State("WRITEREADACK"):
349 m.d.comb += [
350 wb.cyc.eq(1),
351 wb.stb.eq(0),
352 wb.we.eq(0),
353 ]
354 with m.If(wb.ack):
355 m.d[domain] += wb.addr.eq(wb.addr + 1)
356 m.next = "READ"