461c9d91faaf1bbee17f4506ad7b7d7f1b186260
[c4m-jtag.git] / c4m / nmigen / jtag / tap.py
1 #!/bin/env python3
2 import os
3
4 from nmigen import *
5 from nmigen.build import *
6 from nmigen.lib.io import *
7 from nmigen.hdl.rec import Direction
8 from nmigen.tracer import get_var_name
9
10 from c4m_repo.nmigen.lib import Wishbone
11
12 from .bus import Interface
13
# Public API of this module: the TAP controller and the record type handed
# out for each user-defined shift register.
__all__ = ["TAP", "ShiftReg"]
17
18
class ShiftReg(Record):
    """Record exposing one extra TAP shift register to user logic.

    Instances are normally created and returned by ``TAP.add_shiftreg``
    rather than being constructed directly. The class is a ``Record``
    subclass with four fields.

    Parameters
    ----------
    sr_length : int
        Width in bits of the shift register (width of ``i`` and ``o``).
    cmds : int, default=1
        The number of JTAG instructions that share this shift register
        (width of ``ie`` and ``oe``, one bit per instruction).
    name : str or None
        Name forwarded to ``Record``.
    src_loc_at : int
        Extra stack depth forwarded to ``Record``; it is incremented by one
        to account for this constructor.

    Attributes
    ----------
    i: length=sr_length, FANIN
        The input data sampled during the capture state of the TAP.
    ie: length=cmds, FANOUT
        Indicates that data is about to be sampled by the JTAG TAP and
        should be held stable. Each bit corresponds to one of the
        instructions sharing the register.
        The signal is meant to stay high for a whole JTAG TAP clock cycle,
        which may span several cycles of the clock domain in which the
        ShiftReg is used.
        The JTAG protocol does not allow wait states to be inserted, so the
        data has to be provided before ``ie`` goes low again; the response
        time therefore bounds the maximum JTAG clock frequency.
        NOTE(review): no assignment to ``ie`` is visible in this file;
        confirm where (or whether) it is driven.
    o: length=sr_length, FANOUT
        The current value of the shift register.
    oe: length=cmds, FANOUT
        Indicates that ``o`` should be sampled downstream because the JTAG
        TAP is in the Update state. Each bit corresponds to one of the
        instructions sharing the register. A bit is only kept high for one
        clock cycle.
    """

    def __init__(self, *, sr_length, cmds=1, name=None, src_loc_at=0):
        # Data fields are sr_length wide; strobe fields carry one bit per
        # instruction mapped onto this register.
        super().__init__(
            [
                ("i", sr_length, Direction.FANIN),
                ("ie", cmds, Direction.FANOUT),
                ("o", sr_length, Direction.FANOUT),
                ("oe", cmds, Direction.FANOUT),
            ],
            name=name,
            src_loc_at=src_loc_at + 1,
        )
