Add top controller instance from nmigen code.
[c4m-jtag.git] / c4m / nmigen / jtag / tap.py
1 #!/bin/env python3
2 import os, textwrap
3
4 from nmigen import *
5 from nmigen.build import *
6 from nmigen.lib.io import *
7 from nmigen.hdl.rec import Direction
8 from nmigen.tracer import get_var_name
9
10 from nmigen_soc.wishbone import Interface as WishboneInterface
11
12 from .bus import Interface
13
# Public names exported by ``from <this module> import *``.
__all__ = ["TAP", "ShiftReg"]
17
18
class ShiftReg(Record):
    """Record with the user-side interface of an extra shift register on a TAP.

    This object is normally only allocated and returned from
    ``TAP.add_shiftreg``. It is a Record subclass.

    Parameters
    ----------
    sr_length : int
        Width of the ``i`` and ``o`` fields.
    cmds : int, default=1
        The number of corresponding JTAG instructions; this is the width of
        the ``ie`` and ``oe`` fields.
    name : str or None
        Forwarded to ``Record.__init__``.
    src_loc_at : int
        Forwarded to ``Record.__init__`` after being incremented by one to
        account for this constructor's own frame.

    Attributes
    ----------
    i: length=sr_length, FANIN
        The input data sampled during the capture state of the TAP.
    ie: length=cmds, FANOUT
        Indicates that data is to be sampled by the JTAG TAP and should be
        held stable. The bit indicates the corresponding instruction for
        which data is asked.
        This signal is kept high for a whole JTAG TAP clock cycle and may
        thus be kept high for more than one clock cycle on the domain where
        the ShiftReg is used.
        The JTAG protocol does not allow insertion of wait states, so data
        needs to be provided before ie goes down. The speed of the response
        will determine the max. frequency for the JTAG interface.
    o: length=sr_length, FANOUT
        The value of the shift register.
    oe: length=cmds, FANOUT
        Indicates that the output needs to be sampled downstream because the
        JTAG TAP is in the Update state. The bit indicates the corresponding
        instruction. The bit is only kept high for one clock cycle.
    """
    def __init__(self, *, sr_length, cmds=1, name=None, src_loc_at=0):
        # Data fields are sr_length wide, strobe fields have one bit per
        # instruction sharing this register.
        data_in = ("i", sr_length, Direction.FANIN)
        in_strobe = ("ie", cmds, Direction.FANOUT)
        data_out = ("o", sr_length, Direction.FANOUT)
        out_strobe = ("oe", cmds, Direction.FANOUT)

        super().__init__(
            [data_in, in_strobe, data_out, out_strobe],
            name=name,
            src_loc_at=src_loc_at + 1,
        )
