b2bcf818f8a3a2bc5b9fe62bfa5dafd628b8321b
[c4m-jtag.git] / c4m / nmigen / jtag / tap.py
1 #!/bin/env python3
2 import os, textwrap
3
4 from nmigen import *
5 from nmigen.build import *
6 from nmigen.lib.io import *
7 from nmigen.hdl.rec import Direction
8 from nmigen.tracer import get_var_name
9
10 from nmigen_soc.wishbone import Interface as WishboneInterface
11
12 from .bus import Interface
13
# Names exported by a star-import of this module.
__all__ = ["TAP", "ShiftReg"]
17
18
class ShiftReg(Record):
    """Interface record for an extra shift register attached to a TAP.

    Instances are normally allocated and returned by ``TAP.add_shiftreg``
    instead of being constructed directly. The class is a Record subclass.

    Parameters
    ----------
    sr_length : int
        Width of the ``i`` and ``o`` fields, i.e. the shift register length.
    cmds : int, default=1
        The number of corresponding JTAG instructions; width of ``ie``
        and ``oe``.
    name : str, optional
        Forwarded to ``Record``.
    src_loc_at : int, default=0
        Forwarded to ``Record`` after adding one for this constructor.

    Attributes
    ----------
    i: length=sr_length, FANIN
        The input data sampled during the capture state of the TAP.
    ie: length=cmds, FANOUT
        Indicates that data is to be sampled by the JTAG TAP and
        should be held stable. The bit indicates the corresponding
        instruction for which data is asked.
        This signal is kept high for a whole JTAG TAP clock cycle
        and may thus be kept high for more than one clock cycle
        on the domain where ShiftReg is used.
        The JTAG protocol does not allow insertion of wait states,
        so data needs to be provided before ie goes down. The speed
        of the response will determine the max. frequency for the
        JTAG interface.
    o: length=sr_length, FANOUT
        The value of the shift register.
    oe: length=cmds, FANOUT
        Indicates that output needs to be sampled downstream because
        the JTAG TAP is in the Update state. The bit indicates the
        corresponding instruction. The bit is only kept high for one
        clock cycle.
    """
    def __init__(self, *, sr_length, cmds=1, name=None, src_loc_at=0):
        fanin, fanout = Direction.FANIN, Direction.FANOUT
        # Data fields are sr_length wide, strobe fields have one bit per
        # JTAG instruction sharing this register.
        fields = [
            ("i", sr_length, fanin),
            ("ie", cmds, fanout),
            ("o", sr_length, fanout),
            ("oe", cmds, fanout),
        ]
        super().__init__(fields, name=name, src_loc_at=1 + src_loc_at)
