Print out name of logfile when debug test is run.
[riscv-tests.git] / debug / testlib.py
1 import collections
2 import os.path
3 import random
4 import re
5 import shlex
6 import subprocess
7 import sys
8 import tempfile
9 import time
10 import traceback
11
12 import pexpect
13
14 # Note that gdb comes with its own testsuite. I was unable to figure out how to
15 # run that testsuite against the spike simulator.
16
17 def find_file(path):
18 for directory in (os.getcwd(), os.path.dirname(__file__)):
19 fullpath = os.path.join(directory, path)
20 if os.path.exists(fullpath):
21 return fullpath
22 return None
23
24 def compile(args, xlen=32): # pylint: disable=redefined-builtin
25 cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
26 cmd = [cc, "-g"]
27 if xlen == 32:
28 cmd.append("-march=rv32imac")
29 cmd.append("-mabi=ilp32")
30 else:
31 cmd.append("-march=rv64imac")
32 cmd.append("-mabi=lp64")
33 for arg in args:
34 found = find_file(arg)
35 if found:
36 cmd.append(found)
37 else:
38 cmd.append(arg)
39 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
40 stderr=subprocess.PIPE)
41 stdout, stderr = process.communicate()
42 if process.returncode:
43 print
44 header("Compile failed")
45 print "+", " ".join(cmd)
46 print stdout,
47 print stderr,
48 header("")
49 raise Exception("Compile failed!")
50
51 def unused_port():
52 # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
53 import socket
54 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
55 s.bind(("", 0))
56 port = s.getsockname()[1]
57 s.close()
58 return port
59
60 class Spike(object):
61 logname = "spike-%d.log" % os.getpid()
62
63 def __init__(self, target, halted=False, timeout=None, with_jtag_gdb=True):
64 """Launch spike. Return tuple of its process and the port it's running
65 on."""
66 if target.sim_cmd:
67 cmd = shlex.split(target.sim_cmd)
68 else:
69 spike = os.path.expandvars("$RISCV/bin/spike")
70 cmd = [spike]
71 if target.xlen == 32:
72 cmd += ["--isa", "RV32G"]
73 else:
74 cmd += ["--isa", "RV64G"]
75 cmd += ["-m0x%x:0x%x" % (target.ram, target.ram_size)]
76
77 if timeout:
78 cmd = ["timeout", str(timeout)] + cmd
79
80 if halted:
81 cmd.append('-H')
82 if with_jtag_gdb:
83 cmd += ['--rbb-port', '0']
84 os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
85 self.infinite_loop = target.compile(
86 "programs/checksum.c", "programs/tiny-malloc.c",
87 "programs/infinite_loop.S", "-DDEFINE_MALLOC", "-DDEFINE_FREE")
88 cmd.append(self.infinite_loop)
89 logfile = open(self.logname, "w")
90 logfile.write("+ %s\n" % " ".join(cmd))
91 logfile.flush()
92 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
93 stdout=logfile, stderr=logfile)
94
95 if with_jtag_gdb:
96 self.port = None
97 for _ in range(30):
98 m = re.search(r"Listening for remote bitbang connection on "
99 r"port (\d+).", open(self.logname).read())
100 if m:
101 self.port = int(m.group(1))
102 os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
103 break
104 time.sleep(0.11)
105 assert self.port, "Didn't get spike message about bitbang " \
106 "connection"
107
108 def __del__(self):
109 try:
110 self.process.kill()
111 self.process.wait()
112 except OSError:
113 pass
114
115 def wait(self, *args, **kwargs):
116 return self.process.wait(*args, **kwargs)
117
118 class VcsSim(object):
119 logname = "simv.log"
120
121 def __init__(self, sim_cmd=None, debug=False):
122 if sim_cmd:
123 cmd = shlex.split(sim_cmd)
124 else:
125 cmd = ["simv"]
126 cmd += ["+jtag_vpi_enable"]
127 if debug:
128 cmd[0] = cmd[0] + "-debug"
129 cmd += ["+vcdplusfile=output/gdbserver.vpd"]
130 logfile = open(self.logname, "w")
131 logfile.write("+ %s\n" % " ".join(cmd))
132 logfile.flush()
133 listenfile = open(self.logname, "r")
134 listenfile.seek(0, 2)
135 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
136 stdout=logfile, stderr=logfile)
137 done = False
138 while not done:
139 # Fail if VCS exits early
140 exit_code = self.process.poll()
141 if exit_code is not None:
142 raise RuntimeError('VCS simulator exited early with status %d'
143 % exit_code)
144
145 line = listenfile.readline()
146 if not line:
147 time.sleep(1)
148 match = re.match(r"^Listening on port (\d+)$", line)
149 if match:
150 done = True
151 self.port = int(match.group(1))
152 os.environ['JTAG_VPI_PORT'] = str(self.port)
153
154 def __del__(self):
155 try:
156 self.process.kill()
157 self.process.wait()
158 except OSError:
159 pass
160
161 class Openocd(object):
162 logfile = tempfile.NamedTemporaryFile(prefix='openocd', suffix='.log')
163 logname = logfile.name
164
165 def __init__(self, server_cmd=None, config=None, debug=False):
166 if server_cmd:
167 cmd = shlex.split(server_cmd)
168 else:
169 openocd = os.path.expandvars("$RISCV/bin/openocd")
170 cmd = [openocd]
171 if debug:
172 cmd.append("-d")
173
174 # This command needs to come before any config scripts on the command
175 # line, since they are executed in order.
176 cmd += [
177 # Tell OpenOCD to bind gdb to an unused, ephemeral port.
178 "--command",
179 "gdb_port 0",
180 # Disable tcl and telnet servers, since they are unused and because
181 # the port numbers will conflict if multiple OpenOCD processes are
182 # running on the same server.
183 "--command",
184 "tcl_port disabled",
185 "--command",
186 "telnet_port disabled",
187 ]
188
189 if config:
190 f = find_file(config)
191 if f is None:
192 print "Unable to read file " + config
193 exit(1)
194
195 cmd += ["-f", f]
196 if debug:
197 cmd.append("-d")
198
199 logfile = open(Openocd.logname, "w")
200 logfile.write("+ %s\n" % " ".join(cmd))
201 logfile.flush()
202 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
203 stdout=logfile, stderr=logfile)
204
205 # Wait for OpenOCD to have made it through riscv_examine(). When using
206 # OpenOCD to communicate with a simulator this may take a long time,
207 # and gdb will time out when trying to connect if we attempt too early.
208 start = time.time()
209 messaged = False
210 while True:
211 log = open(Openocd.logname).read()
212 m = re.search(r"Listening on port (\d+) for gdb connections", log)
213 if m:
214 self.port = int(m.group(1))
215 break
216
217 if not self.process.poll() is None:
218 header("OpenOCD log")
219 sys.stdout.write(log)
220 raise Exception(
221 "OpenOCD exited before completing riscv_examine()")
222 if not messaged and time.time() - start > 1:
223 messaged = True
224 print "Waiting for OpenOCD to start..."
225 if time.time() - start > 60:
226 raise Exception("ERROR: Timed out waiting for OpenOCD to "
227 "listen for gdb")
228
229 def __del__(self):
230 try:
231 self.process.kill()
232 self.process.wait()
233 except (OSError, AttributeError):
234 pass
235
236 class OpenocdCli(object):
237 def __init__(self, port=4444):
238 self.child = pexpect.spawn(
239 "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
240 self.child.expect("> ")
241
242 def command(self, cmd):
243 self.child.sendline(cmd)
244 self.child.expect(cmd)
245 self.child.expect("\n")
246 self.child.expect("> ")
247 return self.child.before.strip("\t\r\n \0")
248
249 def reg(self, reg=''):
250 output = self.command("reg %s" % reg)
251 matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
252 values = {r: int(v, 0) for r, v in matches}
253 if reg:
254 return values[reg]
255 return values
256
257 def load_image(self, image):
258 output = self.command("load_image %s" % image)
259 if 'invalid ELF file, only 32bits files are supported' in output:
260 raise TestNotApplicable(output)
261
262 class CannotAccess(Exception):
263 def __init__(self, address):
264 Exception.__init__(self)
265 self.address = address
266
267 Thread = collections.namedtuple('Thread', ('id', 'target_id', 'name',
268 'frame'))
269
270 class Gdb(object):
271 logfile = tempfile.NamedTemporaryFile(prefix="gdb", suffix=".log")
272 logname = logfile.name
273
274 def __init__(self,
275 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
276 self.child = pexpect.spawn(cmd)
277 self.child.logfile = open(self.logname, "w")
278 self.child.logfile.write("+ %s\n" % cmd)
279 self.wait()
280 self.command("set confirm off")
281 self.command("set width 0")
282 self.command("set height 0")
283 # Force consistency.
284 self.command("set print entry-values no")
285
286 def wait(self):
287 """Wait for prompt."""
288 self.child.expect(r"\(gdb\)")
289
290 def command(self, command, timeout=6000):
291 self.child.sendline(command)
292 self.child.expect("\n", timeout=timeout)
293 self.child.expect(r"\(gdb\)", timeout=timeout)
294 return self.child.before.strip()
295
296 def c(self, wait=True, timeout=-1, async=False):
297 if async:
298 async = "&"
299 else:
300 async = ""
301 if wait:
302 output = self.command("c%s" % async, timeout=timeout)
303 assert "Continuing" in output
304 return output
305 else:
306 self.child.sendline("c%s" % async)
307 self.child.expect("Continuing")
308
309 def interrupt(self):
310 self.child.send("\003")
311 self.child.expect(r"\(gdb\)", timeout=6000)
312 return self.child.before.strip()
313
314 def x(self, address, size='w'):
315 output = self.command("x/%s %s" % (size, address))
316 value = int(output.split(':')[1].strip(), 0)
317 return value
318
319 def p_raw(self, obj):
320 output = self.command("p %s" % obj)
321 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
322 if m:
323 raise CannotAccess(int(m.group(1), 0))
324 return output.split('=')[-1].strip()
325
326 def p(self, obj):
327 output = self.command("p/x %s" % obj)
328 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
329 if m:
330 raise CannotAccess(int(m.group(1), 0))
331 value = int(output.split('=')[-1].strip(), 0)
332 return value
333
334 def p_string(self, obj):
335 output = self.command("p %s" % obj)
336 value = shlex.split(output.split('=')[-1].strip())[1]
337 return value
338
339 def stepi(self):
340 output = self.command("stepi", timeout=60)
341 return output
342
343 def load(self):
344 output = self.command("load", timeout=6000)
345 assert "failed" not in output
346 assert "Transfer rate" in output
347
348 def b(self, location):
349 output = self.command("b %s" % location)
350 assert "not defined" not in output
351 assert "Breakpoint" in output
352 return output
353
354 def hbreak(self, location):
355 output = self.command("hbreak %s" % location)
356 assert "not defined" not in output
357 assert "Hardware assisted breakpoint" in output
358 return output
359
360 def threads(self):
361 output = self.command("info threads")
362 threads = []
363 for line in output.splitlines():
364 m = re.match(
365 r"[\s\*]*(\d+)\s*Thread (\d+)\s*\(Name: ([^\)]+)\s*(.*)",
366 line)
367 if m:
368 threads.append(Thread(*m.groups()))
369 if not threads:
370 threads.append(Thread('1', '1', 'Default', '???'))
371 return threads
372
373 def thread(self, thread):
374 return self.command("thread %s" % thread.id)
375
376 def run_all_tests(module, target, parsed):
377 if not os.path.exists(parsed.logs):
378 os.makedirs(parsed.logs)
379
380 overall_start = time.time()
381
382 global gdb_cmd # pylint: disable=global-statement
383 gdb_cmd = parsed.gdb
384
385 todo = []
386 if parsed.misaval:
387 target.misa = int(parsed.misaval, 16)
388 print "Using $misa from command line: 0x%x" % target.misa
389 elif target.misa:
390 print "Using $misa from target definition: 0x%x" % target.misa
391 else:
392 todo.append(("ExamineTarget", ExamineTarget))
393
394 for name in dir(module):
395 definition = getattr(module, name)
396 if type(definition) == type and hasattr(definition, 'test') and \
397 (not parsed.test or any(test in name for test in parsed.test)):
398 todo.append((name, definition))
399
400 results, count = run_tests(parsed, target, todo)
401
402 header("ran %d tests in %.0fs" % (count, time.time() - overall_start),
403 dash=':')
404
405 return print_results(results)
406
407 good_results = set(('pass', 'not_applicable'))
408 def run_tests(parsed, target, todo):
409 results = {}
410 count = 0
411
412 for name, definition in todo:
413 instance = definition(target)
414 log_name = os.path.join(parsed.logs, "%s-%s-%s.log" %
415 (time.strftime("%Y%m%d-%H%M%S"), type(target).__name__, name))
416 log_fd = open(log_name, 'w')
417 print "Running %s > %s ..." % (name, log_name),
418 sys.stdout.flush()
419 log_fd.write("Test: %s\n" % name)
420 log_fd.write("Target: %s\n" % type(target).__name__)
421 start = time.time()
422 real_stdout = sys.stdout
423 sys.stdout = log_fd
424 try:
425 result = instance.run()
426 log_fd.write("Result: %s\n" % result)
427 finally:
428 sys.stdout = real_stdout
429 log_fd.write("Time elapsed: %.2fs\n" % (time.time() - start))
430 print "%s in %.2fs" % (result, time.time() - start)
431 if result not in good_results and parsed.print_failures:
432 sys.stdout.write(open(log_name).read())
433 sys.stdout.flush()
434 results.setdefault(result, []).append((name, log_name))
435 count += 1
436 if result not in good_results and parsed.fail_fast:
437 break
438
439 return results, count
440
441 def print_results(results):
442 result = 0
443 for key, value in results.iteritems():
444 print "%d tests returned %s" % (len(value), key)
445 if key not in good_results:
446 result = 1
447 for name, log_name in value:
448 print " %s > %s" % (name, log_name)
449
450 return result
451
452 def add_test_run_options(parser):
453 parser.add_argument("--logs", default="logs",
454 help="Store logs in the specified directory.")
455 parser.add_argument("--fail-fast", "-f", action="store_true",
456 help="Exit as soon as any test fails.")
457 parser.add_argument("--print-failures", action="store_true",
458 help="When a test fails, print the log file to stdout.")
459 parser.add_argument("test", nargs='*',
460 help="Run only tests that are named here.")
461 parser.add_argument("--gdb",
462 help="The command to use to start gdb.")
463 parser.add_argument("--misaval",
464 help="Don't run ExamineTarget, just assume the misa value which is "
465 "specified.")
466
467 def header(title, dash='-', length=78):
468 if title:
469 dashes = dash * (length - 4 - len(title))
470 before = dashes[:len(dashes)/2]
471 after = dashes[len(dashes)/2:]
472 print "%s[ %s ]%s" % (before, title, after)
473 else:
474 print dash * length
475
476 def print_log(path):
477 header(path)
478 lines = open(path, "r").readlines()
479 for l in lines:
480 sys.stdout.write(l)
481 print
482
483 class BaseTest(object):
484 compiled = {}
485
486 def __init__(self, target):
487 self.target = target
488 self.server = None
489 self.target_process = None
490 self.binary = None
491 self.start = 0
492 self.logs = []
493
494 def early_applicable(self):
495 """Return a false value if the test has determined it cannot run
496 without ever needing to talk to the target or server."""
497 # pylint: disable=no-self-use
498 return True
499
500 def setup(self):
501 pass
502
503 def compile(self):
504 compile_args = getattr(self, 'compile_args', None)
505 if compile_args:
506 if compile_args not in BaseTest.compiled:
507 # pylint: disable=star-args
508 BaseTest.compiled[compile_args] = \
509 self.target.compile(*compile_args)
510 self.binary = BaseTest.compiled.get(compile_args)
511
512 def classSetup(self):
513 self.compile()
514 self.target_process = self.target.create()
515 if self.target_process:
516 self.logs.append(self.target_process.logname)
517 try:
518 self.server = self.target.server()
519 self.logs.append(self.server.logname)
520 except Exception:
521 for log in self.logs:
522 print_log(log)
523 raise
524
525 def classTeardown(self):
526 del self.server
527 del self.target_process
528
529 def run(self):
530 """
531 If compile_args is set, compile a program and set self.binary.
532
533 Call setup().
534
535 Then call test() and return the result, displaying relevant information
536 if an exception is raised.
537 """
538
539 sys.stdout.flush()
540
541 if not self.early_applicable():
542 return "not_applicable"
543
544 self.start = time.time()
545
546 try:
547 self.classSetup()
548 self.setup()
549 result = self.test() # pylint: disable=no-member
550 except TestNotApplicable:
551 result = "not_applicable"
552 except Exception as e: # pylint: disable=broad-except
553 if isinstance(e, TestFailed):
554 result = "fail"
555 else:
556 result = "exception"
557 if isinstance(e, TestFailed):
558 header("Message")
559 print e.message
560 header("Traceback")
561 traceback.print_exc(file=sys.stdout)
562 return result
563
564 finally:
565 for log in self.logs:
566 print_log(log)
567 header("End of logs")
568 self.classTeardown()
569
570 if not result:
571 result = 'pass'
572 return result
573
574 gdb_cmd = None
575 class GdbTest(BaseTest):
576 def __init__(self, target):
577 BaseTest.__init__(self, target)
578 self.gdb = None
579
580 def classSetup(self):
581 BaseTest.classSetup(self)
582
583 if gdb_cmd:
584 self.gdb = Gdb(gdb_cmd)
585 else:
586 self.gdb = Gdb()
587
588 self.logs.append(self.gdb.logname)
589
590 if self.binary:
591 self.gdb.command("file %s" % self.binary)
592 if self.target:
593 self.gdb.command("set arch riscv:rv%d" % self.target.xlen)
594 self.gdb.command("set remotetimeout %d" % self.target.timeout_sec)
595 if self.server.port:
596 self.gdb.command(
597 "target extended-remote localhost:%d" % self.server.port)
598 # Select a random thread.
599 # TODO: Allow a command line option to force a specific thread.
600 thread = random.choice(self.gdb.threads())
601 self.gdb.thread(thread)
602
603 for cmd in self.target.gdb_setup:
604 self.gdb.command(cmd)
605
606 # FIXME: OpenOCD doesn't handle PRIV now
607 #self.gdb.p("$priv=3")
608
609 def classTeardown(self):
610 del self.gdb
611 BaseTest.classTeardown(self)
612
613 class ExamineTarget(GdbTest):
614 def test(self):
615 self.target.misa = self.gdb.p("$misa")
616
617 txt = "RV"
618 if (self.target.misa >> 30) == 1:
619 txt += "32"
620 elif (self.target.misa >> 62) == 2:
621 txt += "64"
622 elif (self.target.misa >> 126) == 3:
623 txt += "128"
624 else:
625 raise TestFailed("Couldn't determine XLEN from $misa (0x%x)" %
626 self.target.misa)
627
628 for i in range(26):
629 if self.target.misa & (1<<i):
630 txt += chr(i + ord('A'))
631 print txt,
632
633 class TestFailed(Exception):
634 def __init__(self, message):
635 Exception.__init__(self)
636 self.message = message
637
638 class TestNotApplicable(Exception):
639 def __init__(self, message):
640 Exception.__init__(self)
641 self.message = message
642
643 def assertEqual(a, b):
644 if a != b:
645 raise TestFailed("%r != %r" % (a, b))
646
647 def assertNotEqual(a, b):
648 if a == b:
649 raise TestFailed("%r == %r" % (a, b))
650
651 def assertIn(a, b):
652 if a not in b:
653 raise TestFailed("%r not in %r" % (a, b))
654
655 def assertNotIn(a, b):
656 if a in b:
657 raise TestFailed("%r in %r" % (a, b))
658
659 def assertGreater(a, b):
660 if not a > b:
661 raise TestFailed("%r not greater than %r" % (a, b))
662
663 def assertLess(a, b):
664 if not a < b:
665 raise TestFailed("%r not less than %r" % (a, b))
666
667 def assertTrue(a):
668 if not a:
669 raise TestFailed("%r is not True" % a)
670
671 def assertRegexpMatches(text, regexp):
672 if not re.search(regexp, text):
673 raise TestFailed("can't find %r in %r" % (regexp, text))