Print an error message when the OpenOCD config file can't be read
[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import re
3 import shlex
4 import subprocess
5 import sys
6 import time
7 import traceback
8
9 import pexpect
10
11 # Note that gdb comes with its own testsuite. I was unable to figure out how to
12 # run that testsuite against the spike simulator.
13
14 def find_file(path):
15 for directory in (os.getcwd(), os.path.dirname(__file__)):
16 fullpath = os.path.join(directory, path)
17 if os.path.exists(fullpath):
18 return fullpath
19 return None
20
21 def compile(args, xlen=32): # pylint: disable=redefined-builtin
22 cc = os.path.expandvars("$RISCV/bin/riscv%d-unknown-elf-gcc" % xlen)
23 cmd = [cc, "-g"]
24 for arg in args:
25 found = find_file(arg)
26 if found:
27 cmd.append(found)
28 else:
29 cmd.append(arg)
30 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
31 stderr=subprocess.PIPE)
32 stdout, stderr = process.communicate()
33 if process.returncode:
34 print
35 header("Compile failed")
36 print "+", " ".join(cmd)
37 print stdout,
38 print stderr,
39 header("")
40 raise Exception("Compile failed!")
41
42 def unused_port():
43 # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
44 import socket
45 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
46 s.bind(("", 0))
47 port = s.getsockname()[1]
48 s.close()
49 return port
50
51 class Spike(object):
52 logname = "spike.log"
53
54 def __init__(self, cmd, binary=None, halted=False, with_jtag_gdb=True,
55 timeout=None, xlen=64):
56 """Launch spike. Return tuple of its process and the port it's running
57 on."""
58 if cmd:
59 cmd = shlex.split(cmd)
60 else:
61 cmd = ["spike"]
62 if xlen == 32:
63 cmd += ["--isa", "RV32"]
64
65 if timeout:
66 cmd = ["timeout", str(timeout)] + cmd
67
68 if halted:
69 cmd.append('-H')
70 if with_jtag_gdb:
71 cmd += ['--rbb-port', '0']
72 os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
73 cmd.append("-m32")
74 cmd.append('pk')
75 if binary:
76 cmd.append(binary)
77 logfile = open(self.logname, "w")
78 logfile.write("+ %s\n" % " ".join(cmd))
79 logfile.flush()
80 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
81 stdout=logfile, stderr=logfile)
82
83 if with_jtag_gdb:
84 self.port = None
85 for _ in range(30):
86 m = re.search(r"Listening for remote bitbang connection on "
87 r"port (\d+).", open(self.logname).read())
88 if m:
89 self.port = int(m.group(1))
90 os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
91 break
92 time.sleep(0.11)
93 assert self.port, "Didn't get spike message about bitbang " \
94 "connection"
95
96 def __del__(self):
97 try:
98 self.process.kill()
99 self.process.wait()
100 except OSError:
101 pass
102
103 def wait(self, *args, **kwargs):
104 return self.process.wait(*args, **kwargs)
105
106 class VcsSim(object):
107 def __init__(self, simv=None, debug=False):
108 if simv:
109 cmd = shlex.split(simv)
110 else:
111 cmd = ["simv"]
112 cmd += ["+jtag_vpi_enable"]
113 if debug:
114 cmd[0] = cmd[0] + "-debug"
115 cmd += ["+vcdplusfile=output/gdbserver.vpd"]
116 logfile = open("simv.log", "w")
117 logfile.write("+ %s\n" % " ".join(cmd))
118 logfile.flush()
119 listenfile = open("simv.log", "r")
120 listenfile.seek(0, 2)
121 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
122 stdout=logfile, stderr=logfile)
123 done = False
124 while not done:
125 line = listenfile.readline()
126 if not line:
127 time.sleep(1)
128 match = re.match(r"^Listening on port (\d+)$", line)
129 if match:
130 done = True
131 self.port = int(match.group(1))
132 os.environ['JTAG_VPI_PORT'] = str(self.port)
133
134 def __del__(self):
135 try:
136 self.process.kill()
137 self.process.wait()
138 except OSError:
139 pass
140
141 class Openocd(object):
142 logname = "openocd.log"
143
144 def __init__(self, cmd=None, config=None, debug=False):
145 if cmd:
146 cmd = shlex.split(cmd)
147 else:
148 cmd = ["openocd", "-d"]
149
150 # This command needs to come before any config scripts on the command
151 # line, since they are executed in order.
152 cmd += [
153 # Tell OpenOCD to bind gdb to an unused, ephemeral port.
154 "--command",
155 "gdb_port 0",
156 # Disable tcl and telnet servers, since they are unused and because
157 # the port numbers will conflict if multiple OpenOCD processes are
158 # running on the same server.
159 "--command",
160 "tcl_port disabled",
161 "--command",
162 "telnet_port disabled",
163 ]
164
165 if config:
166 f = find_file(config)
167 if f is None:
168 print("Unable to read file " + config)
169 exit(1)
170
171 cmd += ["-f", f]
172 if debug:
173 cmd.append("-d")
174
175 logfile = open(Openocd.logname, "w")
176 logfile.write("+ %s\n" % " ".join(cmd))
177 logfile.flush()
178 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
179 stdout=logfile, stderr=logfile)
180
181 # Wait for OpenOCD to have made it through riscv_examine(). When using
182 # OpenOCD to communicate with a simulator this may take a long time,
183 # and gdb will time out when trying to connect if we attempt too early.
184 start = time.time()
185 messaged = False
186 while True:
187 log = open(Openocd.logname).read()
188 if "Examined RISCV core" in log:
189 break
190 if not self.process.poll() is None:
191 raise Exception(
192 "OpenOCD exited before completing riscv_examine()")
193 if not messaged and time.time() - start > 1:
194 messaged = True
195 print "Waiting for OpenOCD to examine RISCV core..."
196
197 self.port = self._get_gdb_server_port()
198
199 def _get_gdb_server_port(self):
200 """Get port that OpenOCD's gdb server is listening on."""
201 MAX_ATTEMPTS = 50
202 PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')
203 for _ in range(MAX_ATTEMPTS):
204 with open(os.devnull, 'w') as devnull:
205 try:
206 output = subprocess.check_output([
207 'lsof',
208 '-a', # Take the AND of the following selectors
209 '-p{}'.format(self.process.pid), # Filter on PID
210 '-iTCP', # Filter only TCP sockets
211 ], stderr=devnull)
212 except subprocess.CalledProcessError:
213 output = ""
214 matches = list(PORT_REGEX.finditer(output))
215 matches = [m for m in matches
216 if m.group('port') not in ('6666', '4444')]
217 if len(matches) > 1:
218 print output
219 raise Exception(
220 "OpenOCD listening on multiple ports. Cannot uniquely "
221 "identify gdb server port.")
222 elif matches:
223 [match] = matches
224 return int(match.group('port'))
225 time.sleep(0.1)
226 raise Exception("Timed out waiting for gdb server to obtain port.")
227
228 def __del__(self):
229 try:
230 self.process.kill()
231 self.process.wait()
232 except OSError:
233 pass
234
235 class OpenocdCli(object):
236 def __init__(self, port=4444):
237 self.child = pexpect.spawn(
238 "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
239 self.child.expect("> ")
240
241 def command(self, cmd):
242 self.child.sendline(cmd)
243 self.child.expect(cmd)
244 self.child.expect("\n")
245 self.child.expect("> ")
246 return self.child.before.strip("\t\r\n \0")
247
248 def reg(self, reg=''):
249 output = self.command("reg %s" % reg)
250 matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
251 values = {r: int(v, 0) for r, v in matches}
252 if reg:
253 return values[reg]
254 return values
255
256 def load_image(self, image):
257 output = self.command("load_image %s" % image)
258 if 'invalid ELF file, only 32bits files are supported' in output:
259 raise TestNotApplicable(output)
260
261 class CannotAccess(Exception):
262 def __init__(self, address):
263 Exception.__init__(self)
264 self.address = address
265
266 class Gdb(object):
267 def __init__(self,
268 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
269 self.child = pexpect.spawn(cmd)
270 self.child.logfile = open("gdb.log", "w")
271 self.child.logfile.write("+ %s\n" % cmd)
272 self.wait()
273 self.command("set confirm off")
274 self.command("set width 0")
275 self.command("set height 0")
276 # Force consistency.
277 self.command("set print entry-values no")
278
279 def wait(self):
280 """Wait for prompt."""
281 self.child.expect(r"\(gdb\)")
282
283 def command(self, command, timeout=-1):
284 self.child.sendline(command)
285 self.child.expect("\n", timeout=timeout)
286 self.child.expect(r"\(gdb\)", timeout=timeout)
287 return self.child.before.strip()
288
289 def c(self, wait=True, timeout=-1):
290 if wait:
291 output = self.command("c", timeout=timeout)
292 assert "Continuing" in output
293 return output
294 else:
295 self.child.sendline("c")
296 self.child.expect("Continuing")
297
298 def interrupt(self):
299 self.child.send("\003")
300 self.child.expect(r"\(gdb\)", timeout=60)
301 return self.child.before.strip()
302
303 def x(self, address, size='w'):
304 output = self.command("x/%s %s" % (size, address))
305 value = int(output.split(':')[1].strip(), 0)
306 return value
307
308 def p_raw(self, obj):
309 output = self.command("p %s" % obj)
310 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
311 if m:
312 raise CannotAccess(int(m.group(1), 0))
313 return output.split('=')[-1].strip()
314
315 def p(self, obj):
316 output = self.command("p/x %s" % obj)
317 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
318 if m:
319 raise CannotAccess(int(m.group(1), 0))
320 value = int(output.split('=')[-1].strip(), 0)
321 return value
322
323 def p_string(self, obj):
324 output = self.command("p %s" % obj)
325 value = shlex.split(output.split('=')[-1].strip())[1]
326 return value
327
328 def stepi(self):
329 output = self.command("stepi")
330 return output
331
332 def load(self):
333 output = self.command("load", timeout=60)
334 assert "failed" not in output
335 assert "Transfer rate" in output
336
337 def b(self, location):
338 output = self.command("b %s" % location)
339 assert "not defined" not in output
340 assert "Breakpoint" in output
341 return output
342
343 def hbreak(self, location):
344 output = self.command("hbreak %s" % location)
345 assert "not defined" not in output
346 assert "Hardware assisted breakpoint" in output
347 return output
348
349 def run_all_tests(module, target, parsed):
350 good_results = set(('pass', 'not_applicable'))
351
352 start = time.time()
353
354 results = {}
355 count = 0
356
357 global gdb_cmd # pylint: disable=global-statement
358 gdb_cmd = parsed.gdb
359
360 todo = [("ExamineTarget", ExamineTarget)]
361 for name in dir(module):
362 definition = getattr(module, name)
363 if type(definition) == type and hasattr(definition, 'test') and \
364 (not parsed.test or any(test in name for test in parsed.test)):
365 todo.append((name, definition))
366
367 for name, definition in todo:
368 instance = definition(target)
369 result = instance.run()
370 results.setdefault(result, []).append(name)
371 count += 1
372 if result not in good_results and parsed.fail_fast:
373 break
374
375 header("ran %d tests in %.0fs" % (count, time.time() - start), dash=':')
376
377 result = 0
378 for key, value in results.iteritems():
379 print "%d tests returned %s" % (len(value), key)
380 if key not in good_results:
381 result = 1
382 for test in value:
383 print " ", test
384
385 return result
386
387 def add_test_run_options(parser):
388 parser.add_argument("--fail-fast", "-f", action="store_true",
389 help="Exit as soon as any test fails.")
390 parser.add_argument("test", nargs='*',
391 help="Run only tests that are named here.")
392 parser.add_argument("--gdb",
393 help="The command to use to start gdb.")
394
395 def header(title, dash='-'):
396 if title:
397 dashes = dash * (36 - len(title))
398 before = dashes[:len(dashes)/2]
399 after = dashes[len(dashes)/2:]
400 print "%s[ %s ]%s" % (before, title, after)
401 else:
402 print dash * 40
403
404 def print_log(path):
405 header(path)
406 lines = open(path, "r").readlines()
407 if len(lines) > 1000:
408 for l in lines[:500]:
409 sys.stdout.write(l)
410 print "..."
411 for l in lines[-500:]:
412 sys.stdout.write(l)
413 else:
414 for l in lines:
415 sys.stdout.write(l)
416
417 class BaseTest(object):
418 compiled = {}
419
420 def __init__(self, target):
421 self.target = target
422 self.server = None
423 self.target_process = None
424 self.binary = None
425 self.start = 0
426 self.logs = []
427
428 def early_applicable(self):
429 """Return a false value if the test has determined it cannot run
430 without ever needing to talk to the target or server."""
431 # pylint: disable=no-self-use
432 return True
433
434 def setup(self):
435 pass
436
437 def compile(self):
438 compile_args = getattr(self, 'compile_args', None)
439 if compile_args:
440 if compile_args not in BaseTest.compiled:
441 # pylint: disable=star-args
442 BaseTest.compiled[compile_args] = \
443 self.target.compile(*compile_args)
444 self.binary = BaseTest.compiled.get(compile_args)
445
446 def classSetup(self):
447 self.compile()
448 self.target_process = self.target.target()
449 self.server = self.target.server()
450 self.logs.append(self.server.logname)
451
452 def classTeardown(self):
453 del self.server
454 del self.target_process
455
456 def run(self):
457 """
458 If compile_args is set, compile a program and set self.binary.
459
460 Call setup().
461
462 Then call test() and return the result, displaying relevant information
463 if an exception is raised.
464 """
465
466 print "Running", type(self).__name__, "...",
467 sys.stdout.flush()
468
469 if not self.early_applicable():
470 print "not_applicable"
471 return "not_applicable"
472
473 self.start = time.time()
474
475 self.classSetup()
476
477 try:
478 self.setup()
479 result = self.test() # pylint: disable=no-member
480 except TestNotApplicable:
481 result = "not_applicable"
482 except Exception as e: # pylint: disable=broad-except
483 if isinstance(e, TestFailed):
484 result = "fail"
485 else:
486 result = "exception"
487 print "%s in %.2fs" % (result, time.time() - self.start)
488 print "=" * 40
489 if isinstance(e, TestFailed):
490 header("Message")
491 print e.message
492 header("Traceback")
493 traceback.print_exc(file=sys.stdout)
494 for log in self.logs:
495 print_log(log)
496 print "/" * 40
497 return result
498
499 finally:
500 self.classTeardown()
501
502 if not result:
503 result = 'pass'
504 print "%s in %.2fs" % (result, time.time() - self.start)
505 return result
506
507 gdb_cmd = None
508 class GdbTest(BaseTest):
509 def __init__(self, target):
510 BaseTest.__init__(self, target)
511 self.gdb = None
512
513 def classSetup(self):
514 BaseTest.classSetup(self)
515 self.logs.append("gdb.log")
516
517 if gdb_cmd:
518 self.gdb = Gdb(gdb_cmd)
519 else:
520 self.gdb = Gdb()
521
522 if self.binary:
523 self.gdb.command("file %s" % self.binary)
524 if self.target:
525 self.gdb.command("set arch riscv:rv%d" % self.target.xlen)
526 self.gdb.command("set remotetimeout %d" % self.target.timeout_sec)
527 if self.server.port:
528 self.gdb.command(
529 "target extended-remote localhost:%d" % self.server.port)
530
531 self.gdb.p("$priv=3")
532
533 def classTeardown(self):
534 del self.gdb
535 BaseTest.classTeardown(self)
536
537 class ExamineTarget(GdbTest):
538 def test(self):
539 self.target.misa = self.gdb.p("$misa")
540
541 txt = "RV"
542 if (self.target.misa >> 30) == 1:
543 txt += "32"
544 elif (self.target.misa >> 62) == 2:
545 txt += "64"
546 elif (self.target.misa >> 126) == 3:
547 txt += "128"
548 else:
549 txt += "??"
550
551 for i in range(26):
552 if self.target.misa & (1<<i):
553 txt += chr(i + ord('A'))
554 print txt,
555
556 class TestFailed(Exception):
557 def __init__(self, message):
558 Exception.__init__(self)
559 self.message = message
560
561 class TestNotApplicable(Exception):
562 def __init__(self, message):
563 Exception.__init__(self)
564 self.message = message
565
566 def assertEqual(a, b):
567 if a != b:
568 raise TestFailed("%r != %r" % (a, b))
569
570 def assertNotEqual(a, b):
571 if a == b:
572 raise TestFailed("%r == %r" % (a, b))
573
574 def assertIn(a, b):
575 if a not in b:
576 raise TestFailed("%r not in %r" % (a, b))
577
578 def assertNotIn(a, b):
579 if a in b:
580 raise TestFailed("%r in %r" % (a, b))
581
582 def assertGreater(a, b):
583 if not a > b:
584 raise TestFailed("%r not greater than %r" % (a, b))
585
586 def assertLess(a, b):
587 if not a < b:
588 raise TestFailed("%r not less than %r" % (a, b))
589
590 def assertTrue(a):
591 if not a:
592 raise TestFailed("%r is not True" % a)
593
594 def assertRegexpMatches(text, regexp):
595 if not re.search(regexp, text):
596 raise TestFailed("can't find %r in %r" % (regexp, text))