Fail if simulator exits early.
[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import re
3 import shlex
4 import subprocess
5 import sys
6 import time
7 import traceback
8
9 import pexpect
10
11 # Note that gdb comes with its own testsuite. I was unable to figure out how to
12 # run that testsuite against the spike simulator.
13
14 def find_file(path):
15 for directory in (os.getcwd(), os.path.dirname(__file__)):
16 fullpath = os.path.join(directory, path)
17 if os.path.exists(fullpath):
18 return fullpath
19 return None
20
21 def compile(args, xlen=32): # pylint: disable=redefined-builtin
22 cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
23 cmd = [cc, "-g"]
24 if (xlen == 32):
25 cmd.append("-march=rv32imac")
26 cmd.append("-mabi=ilp32")
27 else:
28 cmd.append("-march=rv64imac")
29 cmd.append("-mabi=lp64")
30 for arg in args:
31 found = find_file(arg)
32 if found:
33 cmd.append(found)
34 else:
35 cmd.append(arg)
36 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
37 stderr=subprocess.PIPE)
38 stdout, stderr = process.communicate()
39 if process.returncode:
40 print
41 header("Compile failed")
42 print "+", " ".join(cmd)
43 print stdout,
44 print stderr,
45 header("")
46 raise Exception("Compile failed!")
47
48 def unused_port():
49 # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
50 import socket
51 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
52 s.bind(("", 0))
53 port = s.getsockname()[1]
54 s.close()
55 return port
56
57 class Spike(object):
58 logname = "spike.log"
59
60 def __init__(self, sim_cmd, binary=None, halted=False, with_jtag_gdb=True,
61 timeout=None, xlen=64):
62 """Launch spike. Return tuple of its process and the port it's running
63 on."""
64 if sim_cmd:
65 cmd = shlex.split(sim_cmd)
66 else:
67 spike = os.path.expandvars("$RISCV/bin/spike")
68 cmd = [spike]
69 if xlen == 32:
70 cmd += ["--isa", "RV32G"]
71 else:
72 cmd += ["--isa", "RV64G"]
73
74 if timeout:
75 cmd = ["timeout", str(timeout)] + cmd
76
77 cmd += ["-m0x10000000:0x10000000"]
78
79 if halted:
80 cmd.append('-H')
81 if with_jtag_gdb:
82 cmd += ['--rbb-port', '0']
83 os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
84 cmd.append('programs/infinite_loop')
85 if binary:
86 cmd.append(binary)
87 logfile = open(self.logname, "w")
88 logfile.write("+ %s\n" % " ".join(cmd))
89 logfile.flush()
90 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
91 stdout=logfile, stderr=logfile)
92
93 if with_jtag_gdb:
94 self.port = None
95 for _ in range(30):
96 m = re.search(r"Listening for remote bitbang connection on "
97 r"port (\d+).", open(self.logname).read())
98 if m:
99 self.port = int(m.group(1))
100 os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
101 break
102 time.sleep(0.11)
103 assert self.port, "Didn't get spike message about bitbang " \
104 "connection"
105
106 def __del__(self):
107 try:
108 self.process.kill()
109 self.process.wait()
110 except OSError:
111 pass
112
113 def wait(self, *args, **kwargs):
114 return self.process.wait(*args, **kwargs)
115
116 class VcsSim(object):
117 def __init__(self, sim_cmd=None, debug=False):
118 if sim_cmd:
119 cmd = shlex.split(sim_cmd)
120 else:
121 cmd = ["simv"]
122 cmd += ["+jtag_vpi_enable"]
123 if debug:
124 cmd[0] = cmd[0] + "-debug"
125 cmd += ["+vcdplusfile=output/gdbserver.vpd"]
126 logfile = open("simv.log", "w")
127 logfile.write("+ %s\n" % " ".join(cmd))
128 logfile.flush()
129 listenfile = open("simv.log", "r")
130 listenfile.seek(0, 2)
131 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
132 stdout=logfile, stderr=logfile)
133 done = False
134 while not done:
135 # Fail if VCS exits early
136 exit_code = self.process.poll()
137 if exit_code is not None:
138 raise RuntimeError('VCS simulator exited early with status %d'
139 % exit_code)
140
141 line = listenfile.readline()
142 if not line:
143 time.sleep(1)
144 match = re.match(r"^Listening on port (\d+)$", line)
145 if match:
146 done = True
147 self.port = int(match.group(1))
148 os.environ['JTAG_VPI_PORT'] = str(self.port)
149
150 def __del__(self):
151 try:
152 self.process.kill()
153 self.process.wait()
154 except OSError:
155 pass
156
157 class Openocd(object):
158 logname = "openocd.log"
159
160 def __init__(self, server_cmd=None, config=None, debug=False):
161 if server_cmd:
162 cmd = shlex.split(server_cmd)
163 else:
164 openocd = os.path.expandvars("$RISCV/bin/riscv-openocd")
165 cmd = [openocd]
166 if (debug):
167 cmd.append("-d")
168
169 # This command needs to come before any config scripts on the command
170 # line, since they are executed in order.
171 cmd += [
172 # Tell OpenOCD to bind gdb to an unused, ephemeral port.
173 "--command",
174 "gdb_port 0",
175 # Disable tcl and telnet servers, since they are unused and because
176 # the port numbers will conflict if multiple OpenOCD processes are
177 # running on the same server.
178 "--command",
179 "tcl_port disabled",
180 "--command",
181 "telnet_port disabled",
182 ]
183
184 if config:
185 f = find_file(config)
186 if f is None:
187 print("Unable to read file " + config)
188 exit(1)
189
190 cmd += ["-f", f]
191 if debug:
192 cmd.append("-d")
193
194 logfile = open(Openocd.logname, "w")
195 logfile.write("+ %s\n" % " ".join(cmd))
196 logfile.flush()
197 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
198 stdout=logfile, stderr=logfile)
199
200 # Wait for OpenOCD to have made it through riscv_examine(). When using
201 # OpenOCD to communicate with a simulator this may take a long time,
202 # and gdb will time out when trying to connect if we attempt too early.
203 start = time.time()
204 messaged = False
205 while True:
206 log = open(Openocd.logname).read()
207 if "Ready for Remote Connections" in log:
208 break
209 if not self.process.poll() is None:
210 raise Exception(
211 "OpenOCD exited before completing riscv_examine()")
212 if not messaged and time.time() - start > 1:
213 messaged = True
214 print "Waiting for OpenOCD to examine RISCV core..."
215
216 self.port = self._get_gdb_server_port()
217
218 def _get_gdb_server_port(self):
219 """Get port that OpenOCD's gdb server is listening on."""
220 MAX_ATTEMPTS = 50
221 PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')
222 for _ in range(MAX_ATTEMPTS):
223 with open(os.devnull, 'w') as devnull:
224 try:
225 output = subprocess.check_output([
226 'lsof',
227 '-a', # Take the AND of the following selectors
228 '-p{}'.format(self.process.pid), # Filter on PID
229 '-iTCP', # Filter only TCP sockets
230 ], stderr=devnull)
231 except subprocess.CalledProcessError:
232 output = ""
233 matches = list(PORT_REGEX.finditer(output))
234 matches = [m for m in matches
235 if m.group('port') not in ('6666', '4444')]
236 if len(matches) > 1:
237 print output
238 raise Exception(
239 "OpenOCD listening on multiple ports. Cannot uniquely "
240 "identify gdb server port.")
241 elif matches:
242 [match] = matches
243 return int(match.group('port'))
244 time.sleep(1)
245 raise Exception("Timed out waiting for gdb server to obtain port.")
246
247 def __del__(self):
248 try:
249 self.process.kill()
250 self.process.wait()
251 except OSError:
252 pass
253
254 class OpenocdCli(object):
255 def __init__(self, port=4444):
256 self.child = pexpect.spawn(
257 "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
258 self.child.expect("> ")
259
260 def command(self, cmd):
261 self.child.sendline(cmd)
262 self.child.expect(cmd)
263 self.child.expect("\n")
264 self.child.expect("> ")
265 return self.child.before.strip("\t\r\n \0")
266
267 def reg(self, reg=''):
268 output = self.command("reg %s" % reg)
269 matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
270 values = {r: int(v, 0) for r, v in matches}
271 if reg:
272 return values[reg]
273 return values
274
275 def load_image(self, image):
276 output = self.command("load_image %s" % image)
277 if 'invalid ELF file, only 32bits files are supported' in output:
278 raise TestNotApplicable(output)
279
280 class CannotAccess(Exception):
281 def __init__(self, address):
282 Exception.__init__(self)
283 self.address = address
284
285 class Gdb(object):
286 def __init__(self,
287 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
288 self.child = pexpect.spawn(cmd)
289 self.child.logfile = open("gdb.log", "w")
290 self.child.logfile.write("+ %s\n" % cmd)
291 self.wait()
292 self.command("set confirm off")
293 self.command("set width 0")
294 self.command("set height 0")
295 # Force consistency.
296 self.command("set print entry-values no")
297
298 def wait(self):
299 """Wait for prompt."""
300 self.child.expect(r"\(gdb\)")
301
302 def command(self, command, timeout=6000):
303 self.child.sendline(command)
304 self.child.expect("\n", timeout=timeout)
305 self.child.expect(r"\(gdb\)", timeout=timeout)
306 return self.child.before.strip()
307
308 def c(self, wait=True, timeout=-1):
309 if wait:
310 output = self.command("c", timeout=timeout)
311 assert "Continuing" in output
312 return output
313 else:
314 self.child.sendline("c")
315 self.child.expect("Continuing")
316
317 def interrupt(self):
318 self.child.send("\003")
319 self.child.expect(r"\(gdb\)", timeout=6000)
320 return self.child.before.strip()
321
322 def x(self, address, size='w'):
323 output = self.command("x/%s %s" % (size, address))
324 value = int(output.split(':')[1].strip(), 0)
325 return value
326
327 def p_raw(self, obj):
328 output = self.command("p %s" % obj)
329 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
330 if m:
331 raise CannotAccess(int(m.group(1), 0))
332 return output.split('=')[-1].strip()
333
334 def p(self, obj):
335 output = self.command("p/x %s" % obj)
336 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
337 if m:
338 raise CannotAccess(int(m.group(1), 0))
339 value = int(output.split('=')[-1].strip(), 0)
340 return value
341
342 def p_string(self, obj):
343 output = self.command("p %s" % obj)
344 value = shlex.split(output.split('=')[-1].strip())[1]
345 return value
346
347 def stepi(self):
348 output = self.command("stepi")
349 return output
350
351 def load(self):
352 output = self.command("load", timeout=6000)
353 assert "failed" not in output
354 assert "Transfer rate" in output
355
356 def b(self, location):
357 output = self.command("b %s" % location)
358 assert "not defined" not in output
359 assert "Breakpoint" in output
360 return output
361
362 def hbreak(self, location):
363 output = self.command("hbreak %s" % location)
364 assert "not defined" not in output
365 assert "Hardware assisted breakpoint" in output
366 return output
367
368 def run_all_tests(module, target, parsed):
369 good_results = set(('pass', 'not_applicable'))
370
371 start = time.time()
372
373 results = {}
374 count = 0
375
376 global gdb_cmd # pylint: disable=global-statement
377 gdb_cmd = parsed.gdb
378
379 todo = []
380 if (parsed.misaval):
381 target.misa = int(parsed.misaval, 16)
382 print "Assuming $MISA value of 0x%x. Skipping ExamineTarget." % target.misa
383 else:
384 todo.append(("ExamineTarget", ExamineTarget))
385
386 for name in dir(module):
387 definition = getattr(module, name)
388 if type(definition) == type and hasattr(definition, 'test') and \
389 (not parsed.test or any(test in name for test in parsed.test)):
390 todo.append((name, definition))
391
392 for name, definition in todo:
393 instance = definition(target)
394 result = instance.run()
395 results.setdefault(result, []).append(name)
396 count += 1
397 if result not in good_results and parsed.fail_fast:
398 break
399
400 header("ran %d tests in %.0fs" % (count, time.time() - start), dash=':')
401
402 result = 0
403 for key, value in results.iteritems():
404 print "%d tests returned %s" % (len(value), key)
405 if key not in good_results:
406 result = 1
407 for test in value:
408 print " ", test
409
410 return result
411
412 def add_test_run_options(parser):
413
414 parser.add_argument("--fail-fast", "-f", action="store_true",
415 help="Exit as soon as any test fails.")
416 parser.add_argument("test", nargs='*',
417 help="Run only tests that are named here.")
418 parser.add_argument("--gdb",
419 help="The command to use to start gdb.")
420 parser.add_argument("--misaval",
421 help="Don't run ExamineTarget, just assume the misa value which is specified.")
422
423 def header(title, dash='-'):
424 if title:
425 dashes = dash * (36 - len(title))
426 before = dashes[:len(dashes)/2]
427 after = dashes[len(dashes)/2:]
428 print "%s[ %s ]%s" % (before, title, after)
429 else:
430 print dash * 40
431
432 def print_log(path):
433 header(path)
434 lines = open(path, "r").readlines()
435 if len(lines) > 1000:
436 for l in lines[:500]:
437 sys.stdout.write(l)
438 print "..."
439 for l in lines[-500:]:
440 sys.stdout.write(l)
441 else:
442 for l in lines:
443 sys.stdout.write(l)
444
445 class BaseTest(object):
446 compiled = {}
447
448 def __init__(self, target):
449 self.target = target
450 self.server = None
451 self.target_process = None
452 self.binary = None
453 self.start = 0
454 self.logs = []
455
456 def early_applicable(self):
457 """Return a false value if the test has determined it cannot run
458 without ever needing to talk to the target or server."""
459 # pylint: disable=no-self-use
460 return True
461
462 def setup(self):
463 pass
464
465 def compile(self):
466 compile_args = getattr(self, 'compile_args', None)
467 if compile_args:
468 if compile_args not in BaseTest.compiled:
469 # pylint: disable=star-args
470 BaseTest.compiled[compile_args] = \
471 self.target.compile(*compile_args)
472 self.binary = BaseTest.compiled.get(compile_args)
473
474 def classSetup(self):
475 self.compile()
476 self.target_process = self.target.target()
477 self.server = self.target.server()
478 self.logs.append(self.server.logname)
479
480 def classTeardown(self):
481 del self.server
482 del self.target_process
483
484 def run(self):
485 """
486 If compile_args is set, compile a program and set self.binary.
487
488 Call setup().
489
490 Then call test() and return the result, displaying relevant information
491 if an exception is raised.
492 """
493
494 print "Running", type(self).__name__, "...",
495 sys.stdout.flush()
496
497 if not self.early_applicable():
498 print "not_applicable"
499 return "not_applicable"
500
501 self.start = time.time()
502
503 self.classSetup()
504
505 try:
506 self.setup()
507 result = self.test() # pylint: disable=no-member
508 except TestNotApplicable:
509 result = "not_applicable"
510 except Exception as e: # pylint: disable=broad-except
511 if isinstance(e, TestFailed):
512 result = "fail"
513 else:
514 result = "exception"
515 print "%s in %.2fs" % (result, time.time() - self.start)
516 print "=" * 40
517 if isinstance(e, TestFailed):
518 header("Message")
519 print e.message
520 header("Traceback")
521 traceback.print_exc(file=sys.stdout)
522 for log in self.logs:
523 print_log(log)
524 print "/" * 40
525 return result
526
527 finally:
528 self.classTeardown()
529
530 if not result:
531 result = 'pass'
532 print "%s in %.2fs" % (result, time.time() - self.start)
533 return result
534
535 gdb_cmd = None
536 class GdbTest(BaseTest):
537 def __init__(self, target):
538 BaseTest.__init__(self, target)
539 self.gdb = None
540
541 def classSetup(self):
542 BaseTest.classSetup(self)
543 self.logs.append("gdb.log")
544
545 if gdb_cmd:
546 self.gdb = Gdb(gdb_cmd)
547 else:
548 self.gdb = Gdb()
549
550 if self.binary:
551 self.gdb.command("file %s" % self.binary)
552 if self.target:
553 self.gdb.command("set arch riscv:rv%d" % self.target.xlen)
554 self.gdb.command("set remotetimeout %d" % self.target.timeout_sec)
555 if self.server.port:
556 self.gdb.command(
557 "target extended-remote localhost:%d" % self.server.port)
558
559 # FIXME: OpenOCD doesn't handle PRIV now
560 #self.gdb.p("$priv=3")
561
562 def classTeardown(self):
563 del self.gdb
564 BaseTest.classTeardown(self)
565
566 class ExamineTarget(GdbTest):
567 def test(self):
568 self.target.misa = self.gdb.p("$misa")
569
570 txt = "RV"
571 if (self.target.misa >> 30) == 1:
572 txt += "32"
573 elif (self.target.misa >> 62) == 2:
574 txt += "64"
575 elif (self.target.misa >> 126) == 3:
576 txt += "128"
577 else:
578 txt += "??"
579
580 for i in range(26):
581 if self.target.misa & (1<<i):
582 txt += chr(i + ord('A'))
583 print txt,
584
585 class TestFailed(Exception):
586 def __init__(self, message):
587 Exception.__init__(self)
588 self.message = message
589
590 class TestNotApplicable(Exception):
591 def __init__(self, message):
592 Exception.__init__(self)
593 self.message = message
594
595 def assertEqual(a, b):
596 if a != b:
597 raise TestFailed("%r != %r" % (a, b))
598
599 def assertNotEqual(a, b):
600 if a == b:
601 raise TestFailed("%r == %r" % (a, b))
602
603 def assertIn(a, b):
604 if a not in b:
605 raise TestFailed("%r not in %r" % (a, b))
606
607 def assertNotIn(a, b):
608 if a in b:
609 raise TestFailed("%r in %r" % (a, b))
610
611 def assertGreater(a, b):
612 if not a > b:
613 raise TestFailed("%r not greater than %r" % (a, b))
614
615 def assertLess(a, b):
616 if not a < b:
617 raise TestFailed("%r not less than %r" % (a, b))
618
619 def assertTrue(a):
620 if not a:
621 raise TestFailed("%r is not True" % a)
622
623 def assertRegexpMatches(text, regexp):
624 if not re.search(regexp, text):
625 raise TestFailed("can't find %r in %r" % (regexp, text))