61
62
class TAP(Elaboratable):
    """JTAG TAP built around the VHDL ``c4m_jtag_tap_controller`` entity.

    On elaboration the VHDL sources and a generated wrapper entity are added
    to the platform and the wrapper is instantiated. Extra shift registers
    (``add_shiftreg``) and Wishbone interfaces (``add_wishbone``) have to be
    added before the TAP is elaborated.

    Parameters
    ----------
    io_count : int
        Number of I/Os handled by the TAP; has to be greater than 0.
    with_reset : bool, default=False
        Forwarded to the JTAG bus ``Interface``.
    ir_width : int or None, default=None
        Width of the instruction register, at least 2. When None the width
        is derived during elaboration from the highest allocated IR code.
    manufacturer_id : Const, default=Const(0b10001111111, 11)
        Its value is formatted as an 11 bit binary string in the wrapper.
    part_number : Const, default=Const(1, 16)
        Its value is formatted as a 16 bit binary string in the wrapper.
    version : Const, default=Const(0, 4)
        Has to have a length of 4.
    name : str or None, default=None
        Name of the TAP; when None it is derived with ``get_var_name``
        (default "TAP").
    src_loc_at : int, default=0
        Stack offset used for naming the TAP and its signals.

    Attributes
    ----------
    name : str
    bus : Interface
        The JTAG bus.
    core : Array of Pin
        Signals to use for the core, one ``Pin(1, "io")`` per I/O.
    pad : Array of Pin
        Signals going to the IO pads, one ``Pin(1, "io")`` per I/O.
    """

    @staticmethod
    def _add_files(platform, prefix):
        """Add the VHDL sources of the JTAG controller to ``platform``.

        The files are read from the ``vhdl/jtag`` directory two levels above
        the directory of this module and registered as ``prefix + fname``.
        """
        src_dir = os.path.realpath(os.path.join(
            os.path.dirname(__file__), os.path.pardir, os.path.pardir,
            "vhdl", "jtag",
        ))
        for fname in (
            "c4m_jtag_pkg.vhdl",
            "c4m_jtag_idblock.vhdl",
            "c4m_jtag_iocell.vhdl",
            "c4m_jtag_ioblock.vhdl",
            "c4m_jtag_irblock.vhdl",
            "c4m_jtag_tap_fsm.vhdl",
            "c4m_jtag_tap_controller.vhdl",
        ):
            # The context manager closes the file even if add_file() raises.
            with open(os.path.join(src_dir, fname), "r") as f:
                platform.add_file(prefix + fname, f)

    # VHDL wrapper entity that fixes the generics of c4m_jtag_tap_controller;
    # the {placeholders} are filled in by _add_instance().
    _controller_templ = textwrap.dedent(r"""
        library ieee;
        use ieee.std_logic_1164.ALL;

        use work.c4m_jtag.ALL;

        entity {name} is
          port (
            -- The TAP signals
            TCK: in std_logic;
            TMS: in std_logic;
            TDI: in std_logic;
            TDO: out std_logic;
            TRST_N: in std_logic;

            -- The FSM state indicators
            RESET: out std_logic;
            CAPTURE: out std_logic;
            SHIFT: out std_logic;
            UPDATE: out std_logic;

            -- The Instruction Register
            IR: out std_logic_vector({ir_width}-1 downto 0);

            -- The I/O access ports
            CORE_IN: out std_logic_vector({ios}-1 downto 0);
            CORE_EN: in std_logic_vector({ios}-1 downto 0);
            CORE_OUT: in std_logic_vector({ios}-1 downto 0);

            -- The pad connections
            PAD_IN: in std_logic_vector({ios}-1 downto 0);
            PAD_EN: out std_logic_vector({ios}-1 downto 0);
            PAD_OUT: out std_logic_vector({ios}-1 downto 0)
          );
        end {name};

        architecture rtl of {name} is
        begin
          jtag : c4m_jtag_tap_controller
            generic map(
              DEBUG => FALSE,
              IR_WIDTH => {ir_width},
              IOS => {ios},
              MANUFACTURER => "{manufacturer:011b}",
              PART_NUMBER => "{part:016b}",
              VERSION => "{version:04b}"
            )
            port map(
              TCK => TCK,
              TMS => TMS,
              TDI => TDI,
              TDO => TDO,
              TRST_N => TRST_N,
              RESET => RESET,
              CAPTURE => CAPTURE,
              SHIFT => SHIFT,
              UPDATE => UPDATE,
              IR => IR,
              CORE_IN => CORE_IN,
              CORE_EN => CORE_EN,
              CORE_OUT => CORE_OUT,
              PAD_IN => PAD_IN,
              PAD_EN => PAD_EN,
              PAD_OUT => PAD_OUT
            );
        end architecture rtl;
    """)

    # Counter used to give every generated wrapper entity a unique name.
    _cell_inst = 0

    @classmethod
    def _add_instance(cls, platform, prefix, *, ir_width, ios, manufacturer, part, version):
        """Generate a wrapper entity, add it to ``platform``, return its name.

        ``manufacturer``, ``part`` and ``version`` are plain integers that
        are formatted as binary strings into the VHDL generics.
        """
        name = "jtag_controller_i{}".format(cls._cell_inst)
        cls._cell_inst += 1

        platform.add_file(
            "{}{}.vhdl".format(prefix, name),
            cls._controller_templ.format(
                name=name, ir_width=ir_width, ios=ios,
                manufacturer=manufacturer, part=part, version=version,
            )
        )

        return name

    def __init__(
        self, io_count, *, with_reset=False, ir_width=None,
        manufacturer_id=Const(0b10001111111, 11), part_number=Const(1, 16),
        version=Const(0, 4),
        name=None, src_loc_at=0
    ):
        assert isinstance(io_count, int) and io_count > 0, \
            "io_count has to be an int greater than 0"
        assert (ir_width is None) or (isinstance(ir_width, int) and ir_width >= 2), \
            "ir_width has to be None or an int of at least 2"
        assert len(version) == 4, "version has to be 4 bits wide"

        if name is None:
            name = get_var_name(depth=src_loc_at+2, default="TAP")
        self.name = name
        self.bus = Interface(with_reset=with_reset, name=self.name+"_bus",
                             src_loc_at=src_loc_at+1)

        # TODO: Handle IOs with different directions
        self.core = Array(
            Pin(1, "io", name=name+"_coreio"+str(i), src_loc_at=src_loc_at+1)
            for i in range(io_count)
        ) # Signals to use for core
        self.pad = Array(
            Pin(1, "io", name=name+"_padio"+str(i), src_loc_at=src_loc_at+1)
            for i in range(io_count)
        ) # Signals going to IO pads

        ##

        self._io_count = io_count
        self._ir_width = ir_width
        self._manufacturer_id = manufacturer_id
        self._part_number = part_number
        self._version = version

        self._ircodes = [0, 1, 2] # Already taken codes, all ones added at the end
        self._srs = []

        self._wbs = []

    def elaborate(self, platform):
        """Build the module: controller instance, JTAG clock domain,
        the added shift registers and the added Wishbone interfaces."""
        self._add_files(platform, "jtag" + os.path.sep)

        m = Module()

        # Determine ir_width if not fixed.
        ir_max = max(self._ircodes) + 1 # One extra code needed with all ones
        ir_width = ir_max.bit_length()
        if self._ir_width is not None:
            assert self._ir_width >= ir_width, "Specified JTAG IR width not big enough for allocated shiift registers"
            ir_width = self._ir_width

        cell = self._add_instance(
            platform, "jtag" + os.path.sep, ir_width=ir_width, ios=self._io_count,
            manufacturer=self._manufacturer_id.value, part=self._part_number.value,
            version=self._version.value,
        )

        # Signals driven by the controller and used by the shift registers.
        sigs = Record([
            ("capture", 1),
            ("shift", 1),
            ("update", 1),
            ("ir", ir_width),
            ("tdo_jtag", 1),
        ])

        reset = Signal()

        # TRST_N is active low; tie it inactive when the bus has no trst.
        trst_n = Signal()
        m.d.comb += trst_n.eq(~self.bus.trst if hasattr(self.bus, "trst") else Const(1))

        core_i = Cat(pin.i for pin in self.core)
        core_o = Cat(pin.o for pin in self.core)
        core_oe = Cat(pin.oe for pin in self.core)
        pad_i = Cat(pin.i for pin in self.pad)
        pad_o = Cat(pin.o for pin in self.pad)
        pad_oe = Cat(pin.oe for pin in self.pad)

        m.submodules.tap = Instance(cell,
            i_TCK=self.bus.tck,
            i_TMS=self.bus.tms,
            i_TDI=self.bus.tdi,
            o_TDO=sigs.tdo_jtag,
            i_TRST_N=trst_n,
            o_RESET=reset,
            o_CAPTURE=sigs.capture,
            o_SHIFT=sigs.shift,
            o_UPDATE=sigs.update,
            o_IR=sigs.ir,
            o_CORE_IN=core_i,
            i_CORE_OUT=core_o,
            i_CORE_EN=core_oe,
            i_PAD_IN=pad_i,
            o_PAD_OUT=pad_o,
            o_PAD_EN=pad_oe,
        )

        # Own clock domain using TCK as clock signal
        m.domains.jtag = jtag_cd = ClockDomain(name="jtag", local=True)
        m.d.comb += [
            jtag_cd.clk.eq(self.bus.tck),
            jtag_cd.rst.eq(reset),
        ]

        self._elaborate_shiftregs(m, sigs)
        self._elaborate_wishbones(m)

        return m

    def add_shiftreg(self, *, ircode, length, domain="sync", name=None, src_loc_at=0):
        """Add a shift register to the JTAG interface

        Parameters:
        - ircode: code(s) for the IR; int or iterable of ints. In the latter
          case this shiftreg is shared between different IR codes.
        - length: the length of the shift register
        - domain: the domain on which the signal will be used
        - name: name of the returned record; by default derived from the
          name of the TAP and the number of already added shift registers
        - src_loc_at: forwarded (plus one) to ShiftReg

        Returns the ShiftReg record for the new shift register.

        Raises ValueError if a code is not an int greater than 0, or is
        already taken (also when given twice in the same call)."""

        # Materialize once so that one-shot iterables are not exhausted by
        # the validation below before the codes are stored.
        try:
            ircodes = tuple(ircode)
        except TypeError:
            ircodes = (ircode,)

        seen = set()
        for _ircode in ircodes:
            if not isinstance(_ircode, int) or _ircode <= 0:
                raise ValueError("IR code '{}' is not an int greater than 0".format(_ircode))
            if _ircode in self._ircodes or _ircode in seen:
                raise ValueError("IR code '{}' already taken".format(_ircode))
            seen.add(_ircode)

        self._ircodes.extend(ircodes)

        if name is None:
            name = self.name + "_sr{}".format(len(self._srs))
        sr = ShiftReg(sr_length=length, cmds=len(ircodes), name=name, src_loc_at=src_loc_at+1)
        self._srs.append((ircodes, domain, sr))

        return sr

    def _elaborate_shiftregs(self, m, sigs):
        """Add the logic of all registered shift registers to ``m`` and
        drive ``bus.tdo`` from the selected register or the controller."""
        # tdos is tuple of (tdo, tdo_en) for each shiftreg
        tdos = []
        for ircodes, domain, sr in self._srs:
            reg = Signal(len(sr.o), name=sr.name+"_reg")
            m.d.comb += sr.o.eq(reg)

            # One bit per IR code of this shiftreg; set when IR holds it.
            isir = Signal(len(ircodes), name=sr.name+"_isir")
            capture = Signal(name=sr.name+"_capture")
            shift = Signal(name=sr.name+"_shift")
            update = Signal(name=sr.name+"_update")
            m.d.comb += [
                isir.eq(Cat(sigs.ir == ircode for ircode in ircodes)),
                capture.eq((isir != 0) & sigs.capture),
                shift.eq((isir != 0) & sigs.shift),
                update.eq((isir != 0) & sigs.update),
            ]

            # update signal is on the JTAG clockdomain, sr.oe is on `domain` clockdomain
            # latch update in `domain` clockdomain and see when it has falling edge.
            # At that edge put isir in sr.oe for one `domain` clockdomain
            update_core = Signal(name=sr.name+"_update_core")
            update_core_prev = Signal(name=sr.name+"_update_core_prev")
            m.d[domain] += [
                update_core.eq(update), # This is CDC from JTAG domain to given domain
                update_core_prev.eq(update_core)
            ]
            # No `== 0` here: `&` binds tighter than `==`, so comparing with
            # 0 would select every cycle except the falling edge.
            with m.If(update_core_prev & ~update_core):
                # Falling edge of update
                m.d[domain] += sr.oe.eq(isir)
            with m.Else():
                m.d[domain] += sr.oe.eq(0)

            # capture is assigned last so it takes priority over shift.
            with m.If(shift):
                m.d.jtag += reg.eq(Cat(reg[1:], self.bus.tdi))
            with m.If(capture):
                m.d.jtag += reg.eq(sr.i)

            # tdo = reg[0], tdo_en = shift
            tdos.append((reg[0], shift))

        # Select TDO of the shiftreg that is shifting; each branch has to use
        # the enable of its own shiftreg.
        for i, (tdo, tdo_en) in enumerate(tdos):
            if i == 0:
                with m.If(tdo_en):
                    m.d.comb += self.bus.tdo.eq(tdo)
            else:
                with m.Elif(tdo_en):
                    m.d.comb += self.bus.tdo.eq(tdo)

        if tdos:
            with m.Else():
                m.d.comb += self.bus.tdo.eq(sigs.tdo_jtag)
        else:
            # No extra shiftregs: always connect tdo_jtag to the bus
            m.d.comb += self.bus.tdo.eq(sigs.tdo_jtag)

    def add_wishbone(self, *, ircodes, address_width, data_width, granularity=None, domain="sync"):
        """Add a wishbone interface

        In order to allow high JTAG clock speed, data will be cached. This means that if data is
        output the value of the next address will be read automatically.

        Parameters:
        -----------
        ircodes: sequence of three integer for the JTAG IR codes;
            they represent resp. WBADDR, WBREAD and WBREADWRITE. First code
            has a shift register of length 'address_width', the two other codes
            share a shift register of length data_width.
        address_width: width of the address
        data_width: width of the data
        granularity: forwarded to the Wishbone interface
        domain: the domain on which the Wishbone interface is used

        Returns:
        wb: nmigen_soc.wishbone.bus.Interface
            The Wishbone interface, is pipelined and has stall field.

        Raises:
        ValueError: if not exactly three IR codes are given or a code is
            rejected by add_shiftreg.
        """
        if len(ircodes) != 3:
            raise ValueError("3 IR Codes have to be provided")

        # add_shiftreg only takes keyword arguments.
        sr_addr = self.add_shiftreg(ircode=ircodes[0], length=address_width, domain=domain)
        sr_data = self.add_shiftreg(ircode=ircodes[1:], length=data_width, domain=domain)

        wb = WishboneInterface(data_width=data_width, addr_width=address_width,
                               granularity=granularity, features={"stall", "lock", "err", "rty"})

        self._wbs.append((sr_addr, sr_data, wb, domain))

        return wb

    def _elaborate_wishbones(self, m):
        """Add the bus FSM of every registered Wishbone interface to ``m``."""
        for sr_addr, sr_data, wb, domain in self._wbs:
            if hasattr(wb, "sel"):
                # Always selected
                m.d.comb += [s.eq(1) for s in wb.sel]

            with m.FSM(domain=domain) as fsm:
                with m.State("IDLE"):
                    with m.If(sr_addr.oe): # WBADDR code
                        # New address shifted in: store it and read it
                        m.d[domain] += wb.adr.eq(sr_addr.o)
                        m.next = "READ"
                    with m.Elif(sr_data.oe[0]): # WBREAD code
                        # Data was shifted out: go to the next address and
                        # read it
                        m.d[domain] += wb.adr.eq(wb.adr + 1)
                        m.next = "READ"
                    with m.Elif(sr_data.oe[1]): # WBREADWRITE code
                        # Data was shifted in: write it to the current address
                        m.d[domain] += wb.dat_w.eq(sr_data.o)
                        m.next = "WRITEREAD"
                with m.State("READ"):
                    with m.If(~wb.stall):
                        m.next = "READACK"
                with m.State("READACK"):
                    with m.If(wb.ack):
                        # Store read data in sr_data.i and keep it there til next read
                        m.d[domain] += sr_data.i.eq(wb.dat_r)
                        m.next = "IDLE"
                with m.State("WRITEREAD"):
                    with m.If(~wb.stall):
                        m.next = "WRITEREADACK"
                with m.State("WRITEREADACK"):
                    with m.If(wb.ack):
                        # After the write, read the next address
                        m.d[domain] += wb.adr.eq(wb.adr + 1)
                        m.next = "READ"

                m.d.comb += [
                    wb.cyc.eq(~fsm.ongoing("IDLE")),
                    wb.stb.eq(fsm.ongoing("READ") | fsm.ongoing("WRITEREAD")),
                    wb.we.eq(fsm.ongoing("WRITEREAD")),
                ]