Don't rely on Spike's default ISA
[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import re
3 import shlex
4 import subprocess
5 import sys
6 import time
7 import traceback
8
9 import pexpect
10
11 # Note that gdb comes with its own testsuite. I was unable to figure out how to
12 # run that testsuite against the spike simulator.
13
14 def find_file(path):
15 for directory in (os.getcwd(), os.path.dirname(__file__)):
16 fullpath = os.path.join(directory, path)
17 if os.path.exists(fullpath):
18 return fullpath
19 return None
20
21 def compile(args, xlen=32): # pylint: disable=redefined-builtin
22 cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
23 cmd = [cc, "-g"]
24 if (xlen == 32):
25 cmd.append("-march=rv32imac")
26 cmd.append("-mabi=ilp32")
27 else:
28 cmd.append("-march=rv64imac")
29 cmd.append("-mabi=lp64")
30 for arg in args:
31 found = find_file(arg)
32 if found:
33 cmd.append(found)
34 else:
35 cmd.append(arg)
36 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
37 stderr=subprocess.PIPE)
38 stdout, stderr = process.communicate()
39 if process.returncode:
40 print
41 header("Compile failed")
42 print "+", " ".join(cmd)
43 print stdout,
44 print stderr,
45 header("")
46 raise Exception("Compile failed!")
47
48 def unused_port():
49 # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
50 import socket
51 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
52 s.bind(("", 0))
53 port = s.getsockname()[1]
54 s.close()
55 return port
56
57 class Spike(object):
58 logname = "spike.log"
59
60 def __init__(self, sim_cmd, binary=None, halted=False, with_jtag_gdb=True,
61 timeout=None, xlen=64):
62 """Launch spike. Return tuple of its process and the port it's running
63 on."""
64 if sim_cmd:
65 cmd = shlex.split(sim_cmd)
66 else:
67 spike = os.path.expandvars("$RISCV/bin/spike")
68 cmd = [spike]
69 if xlen == 32:
70 cmd += ["--isa", "RV32G"]
71 else:
72 cmd += ["--isa", "RV64G"]
73
74 if timeout:
75 cmd = ["timeout", str(timeout)] + cmd
76
77 if halted:
78 cmd.append('-H')
79 if with_jtag_gdb:
80 cmd += ['--rbb-port', '0']
81 os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
82 cmd.append('programs/infinite_loop')
83 if binary:
84 cmd.append(binary)
85 logfile = open(self.logname, "w")
86 logfile.write("+ %s\n" % " ".join(cmd))
87 logfile.flush()
88 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
89 stdout=logfile, stderr=logfile)
90
91 if with_jtag_gdb:
92 self.port = None
93 for _ in range(30):
94 m = re.search(r"Listening for remote bitbang connection on "
95 r"port (\d+).", open(self.logname).read())
96 if m:
97 self.port = int(m.group(1))
98 os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
99 break
100 time.sleep(0.11)
101 assert self.port, "Didn't get spike message about bitbang " \
102 "connection"
103
104 def __del__(self):
105 try:
106 self.process.kill()
107 self.process.wait()
108 except OSError:
109 pass
110
111 def wait(self, *args, **kwargs):
112 return self.process.wait(*args, **kwargs)
113
114 class VcsSim(object):
115 def __init__(self, sim_cmd=None, debug=False):
116 if sim_cmd:
117 cmd = shlex.split(sim_cmd)
118 else:
119 cmd = ["simv"]
120 cmd += ["+jtag_vpi_enable"]
121 if debug:
122 cmd[0] = cmd[0] + "-debug"
123 cmd += ["+vcdplusfile=output/gdbserver.vpd"]
124 logfile = open("simv.log", "w")
125 logfile.write("+ %s\n" % " ".join(cmd))
126 logfile.flush()
127 listenfile = open("simv.log", "r")
128 listenfile.seek(0, 2)
129 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
130 stdout=logfile, stderr=logfile)
131 done = False
132 while not done:
133 line = listenfile.readline()
134 if not line:
135 time.sleep(1)
136 match = re.match(r"^Listening on port (\d+)$", line)
137 if match:
138 done = True
139 self.port = int(match.group(1))
140 os.environ['JTAG_VPI_PORT'] = str(self.port)
141
142 def __del__(self):
143 try:
144 self.process.kill()
145 self.process.wait()
146 except OSError:
147 pass
148
149 class Openocd(object):
150 logname = "openocd.log"
151
152 def __init__(self, server_cmd=None, config=None, debug=False):
153 if server_cmd:
154 cmd = shlex.split(server_cmd)
155 else:
156 openocd = os.path.expandvars("$RISCV/bin/riscv-openocd")
157 cmd = [openocd]
158 if (debug):
159 cmd.append("-d")
160
161 # This command needs to come before any config scripts on the command
162 # line, since they are executed in order.
163 cmd += [
164 # Tell OpenOCD to bind gdb to an unused, ephemeral port.
165 "--command",
166 "gdb_port 0",
167 # Disable tcl and telnet servers, since they are unused and because
168 # the port numbers will conflict if multiple OpenOCD processes are
169 # running on the same server.
170 "--command",
171 "tcl_port disabled",
172 "--command",
173 "telnet_port disabled",
174 ]
175
176 if config:
177 f = find_file(config)
178 if f is None:
179 print("Unable to read file " + config)
180 exit(1)
181
182 cmd += ["-f", f]
183 if debug:
184 cmd.append("-d")
185
186 logfile = open(Openocd.logname, "w")
187 logfile.write("+ %s\n" % " ".join(cmd))
188 logfile.flush()
189 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
190 stdout=logfile, stderr=logfile)
191
192 # Wait for OpenOCD to have made it through riscv_examine(). When using
193 # OpenOCD to communicate with a simulator this may take a long time,
194 # and gdb will time out when trying to connect if we attempt too early.
195 start = time.time()
196 messaged = False
197 while True:
198 log = open(Openocd.logname).read()
199 if "Ready for Remote Connections" in log:
200 break
201 if not self.process.poll() is None:
202 raise Exception(
203 "OpenOCD exited before completing riscv_examine()")
204 if not messaged and time.time() - start > 1:
205 messaged = True
206 print "Waiting for OpenOCD to examine RISCV core..."
207
208 self.port = self._get_gdb_server_port()
209
210 def _get_gdb_server_port(self):
211 """Get port that OpenOCD's gdb server is listening on."""
212 MAX_ATTEMPTS = 50
213 PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')
214 for _ in range(MAX_ATTEMPTS):
215 with open(os.devnull, 'w') as devnull:
216 try:
217 output = subprocess.check_output([
218 'lsof',
219 '-a', # Take the AND of the following selectors
220 '-p{}'.format(self.process.pid), # Filter on PID
221 '-iTCP', # Filter only TCP sockets
222 ], stderr=devnull)
223 except subprocess.CalledProcessError:
224 output = ""
225 matches = list(PORT_REGEX.finditer(output))
226 matches = [m for m in matches
227 if m.group('port') not in ('6666', '4444')]
228 if len(matches) > 1:
229 print output
230 raise Exception(
231 "OpenOCD listening on multiple ports. Cannot uniquely "
232 "identify gdb server port.")
233 elif matches:
234 [match] = matches
235 return int(match.group('port'))
236 time.sleep(1)
237 raise Exception("Timed out waiting for gdb server to obtain port.")
238
239 def __del__(self):
240 try:
241 self.process.kill()
242 self.process.wait()
243 except OSError:
244 pass
245
246 class OpenocdCli(object):
247 def __init__(self, port=4444):
248 self.child = pexpect.spawn(
249 "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
250 self.child.expect("> ")
251
252 def command(self, cmd):
253 self.child.sendline(cmd)
254 self.child.expect(cmd)
255 self.child.expect("\n")
256 self.child.expect("> ")
257 return self.child.before.strip("\t\r\n \0")
258
259 def reg(self, reg=''):
260 output = self.command("reg %s" % reg)
261 matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
262 values = {r: int(v, 0) for r, v in matches}
263 if reg:
264 return values[reg]
265 return values
266
267 def load_image(self, image):
268 output = self.command("load_image %s" % image)
269 if 'invalid ELF file, only 32bits files are supported' in output:
270 raise TestNotApplicable(output)
271
272 class CannotAccess(Exception):
273 def __init__(self, address):
274 Exception.__init__(self)
275 self.address = address
276
277 class Gdb(object):
278 def __init__(self,
279 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
280 self.child = pexpect.spawn(cmd)
281 self.child.logfile = open("gdb.log", "w")
282 self.child.logfile.write("+ %s\n" % cmd)
283 self.wait()
284 self.command("set confirm off")
285 self.command("set width 0")
286 self.command("set height 0")
287 # Force consistency.
288 self.command("set print entry-values no")
289
290 def wait(self):
291 """Wait for prompt."""
292 self.child.expect(r"\(gdb\)")
293
294 def command(self, command, timeout=6000):
295 self.child.sendline(command)
296 self.child.expect("\n", timeout=timeout)
297 self.child.expect(r"\(gdb\)", timeout=timeout)
298 return self.child.before.strip()
299
300 def c(self, wait=True, timeout=-1):
301 if wait:
302 output = self.command("c", timeout=timeout)
303 assert "Continuing" in output
304 return output
305 else:
306 self.child.sendline("c")
307 self.child.expect("Continuing")
308
309 def interrupt(self):
310 self.child.send("\003")
311 self.child.expect(r"\(gdb\)", timeout=6000)
312 return self.child.before.strip()
313
314 def x(self, address, size='w'):
315 output = self.command("x/%s %s" % (size, address))
316 value = int(output.split(':')[1].strip(), 0)
317 return value
318
319 def p_raw(self, obj):
320 output = self.command("p %s" % obj)
321 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
322 if m:
323 raise CannotAccess(int(m.group(1), 0))
324 return output.split('=')[-1].strip()
325
326 def p(self, obj):
327 output = self.command("p/x %s" % obj)
328 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
329 if m:
330 raise CannotAccess(int(m.group(1), 0))
331 value = int(output.split('=')[-1].strip(), 0)
332 return value
333
334 def p_string(self, obj):
335 output = self.command("p %s" % obj)
336 value = shlex.split(output.split('=')[-1].strip())[1]
337 return value
338
339 def stepi(self):
340 output = self.command("stepi")
341 return output
342
343 def load(self):
344 output = self.command("load", timeout=6000)
345 assert "failed" not in output
346 assert "Transfer rate" in output
347
348 def b(self, location):
349 output = self.command("b %s" % location)
350 assert "not defined" not in output
351 assert "Breakpoint" in output
352 return output
353
354 def hbreak(self, location):
355 output = self.command("hbreak %s" % location)
356 assert "not defined" not in output
357 assert "Hardware assisted breakpoint" in output
358 return output
359
360 def run_all_tests(module, target, parsed):
361 good_results = set(('pass', 'not_applicable'))
362
363 start = time.time()
364
365 results = {}
366 count = 0
367
368 global gdb_cmd # pylint: disable=global-statement
369 gdb_cmd = parsed.gdb
370
371 todo = [("ExamineTarget", ExamineTarget)]
372 for name in dir(module):
373 definition = getattr(module, name)
374 if type(definition) == type and hasattr(definition, 'test') and \
375 (not parsed.test or any(test in name for test in parsed.test)):
376 todo.append((name, definition))
377
378 for name, definition in todo:
379 instance = definition(target)
380 result = instance.run()
381 results.setdefault(result, []).append(name)
382 count += 1
383 if result not in good_results and parsed.fail_fast:
384 break
385
386 header("ran %d tests in %.0fs" % (count, time.time() - start), dash=':')
387
388 result = 0
389 for key, value in results.iteritems():
390 print "%d tests returned %s" % (len(value), key)
391 if key not in good_results:
392 result = 1
393 for test in value:
394 print " ", test
395
396 return result
397
398 def add_test_run_options(parser):
399 parser.add_argument("--fail-fast", "-f", action="store_true",
400 help="Exit as soon as any test fails.")
401 parser.add_argument("test", nargs='*',
402 help="Run only tests that are named here.")
403 parser.add_argument("--gdb",
404 help="The command to use to start gdb.")
405
406 def header(title, dash='-'):
407 if title:
408 dashes = dash * (36 - len(title))
409 before = dashes[:len(dashes)/2]
410 after = dashes[len(dashes)/2:]
411 print "%s[ %s ]%s" % (before, title, after)
412 else:
413 print dash * 40
414
415 def print_log(path):
416 header(path)
417 lines = open(path, "r").readlines()
418 if len(lines) > 1000:
419 for l in lines[:500]:
420 sys.stdout.write(l)
421 print "..."
422 for l in lines[-500:]:
423 sys.stdout.write(l)
424 else:
425 for l in lines:
426 sys.stdout.write(l)
427
428 class BaseTest(object):
429 compiled = {}
430
431 def __init__(self, target):
432 self.target = target
433 self.server = None
434 self.target_process = None
435 self.binary = None
436 self.start = 0
437 self.logs = []
438
439 def early_applicable(self):
440 """Return a false value if the test has determined it cannot run
441 without ever needing to talk to the target or server."""
442 # pylint: disable=no-self-use
443 return True
444
445 def setup(self):
446 pass
447
448 def compile(self):
449 compile_args = getattr(self, 'compile_args', None)
450 if compile_args:
451 if compile_args not in BaseTest.compiled:
452 # pylint: disable=star-args
453 BaseTest.compiled[compile_args] = \
454 self.target.compile(*compile_args)
455 self.binary = BaseTest.compiled.get(compile_args)
456
457 def classSetup(self):
458 self.compile()
459 self.target_process = self.target.target()
460 self.server = self.target.server()
461 self.logs.append(self.server.logname)
462
463 def classTeardown(self):
464 del self.server
465 del self.target_process
466
467 def run(self):
468 """
469 If compile_args is set, compile a program and set self.binary.
470
471 Call setup().
472
473 Then call test() and return the result, displaying relevant information
474 if an exception is raised.
475 """
476
477 print "Running", type(self).__name__, "...",
478 sys.stdout.flush()
479
480 if not self.early_applicable():
481 print "not_applicable"
482 return "not_applicable"
483
484 self.start = time.time()
485
486 self.classSetup()
487
488 try:
489 self.setup()
490 result = self.test() # pylint: disable=no-member
491 except TestNotApplicable:
492 result = "not_applicable"
493 except Exception as e: # pylint: disable=broad-except
494 if isinstance(e, TestFailed):
495 result = "fail"
496 else:
497 result = "exception"
498 print "%s in %.2fs" % (result, time.time() - self.start)
499 print "=" * 40
500 if isinstance(e, TestFailed):
501 header("Message")
502 print e.message
503 header("Traceback")
504 traceback.print_exc(file=sys.stdout)
505 for log in self.logs:
506 print_log(log)
507 print "/" * 40
508 return result
509
510 finally:
511 self.classTeardown()
512
513 if not result:
514 result = 'pass'
515 print "%s in %.2fs" % (result, time.time() - self.start)
516 return result
517
518 gdb_cmd = None
519 class GdbTest(BaseTest):
520 def __init__(self, target):
521 BaseTest.__init__(self, target)
522 self.gdb = None
523
524 def classSetup(self):
525 BaseTest.classSetup(self)
526 self.logs.append("gdb.log")
527
528 if gdb_cmd:
529 self.gdb = Gdb(gdb_cmd)
530 else:
531 self.gdb = Gdb()
532
533 if self.binary:
534 self.gdb.command("file %s" % self.binary)
535 if self.target:
536 self.gdb.command("set arch riscv:rv%d" % self.target.xlen)
537 self.gdb.command("set remotetimeout %d" % self.target.timeout_sec)
538 if self.server.port:
539 self.gdb.command(
540 "target extended-remote localhost:%d" % self.server.port)
541
542 self.gdb.p("$priv=3")
543
544 def classTeardown(self):
545 del self.gdb
546 BaseTest.classTeardown(self)
547
548 class ExamineTarget(GdbTest):
549 def test(self):
550 self.target.misa = self.gdb.p("$misa")
551
552 txt = "RV"
553 if (self.target.misa >> 30) == 1:
554 txt += "32"
555 elif (self.target.misa >> 62) == 2:
556 txt += "64"
557 elif (self.target.misa >> 126) == 3:
558 txt += "128"
559 else:
560 txt += "??"
561
562 for i in range(26):
563 if self.target.misa & (1<<i):
564 txt += chr(i + ord('A'))
565 print txt,
566
567 class TestFailed(Exception):
568 def __init__(self, message):
569 Exception.__init__(self)
570 self.message = message
571
572 class TestNotApplicable(Exception):
573 def __init__(self, message):
574 Exception.__init__(self)
575 self.message = message
576
577 def assertEqual(a, b):
578 if a != b:
579 raise TestFailed("%r != %r" % (a, b))
580
581 def assertNotEqual(a, b):
582 if a == b:
583 raise TestFailed("%r == %r" % (a, b))
584
585 def assertIn(a, b):
586 if a not in b:
587 raise TestFailed("%r not in %r" % (a, b))
588
589 def assertNotIn(a, b):
590 if a in b:
591 raise TestFailed("%r in %r" % (a, b))
592
593 def assertGreater(a, b):
594 if not a > b:
595 raise TestFailed("%r not greater than %r" % (a, b))
596
597 def assertLess(a, b):
598 if not a < b:
599 raise TestFailed("%r not less than %r" % (a, b))
600
601 def assertTrue(a):
602 if not a:
603 raise TestFailed("%r is not True" % a)
604
605 def assertRegexpMatches(text, regexp):
606 if not re.search(regexp, text):
607 raise TestFailed("can't find %r in %r" % (regexp, text))