61
62
class TAP(Elaboratable):
    """JTAG TAP built around the ``c4m_jtag_tap_controller`` VHDL entity.

    The VHDL controller is instantiated during elaboration and its sources
    are registered with the platform. On top of it, user-defined shift
    registers (``add_shiftreg``) and Wishbone masters driven through JTAG
    (``add_wishbone``) can be attached before elaboration.

    Parameters
    ----------
    io_count : int
        Number of IOs handled by the controller; must be greater than 0.
        Passed to the VHDL entity as generic ``IOS``.
    ir_width : int or None
        Width of the instruction register; must be at least 2 when given.
        When ``None`` the smallest width that fits all allocated IR codes
        plus the reserved all-ones code is used.
    manufacturer_id, part_number, version : Const
        Passed to the VHDL entity as generics ``MANUFACTURER``,
        ``PART_NUMBER`` and ``VERSION``. ``version`` must be 4 bits wide.
    name : str or None
        Name of the TAP; used as prefix for generated signal names. When
        ``None`` it is derived from the variable the TAP is assigned to,
        falling back to ``"TAP"``.
    src_loc_at : int
        Extra stack depth used for the name lookup.

    Attributes
    ----------
    name : str
        Resolved name of the TAP.
    bus : Interface
        The JTAG bus; ``tck``, ``tms``, ``tdi`` and ``tdo`` are used here.
    core : Array of Pin(1, "io")
        Core-side signals of the IOs.
    pad : Array of Pin(1, "io")
        Pad-side signals of the IOs.

    IR codes 0, 1 and 2 and the all-ones code are reserved and can not be
    used for user shift registers.
    """

    @staticmethod
    def _add_files(platform, prefix):
        """Register the VHDL sources of the TAP controller with ``platform``.

        The sources are looked up in ``../../vhdl/jtag`` relative to the
        directory of this file and added under ``prefix + <file name>``.
        """
        vhdl_dir = os.path.realpath(os.path.join(
            os.path.dirname(__file__), os.path.pardir, os.path.pardir,
            "vhdl", "jtag",
        ))
        for fname in [
            "c4m_jtag_pkg.vhdl",
            "c4m_jtag_idblock.vhdl",
            "c4m_jtag_iocell.vhdl",
            "c4m_jtag_ioblock.vhdl",
            "c4m_jtag_irblock.vhdl",
            "c4m_jtag_tap_fsm.vhdl",
            "c4m_jtag_tap_controller.vhdl",
        ]:
            # Context manager guarantees the handle is closed even when
            # add_file() raises.
            with open(os.path.join(vhdl_dir, fname), "r") as f:
                platform.add_file(prefix + fname, f)

    def __init__(
        self, io_count, *, ir_width=None,
        manufacturer_id=Const(0b10001111111, 11), part_number=Const(1, 16),
        version=Const(0, 4),
        name=None, src_loc_at=0
    ):
        """Set up the bus, the IO pins and the bookkeeping lists.

        See the class docstring for the meaning of the parameters.
        """
        assert isinstance(io_count, int) and io_count > 0, \
            "io_count has to be an int greater than 0"
        assert (ir_width is None) or (isinstance(ir_width, int) and ir_width >= 2), \
            "ir_width has to be None or an int of at least 2"
        assert len(version) == 4, "version has to be 4 bits wide"

        # get_var_name() has to be called directly from this frame so that
        # the depth offset resolves to the caller's assignment target.
        self.name = name if name is not None else get_var_name(depth=src_loc_at+2, default="TAP")
        self.bus = Interface(name=self.name+"_bus", src_loc_at=src_loc_at+1)

        # TODO: Handle IOs with different directions
        self.core = Array(Pin(1, "io") for _ in range(io_count))  # Signals to use for core
        self.pad = Array(Pin(1, "io") for _ in range(io_count))  # Signals going to IO pads

        ##

        self._io_count = io_count
        self._ir_width = ir_width
        self._manufacturer_id = manufacturer_id
        self._part_number = part_number
        self._version = version

        self._ircodes = [0, 1, 2]  # Already taken codes, all ones added at the end
        # List of (ircodes, domain, ShiftReg) tuples
        self._srs = []

        # List of (sr_addr, sr_data, wb, domain) tuples
        self._wbs = []

    def elaborate(self, platform):
        """Build the module: VHDL controller, JTAG clock domain, user logic.

        Registers the VHDL sources with ``platform``, instantiates
        ``c4m_jtag_tap_controller``, creates a local ``jtag`` clock domain
        clocked by TCK and then elaborates the registered shift registers
        and Wishbone interfaces.
        """
        TAP._add_files(platform, "jtag" + os.path.sep)

        m = Module()

        # Determine ir_width if not fixed.
        ir_max = max(self._ircodes) + 1  # One extra code needed with all ones
        ir_width = ir_max.bit_length()
        if self._ir_width is not None:
            assert self._ir_width >= ir_width, "Specified JTAG IR width not big enough for allocated shiift registers"
            ir_width = self._ir_width

        # NOTE: the local names `sigs` and `reset` may be picked up as signal
        # names by nMigen's name tracer; do not rename them casually.
        sigs = Record([
            ("capture", 1),
            ("shift", 1),
            ("update", 1),
            ("ir", ir_width),
            ("tdo_jtag", 1),
        ])

        reset = Signal()

        core_i = Cat(pin.i for pin in self.core)
        core_o = Cat(pin.o for pin in self.core)
        core_oe = Cat(pin.oe for pin in self.core)
        pad_i = Cat(pin.i for pin in self.pad)
        pad_o = Cat(pin.o for pin in self.pad)
        pad_oe = Cat(pin.oe for pin in self.pad)

        params = {
            "p_IOS": self._io_count,
            "p_IR_WIDTH": ir_width,
            "p_MANUFACTURER": self._manufacturer_id,
            "p_PART_NUMBER": self._part_number,
            "p_VERSION": self._version,
            "i_TCK": self.bus.tck,
            "i_TMS": self.bus.tms,
            "i_TDI": self.bus.tdi,
            "o_TDO": sigs.tdo_jtag,
            "i_TRST_N": Const(1),  # TRST_N is tied high
            "o_RESET": reset,
            "o_DRCAPTURE": sigs.capture,
            "o_DRSHIFT": sigs.shift,
            "o_DRUPDATE": sigs.update,
            "o_IR": sigs.ir,
            "o_CORE_IN": core_i,
            "i_CORE_OUT": core_o,
            "i_CORE_EN": core_oe,
            "i_PAD_IN": pad_i,
            "o_PAD_OUT": pad_o,
            "o_PAD_EN": pad_oe,
        }
        m.submodules.tap = Instance("c4m_jtag_tap_controller", **params)

        # Own clock domain using TCK as clock signal
        m.domains.jtag = jtag_cd = ClockDomain(name="jtag", local=True)
        m.d.comb += [
            jtag_cd.clk.eq(self.bus.tck),
            jtag_cd.rst.eq(reset),
        ]

        self._elaborate_shiftregs(m, sigs)
        self._elaborate_wishbones(m)

        return m

    def add_shiftreg(self, ircode, length, domain="sync", name=None, src_loc_at=0):
        """Add a shift register to the JTAG interface

        Parameters:
        - ircode: code(s) for the IR; int or iterable of ints. In the latter case this
          shiftreg is shared between different IR codes.
        - length: the length of the shift register
        - domain: the domain on which the signal will be used
        - name: name of the returned record; defaults to ``<tap name>_sr<index>``
        - src_loc_at: extra stack depth forwarded to ``ShiftReg``

        Returns the ``ShiftReg`` record for the new register.

        Raises ``ValueError`` if no code is given, if a code is not an int
        greater than 0, or if a code is already taken (including a code
        repeated within the same call).
        """
        # Materialize once: a one-shot iterator would otherwise be exhausted
        # by validation and leave nothing to register. Storing a private
        # tuple also decouples us from later mutation by the caller.
        try:
            ircodes = tuple(ircode)
        except TypeError:
            ircodes = (ircode,)
        if not ircodes:
            raise ValueError("At least one IR code has to be provided")

        seen = set()
        for _ircode in ircodes:
            if not isinstance(_ircode, int) or _ircode <= 0:
                raise ValueError("IR code '{}' is not an int greater than 0".format(_ircode))
            if _ircode in self._ircodes or _ircode in seen:
                raise ValueError("IR code '{}' already taken".format(_ircode))
            seen.add(_ircode)

        self._ircodes.extend(ircodes)

        if name is None:
            name = self.name + "_sr{}".format(len(self._srs))
        sr = ShiftReg(sr_length=length, cmds=len(ircodes), name=name, src_loc_at=src_loc_at+1)
        self._srs.append((ircodes, domain, sr))

        return sr

    def _elaborate_shiftregs(self, m, sigs):
        """Add the logic for all registered shift registers to ``m``.

        ``sigs`` is the record with the ``capture``, ``shift``, ``update``,
        ``ir`` and ``tdo_jtag`` outputs of the TAP controller. Also drives
        ``self.bus.tdo`` from the selected shift register, or from the
        controller's own TDO when no user register is shifting.
        """
        # tdos is list of (tdo, tdo_en) tuples, one for each shiftreg
        tdos = []
        for ircodes, domain, sr in self._srs:
            reg = Signal(len(sr.o), name=sr.name+"_reg")
            m.d.comb += sr.o.eq(reg)

            # One bit per IR code mapped onto this register
            isir = Signal(len(ircodes), name=sr.name+"_isir")
            capture = Signal(name=sr.name+"_capture")
            shift = Signal(name=sr.name+"_shift")
            update = Signal(name=sr.name+"_update")
            m.d.comb += [
                isir.eq(Cat(sigs.ir == ircode for ircode in ircodes)),
                capture.eq((isir != 0) & sigs.capture),
                shift.eq((isir != 0) & sigs.shift),
                update.eq((isir != 0) & sigs.update),
            ]

            # update signal is on the JTAG clockdomain, sr.oe is on `domain` clockdomain
            # latch update in `domain` clockdomain and see when it has falling edge.
            # At that edge put isir in sr.oe for one `domain` clockdomain
            update_core = Signal(name=sr.name+"_update_core")
            update_core_prev = Signal(name=sr.name+"_update_core_prev")
            m.d[domain] += [
                update_core.eq(update),  # This is CDC from JTAG domain to given domain
                update_core_prev.eq(update_core)
            ]
            # Was high, is low now. No `== 0` here: `a & ~b == 0` parses as
            # `(a & ~b) == 0`, which is true on everything BUT the edge.
            with m.If(update_core_prev & ~update_core):
                # Falling edge of update
                m.d[domain] += sr.oe.eq(isir)
            with m.Else():
                m.d[domain] += sr.oe.eq(0)

            # capture is listed last so it takes priority over shift should
            # both ever be asserted together.
            with m.If(shift):
                m.d.jtag += reg.eq(Cat(reg[1:], self.bus.tdi))
            with m.If(capture):
                m.d.jtag += reg.eq(sr.i)

            # tdo = reg[0], tdo_en = shift
            tdos.append((reg[0], shift))

        # Priority mux: the first shift register that is shifting drives TDO.
        # Each branch has to test its own enable (tdo_en), not a variable
        # left over from the loop above.
        for i, (tdo, tdo_en) in enumerate(tdos):
            if i == 0:
                with m.If(tdo_en):
                    m.d.comb += self.bus.tdo.eq(tdo)
            else:
                with m.Elif(tdo_en):
                    m.d.comb += self.bus.tdo.eq(tdo)

        if tdos:
            with m.Else():
                m.d.comb += self.bus.tdo.eq(sigs.tdo_jtag)
        else:
            # No user shift registers: always connect tdo_jtag to the bus
            m.d.comb += self.bus.tdo.eq(sigs.tdo_jtag)

    def add_wishbone(self, *, ircodes, address_width, data_width, sel_width=None, domain="sync"):
        """Add a wishbone interface

        In order to allow high JTAG clock speed, data will be cached. This means that if data is
        output the value of the next address will be read automatically.

        Parameters:
        ircodes: sequence of three integer for the JTAG IR codes;
          they represent resp. WBADDR, WBREAD and WBREADWRITE. First code
          has a shift register of length 'address_width', the two other codes
          share a shift register of length data_width.
        address_width: width of the address
        data_width: width of the data
        sel_width: forwarded to the ``Wishbone`` constructor
        domain: the clock domain the wishbone master logic runs in

        Returns the ``Wishbone`` master record.

        Raises ``ValueError`` if ``ircodes`` does not hold exactly three
        codes or if one of them is rejected by ``add_shiftreg``.
        """
        if len(ircodes) != 3:
            raise ValueError("3 IR Codes have to be provided")

        sr_addr = self.add_shiftreg(ircodes[0], address_width, domain=domain)
        sr_data = self.add_shiftreg(ircodes[1:], data_width, domain=domain)

        # NOTE: keep the local name `wb`; it may be used for signal naming.
        wb = Wishbone(data_width=data_width, address_width=address_width, sel_width=sel_width, master=True)

        self._wbs.append((sr_addr, sr_data, wb, domain))

        return wb

    def _elaborate_wishbones(self, m):
        """Add one bus-master FSM to ``m`` for every registered wishbone.

        Each FSM reacts to the ``oe`` strobes of the address and data shift
        registers and performs the corresponding read or write-then-read
        cycle on the wishbone bus.
        """
        for sr_addr, sr_data, wb, domain in self._wbs:
            if hasattr(wb, "sel"):
                # Always selected
                m.d.comb += [s.eq(1) for s in wb.sel]

            with m.FSM(domain=domain) as fsm:
                with m.State("IDLE"):
                    m.d.comb += [
                        wb.cyc.eq(0),
                        wb.stb.eq(0),
                        wb.we.eq(0),
                    ]
                    with m.If(sr_addr.oe):  # WBADDR code
                        # New address shifted in: latch it and prefetch data
                        m.d[domain] += wb.addr.eq(sr_addr.o)
                        m.next = "READ"
                    with m.Elif(sr_data.oe[0]):  # WBREAD code
                        # Data has been shifted out: prefetch the value at
                        # the next address
                        m.d[domain] += wb.addr.eq(wb.addr + 1)
                        m.next = "READ"

                    # Separate If listed last: a write request overrides the
                    # next state chosen above.
                    with m.If(sr_data.oe[1]):  # WBREADWRITE code
                        m.d[domain] += wb.dat_w.eq(sr_data.o)
                        m.next = "WRITEREAD"
                with m.State("READ"):
                    m.d.comb += [
                        wb.cyc.eq(1),
                        wb.stb.eq(1),
                        wb.we.eq(0),
                    ]
                    with m.If(~wb.stall):
                        m.next = "READACK"
                with m.State("READACK"):
                    m.d.comb += [
                        wb.cyc.eq(1),
                        wb.stb.eq(0),
                        wb.we.eq(0),
                    ]
                    with m.If(wb.ack):
                        # Store read data in sr_data.i and keep it there til next read
                        m.d[domain] += sr_data.i.eq(wb.dat_r)
                        m.next = "IDLE"
                with m.State("WRITEREAD"):
                    m.d.comb += [
                        wb.cyc.eq(1),
                        wb.stb.eq(1),
                        wb.we.eq(1),
                    ]
                    with m.If(~wb.stall):
                        m.next = "WRITEREADACK"
                with m.State("WRITEREADACK"):
                    m.d.comb += [
                        wb.cyc.eq(1),
                        wb.stb.eq(0),
                        wb.we.eq(0),
                    ]
                    with m.If(wb.ack):
                        # Write done: move to the next address and read it
                        m.d[domain] += wb.addr.eq(wb.addr + 1)
                        m.next = "READ"