61
62
class TAP(Elaboratable):
    """JTAG TAP built around the VHDL ``c4m_jtag_tap_controller``.

    On elaboration the VHDL sources of the controller are registered with the
    platform, a VHDL wrapper entity with the generics filled in is generated
    and instantiated, and the logic for the user shift registers and Wishbone
    interfaces that were requested through ``add_shiftreg`` and
    ``add_wishbone`` is added.

    Parameters
    ----------
    io_count : int
        Number of IOs handled by the TAP; has to be greater than 0.
    with_reset : bool, default=False
        Forwarded to the JTAG bus ``Interface`` constructor.
    ir_width : int or None
        Width of the instruction register; has to be at least 2 when given.
        When ``None`` the width is derived during elaboration from the
        highest IR code in use.
    manufacturer_id : Const, 11 bit
    part_number : Const, 16 bit
    version : Const, 4 bit
        Values filled in for the MANUFACTURER, PART_NUMBER and VERSION
        generics of the controller.
    name : str or None
        Name of the TAP; used as prefix for the names of the bus and the
        shift registers. When ``None`` it is derived from the name of the
        variable the object is assigned to, defaulting to "TAP".
    src_loc_at : int
        Extra stack depth to skip when deriving names.

    Attributes
    ----------
    name : str
    bus : Interface
        The JTAG bus.
    core : Array of Pin(1, "io")
        Signals to use for the core.
    pad : Array of Pin(1, "io")
        Signals going to the IO pads.
    """

    # VHDL sources that make up the TAP controller.
    _vhdl_files = (
        "c4m_jtag_pkg.vhdl",
        "c4m_jtag_idblock.vhdl",
        "c4m_jtag_iocell.vhdl",
        "c4m_jtag_ioblock.vhdl",
        "c4m_jtag_irblock.vhdl",
        "c4m_jtag_tap_fsm.vhdl",
        "c4m_jtag_tap_controller.vhdl",
    )

    @staticmethod
    def _add_files(platform, prefix):
        """Register the controller VHDL sources with ``platform``.

        The sources are looked up in ``../../vhdl/jtag`` relative to the
        directory of this file and are added under the name
        ``prefix + <file name>``.
        """
        # os.path.join keeps the path relative when dirname(__file__) is
        # empty instead of accidentally rooting it at the filesystem root.
        vhdl_dir = os.path.realpath(os.path.join(
            os.path.dirname(__file__), os.path.pardir, os.path.pardir, "vhdl", "jtag",
        ))
        for fname in TAP._vhdl_files:
            # The context manager closes the file even if add_file raises.
            with open(os.path.join(vhdl_dir, fname), "r") as f:
                platform.add_file(prefix + fname, f)

    # VHDL wrapper around c4m_jtag_tap_controller with the generics fixed.
    # A wrapper is generated per instance; see _add_instance().
    _controller_templ = textwrap.dedent(r"""
    library ieee;
    use ieee.std_logic_1164.ALL;

    use work.c4m_jtag.ALL;

    entity {name} is
      port (
        -- The TAP signals
        TCK:        in std_logic;
        TMS:        in std_logic;
        TDI:        in std_logic;
        TDO:        out std_logic;
        TRST_N:     in std_logic;

        -- The FSM state indicators
        RESET:      out std_logic;
        DRCAPTURE:  out std_logic;
        DRSHIFT:    out std_logic;
        DRUPDATE:   out std_logic;

        -- The Instruction Register
        IR:         out std_logic_vector({ir_width}-1 downto 0);

        -- The I/O access ports
        CORE_IN:    out std_logic_vector({ios}-1 downto 0);
        CORE_EN:    in std_logic_vector({ios}-1 downto 0);
        CORE_OUT:   in std_logic_vector({ios}-1 downto 0);

        -- The pad connections
        PAD_IN:     in std_logic_vector({ios}-1 downto 0);
        PAD_EN:     out std_logic_vector({ios}-1 downto 0);
        PAD_OUT:    out std_logic_vector({ios}-1 downto 0)
      );
    end {name};

    architecture rtl of {name} is
    begin
      jtag : c4m_jtag_tap_controller
        generic map(
          DEBUG => FALSE,
          IR_WIDTH => {ir_width},
          IOS => {ios},
          MANUFACTURER => "{manufacturer:011b}",
          PART_NUMBER => "{part:016b}",
          VERSION => "{version:04b}"
        )
        port map(
          TCK => TCK,
          TMS => TMS,
          TDI => TDI,
          TDO => TDO,
          TRST_N => TRST_N,
          RESET => RESET,
          DRCAPTURE => DRCAPTURE,
          DRSHIFT => DRSHIFT,
          DRUPDATE => DRUPDATE,
          IR => IR,
          CORE_IN => CORE_IN,
          CORE_EN => CORE_EN,
          CORE_OUT => CORE_OUT,
          PAD_IN => PAD_IN,
          PAD_EN => PAD_EN,
          PAD_OUT => PAD_OUT
        );
    end architecture rtl;
    """)

    # Running number used to give each generated wrapper entity its own name.
    _cell_inst = 0

    @classmethod
    def _add_instance(cls, platform, prefix, *, ir_width, ios, manufacturer, part, version):
        """Generate a wrapper entity and register it with ``platform``.

        ``manufacturer``, ``part`` and ``version`` have to be ints; they are
        rendered as binary strings of 11, 16 and 4 digits respectively.

        Returns the name of the generated entity, to be used as the type of
        the ``Instance``.
        """
        name = "jtag_controller_i{}".format(cls._cell_inst)
        cls._cell_inst += 1

        platform.add_file(
            "{}{}.vhdl".format(prefix, name),
            cls._controller_templ.format(
                name=name, ir_width=ir_width, ios=ios,
                manufacturer=manufacturer, part=part, version=version,
            )
        )

        return name

    def __init__(
        self, io_count, *, with_reset=False, ir_width=None,
        manufacturer_id=Const(0b10001111111, 11), part_number=Const(1, 16),
        version=Const(0, 4),
        name=None, src_loc_at=0
    ):
        assert(isinstance(io_count, int) and io_count > 0)
        assert((ir_width is None) or (isinstance(ir_width, int) and ir_width >= 2))
        assert(len(version) == 4)

        self.name = name if name is not None else get_var_name(depth=src_loc_at+2, default="TAP")
        self.bus = Interface(with_reset=with_reset, name=self.name+"_bus",
                             src_loc_at=src_loc_at+1)

        # TODO: Handle IOs with different directions
        self.core = Array(Pin(1, "io") for _ in range(io_count))  # Signals to use for core
        self.pad = Array(Pin(1, "io") for _ in range(io_count))  # Signals going to IO pads

        ##

        self._io_count = io_count
        self._ir_width = ir_width
        self._manufacturer_id = manufacturer_id
        self._part_number = part_number
        self._version = version

        self._ircodes = [0, 1, 2]  # Already taken codes, all ones added at the end
        # List of (ircodes, domain, ShiftReg) as added by add_shiftreg()
        self._srs = []

        # List of (sr_addr, sr_data, wb, domain) as added by add_wishbone()
        self._wbs = []

    def elaborate(self, platform):
        """Build the module for the TAP.

        Registers the VHDL sources and a generated wrapper with ``platform``,
        instantiates the wrapper, creates a local ``jtag`` clock domain
        clocked by TCK and adds the shift register and Wishbone logic.
        """
        self.__class__._add_files(platform, "jtag" + os.path.sep)

        m = Module()

        # Determine ir_width if not fixed.
        ir_max = max(self._ircodes) + 1  # One extra code needed with all ones
        ir_width = ir_max.bit_length()
        if self._ir_width is not None:
            assert self._ir_width >= ir_width, "Specified JTAG IR width not big enough for allocated shiift registers"
            ir_width = self._ir_width

        cell = self.__class__._add_instance(
            platform, "jtag" + os.path.sep, ir_width=ir_width, ios=self._io_count,
            manufacturer=self._manufacturer_id.value, part=self._part_number.value,
            version=self._version.value,
        )

        sigs = Record([
            ("capture", 1),
            ("shift", 1),
            ("update", 1),
            ("ir", ir_width),
            ("tdo_jtag", 1),
        ])

        reset = Signal()

        # The controller has an active low reset; keep it deasserted when the
        # bus has no trst signal.
        trst_n = Signal()
        m.d.comb += trst_n.eq(~self.bus.trst if hasattr(self.bus, "trst") else Const(1))

        core_i = Cat(pin.i for pin in self.core)
        core_o = Cat(pin.o for pin in self.core)
        core_oe = Cat(pin.oe for pin in self.core)
        pad_i = Cat(pin.i for pin in self.pad)
        pad_o = Cat(pin.o for pin in self.pad)
        pad_oe = Cat(pin.oe for pin in self.pad)

        # Port names have to match the ports of the wrapper entity declared
        # in _controller_templ, hence DRCAPTURE/DRSHIFT/DRUPDATE.
        m.submodules.tap = Instance(cell,
            i_TCK=self.bus.tck,
            i_TMS=self.bus.tms,
            i_TDI=self.bus.tdi,
            o_TDO=sigs.tdo_jtag,
            i_TRST_N=trst_n,
            o_RESET=reset,
            o_DRCAPTURE=sigs.capture,
            o_DRSHIFT=sigs.shift,
            o_DRUPDATE=sigs.update,
            o_IR=sigs.ir,
            o_CORE_IN=core_i,
            i_CORE_OUT=core_o,
            i_CORE_EN=core_oe,
            i_PAD_IN=pad_i,
            o_PAD_OUT=pad_o,
            o_PAD_EN=pad_oe,
        )

        # Own clock domain using TCK as clock signal
        m.domains.jtag = jtag_cd = ClockDomain(name="jtag", local=True)
        m.d.comb += [
            jtag_cd.clk.eq(self.bus.tck),
            jtag_cd.rst.eq(reset),
        ]

        self._elaborate_shiftregs(m, sigs)
        self._elaborate_wishbones(m)

        return m

    def add_shiftreg(self, ircode, length, domain="sync", name=None, src_loc_at=0):
        """Add a shift register to the JTAG interface

        Parameters:
        - ircode: code(s) for the IR; int or iterable of ints. In the latter case this
          shiftreg is shared between different IR codes.
        - length: the length of the shift register
        - domain: the domain on which the signal will be used
        - name: name of the shift register; by default derived from the name of the TAP
          and the number of shift registers already added.
        - src_loc_at: extra stack depth, forwarded to ShiftReg.

        Returns:
          The ShiftReg object for the new shift register.

        Raises:
          ValueError if a code is not an int greater than 0 or is already taken.
        """
        # Materialize the codes once so one-shot iterables (e.g. generators)
        # are not exhausted by the validation loop below.
        try:
            ircodes = tuple(ircode)
        except TypeError:
            ircodes = (ircode,)
        for _ircode in ircodes:
            if not isinstance(_ircode, int) or _ircode <= 0:
                raise ValueError("IR code '{}' is not an int greater than 0".format(_ircode))
            if _ircode in self._ircodes:
                raise ValueError("IR code '{}' already taken".format(_ircode))

        self._ircodes.extend(ircodes)

        if name is None:
            name = self.name + "_sr{}".format(len(self._srs))
        sr = ShiftReg(sr_length=length, cmds=len(ircodes), name=name, src_loc_at=src_loc_at+1)
        self._srs.append((ircodes, domain, sr))

        return sr

    def _elaborate_shiftregs(self, m, sigs):
        """Add the logic for the registered shift registers to module ``m``.

        ``sigs`` is the record with the capture, shift, update, ir and
        tdo_jtag signals coming from the controller instance. This method
        also drives ``self.bus.tdo``.
        """
        # NOTE(review): sr.ie is not driven by this method.
        # tdos is list of (tdo, tdo_en) for each shiftreg
        tdos = []
        for ircodes, domain, sr in self._srs:
            reg = Signal(len(sr.o), name=sr.name+"_reg")
            m.d.comb += sr.o.eq(reg)

            # isir has one bit per IR code sharing this shift register.
            isir = Signal(len(ircodes), name=sr.name+"_isir")
            capture = Signal(name=sr.name+"_capture")
            shift = Signal(name=sr.name+"_shift")
            update = Signal(name=sr.name+"_update")
            m.d.comb += [
                isir.eq(Cat(sigs.ir == ircode for ircode in ircodes)),
                capture.eq((isir != 0) & sigs.capture),
                shift.eq((isir != 0) & sigs.shift),
                update.eq((isir != 0) & sigs.update),
            ]

            # update signal is on the JTAG clockdomain, sr.oe is on `domain` clockdomain
            # latch update in `domain` clockdomain and see when it has falling edge.
            # At that edge put isir in sr.oe for one `domain` clockdomain
            update_core = Signal(name=sr.name+"_update_core")
            update_core_prev = Signal(name=sr.name+"_update_core_prev")
            m.d[domain] += [
                update_core.eq(update),  # This is CDC from JTAG domain to given domain
                update_core_prev.eq(update_core)
            ]
            # No `== 0` here: `&` binds tighter than `==` in Python, so such a
            # comparison would invert the edge detection.
            with m.If(update_core_prev & ~update_core):
                # Falling edge of update
                m.d[domain] += sr.oe.eq(isir)
            with m.Else():
                m.d[domain] += sr.oe.eq(0)

            # Shift in TDI at the top, LSB is shifted out first.
            with m.If(shift):
                m.d.jtag += reg.eq(Cat(reg[1:], self.bus.tdi))
            with m.If(capture):
                m.d.jtag += reg.eq(sr.i)

            # tdo = reg[0], tdo_en = shift
            tdos.append((reg[0], shift))

        # Priority mux for TDO: each shift register drives TDO when its own
        # enable is active, the controller drives it otherwise.
        for i, (tdo, tdo_en) in enumerate(tdos):
            if i == 0:
                with m.If(tdo_en):
                    m.d.comb += self.bus.tdo.eq(tdo)
            else:
                with m.Elif(tdo_en):
                    m.d.comb += self.bus.tdo.eq(tdo)

        if len(tdos) > 0:
            with m.Else():
                m.d.comb += self.bus.tdo.eq(sigs.tdo_jtag)
        else:
            # No shift registers; always connect tdo_jtag to tdo
            m.d.comb += self.bus.tdo.eq(sigs.tdo_jtag)

    def add_wishbone(self, *, ircodes, address_width, data_width, granularity=None, domain="sync"):
        """Add a wishbone interface

        In order to allow high JTAG clock speed, data will be cached. This means that if data is
        output the value of the next address will be read automatically.

        Parameters:
        -----------
        ircodes: sequence of three integer for the JTAG IR codes;
          they represent resp. WBADDR, WBREAD and WBREADWRITE. First code
          has a shift register of length 'address_width', the two other codes
          share a shift register of length data_width.
        address_width: width of the address
        data_width: width of the data
        granularity: forwarded to the Wishbone interface constructor
        domain: the clock domain for the Wishbone interface

        Returns:
        wb: nmigen_soc.wishbone.bus.Interface
          The Wishbone interface, is pipelined and has stall field.

        Raises:
          ValueError if not exactly three IR codes are given or one of the
          codes is rejected by add_shiftreg().
        """
        if len(ircodes) != 3:
            raise ValueError("3 IR Codes have to be provided")

        sr_addr = self.add_shiftreg(ircodes[0], address_width, domain=domain)
        sr_data = self.add_shiftreg(ircodes[1:], data_width, domain=domain)

        wb = WishboneInterface(data_width=data_width, addr_width=address_width,
                               granularity=granularity, features={"stall", "lock", "err", "rty"})

        self._wbs.append((sr_addr, sr_data, wb, domain))

        return wb

    def _elaborate_wishbones(self, m):
        """Add the bus master FSM for each registered Wishbone interface to ``m``."""
        for sr_addr, sr_data, wb, domain in self._wbs:
            if hasattr(wb, "sel"):
                # Always selected
                m.d.comb += [s.eq(1) for s in wb.sel]

            with m.FSM(domain=domain) as fsm:
                with m.State("IDLE"):
                    with m.If(sr_addr.oe):  # WBADDR code
                        # New address shifted in; read the data at that address
                        m.d[domain] += wb.adr.eq(sr_addr.o)
                        m.next = "READ"
                    with m.Elif(sr_data.oe[0]):  # WBREAD code
                        # Go to the next address and read the data there
                        m.d[domain] += wb.adr.eq(wb.adr + 1)
                        m.next = "READ"
                    with m.Elif(sr_data.oe[1]):  # WBWRITE code
                        m.d[domain] += wb.dat_w.eq(sr_data.o)
                        m.next = "WRITEREAD"
                with m.State("READ"):
                    with m.If(~wb.stall):
                        m.next = "READACK"
                with m.State("READACK"):
                    with m.If(wb.ack):
                        # Store read data in sr_data.i and keep it there til next read
                        m.d[domain] += sr_data.i.eq(wb.dat_r)
                        m.next = "IDLE"
                with m.State("WRITEREAD"):
                    with m.If(~wb.stall):
                        m.next = "WRITEREADACK"
                with m.State("WRITEREADACK"):
                    with m.If(wb.ack):
                        # Write is done; go to the next address and read it
                        m.d[domain] += wb.adr.eq(wb.adr + 1)
                        m.next = "READ"

                # Bus control signals are derived from the FSM state.
                m.d.comb += [
                    wb.cyc.eq(~fsm.ongoing("IDLE")),
                    wb.stb.eq(fsm.ongoing("READ") | fsm.ongoing("WRITEREAD")),
                    wb.we.eq(fsm.ongoing("WRITEREAD")),
                ]
424 wb.we.eq(fsm.ongoing("WRITEREAD")),
425 ]