debug: Allow skipping the ExamineTarget step by specifying misa
[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import re
3 import shlex
4 import subprocess
5 import sys
6 import time
7 import traceback
8
9 import pexpect
10
11 # Note that gdb comes with its own testsuite. I was unable to figure out how to
12 # run that testsuite against the spike simulator.
13
14 def find_file(path):
15 for directory in (os.getcwd(), os.path.dirname(__file__)):
16 fullpath = os.path.join(directory, path)
17 if os.path.exists(fullpath):
18 return fullpath
19 return None
20
21 def compile(args, xlen=32): # pylint: disable=redefined-builtin
22 cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
23 cmd = [cc, "-g"]
24 if (xlen == 32):
25 cmd.append("-march=rv32imac")
26 cmd.append("-mabi=ilp32")
27 else:
28 cmd.append("-march=rv64imac")
29 cmd.append("-mabi=lp64")
30 for arg in args:
31 found = find_file(arg)
32 if found:
33 cmd.append(found)
34 else:
35 cmd.append(arg)
36 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
37 stderr=subprocess.PIPE)
38 stdout, stderr = process.communicate()
39 if process.returncode:
40 print
41 header("Compile failed")
42 print "+", " ".join(cmd)
43 print stdout,
44 print stderr,
45 header("")
46 raise Exception("Compile failed!")
47
48 def unused_port():
49 # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
50 import socket
51 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
52 s.bind(("", 0))
53 port = s.getsockname()[1]
54 s.close()
55 return port
56
57 class Spike(object):
58 logname = "spike.log"
59
60 def __init__(self, sim_cmd, binary=None, halted=False, with_jtag_gdb=True,
61 timeout=None, xlen=64):
62 """Launch spike. Return tuple of its process and the port it's running
63 on."""
64 if sim_cmd:
65 cmd = shlex.split(sim_cmd)
66 else:
67 spike = os.path.expandvars("$RISCV/bin/spike")
68 cmd = [spike]
69 if xlen == 32:
70 cmd += ["--isa", "RV32G"]
71 else:
72 cmd += ["--isa", "RV64G"]
73
74 if timeout:
75 cmd = ["timeout", str(timeout)] + cmd
76
77 cmd += ["-m0x10000000:0x10000000"]
78
79 if halted:
80 cmd.append('-H')
81 if with_jtag_gdb:
82 cmd += ['--rbb-port', '0']
83 os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
84 cmd.append('programs/infinite_loop')
85 if binary:
86 cmd.append(binary)
87 logfile = open(self.logname, "w")
88 logfile.write("+ %s\n" % " ".join(cmd))
89 logfile.flush()
90 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
91 stdout=logfile, stderr=logfile)
92
93 if with_jtag_gdb:
94 self.port = None
95 for _ in range(30):
96 m = re.search(r"Listening for remote bitbang connection on "
97 r"port (\d+).", open(self.logname).read())
98 if m:
99 self.port = int(m.group(1))
100 os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
101 break
102 time.sleep(0.11)
103 assert self.port, "Didn't get spike message about bitbang " \
104 "connection"
105
106 def __del__(self):
107 try:
108 self.process.kill()
109 self.process.wait()
110 except OSError:
111 pass
112
113 def wait(self, *args, **kwargs):
114 return self.process.wait(*args, **kwargs)
115
116 class VcsSim(object):
117 def __init__(self, sim_cmd=None, debug=False):
118 if sim_cmd:
119 cmd = shlex.split(sim_cmd)
120 else:
121 cmd = ["simv"]
122 cmd += ["+jtag_vpi_enable"]
123 if debug:
124 cmd[0] = cmd[0] + "-debug"
125 cmd += ["+vcdplusfile=output/gdbserver.vpd"]
126 logfile = open("simv.log", "w")
127 logfile.write("+ %s\n" % " ".join(cmd))
128 logfile.flush()
129 listenfile = open("simv.log", "r")
130 listenfile.seek(0, 2)
131 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
132 stdout=logfile, stderr=logfile)
133 done = False
134 while not done:
135 line = listenfile.readline()
136 if not line:
137 time.sleep(1)
138 match = re.match(r"^Listening on port (\d+)$", line)
139 if match:
140 done = True
141 self.port = int(match.group(1))
142 os.environ['JTAG_VPI_PORT'] = str(self.port)
143
144 def __del__(self):
145 try:
146 self.process.kill()
147 self.process.wait()
148 except OSError:
149 pass
150
151 class Openocd(object):
152 logname = "openocd.log"
153
154 def __init__(self, server_cmd=None, config=None, debug=False):
155 if server_cmd:
156 cmd = shlex.split(server_cmd)
157 else:
158 openocd = os.path.expandvars("$RISCV/bin/riscv-openocd")
159 cmd = [openocd]
160 if (debug):
161 cmd.append("-d")
162
163 # This command needs to come before any config scripts on the command
164 # line, since they are executed in order.
165 cmd += [
166 # Tell OpenOCD to bind gdb to an unused, ephemeral port.
167 "--command",
168 "gdb_port 0",
169 # Disable tcl and telnet servers, since they are unused and because
170 # the port numbers will conflict if multiple OpenOCD processes are
171 # running on the same server.
172 "--command",
173 "tcl_port disabled",
174 "--command",
175 "telnet_port disabled",
176 ]
177
178 if config:
179 f = find_file(config)
180 if f is None:
181 print("Unable to read file " + config)
182 exit(1)
183
184 cmd += ["-f", f]
185 if debug:
186 cmd.append("-d")
187
188 logfile = open(Openocd.logname, "w")
189 logfile.write("+ %s\n" % " ".join(cmd))
190 logfile.flush()
191 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
192 stdout=logfile, stderr=logfile)
193
194 # Wait for OpenOCD to have made it through riscv_examine(). When using
195 # OpenOCD to communicate with a simulator this may take a long time,
196 # and gdb will time out when trying to connect if we attempt too early.
197 start = time.time()
198 messaged = False
199 while True:
200 log = open(Openocd.logname).read()
201 if "Ready for Remote Connections" in log:
202 break
203 if not self.process.poll() is None:
204 raise Exception(
205 "OpenOCD exited before completing riscv_examine()")
206 if not messaged and time.time() - start > 1:
207 messaged = True
208 print "Waiting for OpenOCD to examine RISCV core..."
209
210 self.port = self._get_gdb_server_port()
211
212 def _get_gdb_server_port(self):
213 """Get port that OpenOCD's gdb server is listening on."""
214 MAX_ATTEMPTS = 50
215 PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')
216 for _ in range(MAX_ATTEMPTS):
217 with open(os.devnull, 'w') as devnull:
218 try:
219 output = subprocess.check_output([
220 'lsof',
221 '-a', # Take the AND of the following selectors
222 '-p{}'.format(self.process.pid), # Filter on PID
223 '-iTCP', # Filter only TCP sockets
224 ], stderr=devnull)
225 except subprocess.CalledProcessError:
226 output = ""
227 matches = list(PORT_REGEX.finditer(output))
228 matches = [m for m in matches
229 if m.group('port') not in ('6666', '4444')]
230 if len(matches) > 1:
231 print output
232 raise Exception(
233 "OpenOCD listening on multiple ports. Cannot uniquely "
234 "identify gdb server port.")
235 elif matches:
236 [match] = matches
237 return int(match.group('port'))
238 time.sleep(1)
239 raise Exception("Timed out waiting for gdb server to obtain port.")
240
241 def __del__(self):
242 try:
243 self.process.kill()
244 self.process.wait()
245 except OSError:
246 pass
247
248 class OpenocdCli(object):
249 def __init__(self, port=4444):
250 self.child = pexpect.spawn(
251 "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
252 self.child.expect("> ")
253
254 def command(self, cmd):
255 self.child.sendline(cmd)
256 self.child.expect(cmd)
257 self.child.expect("\n")
258 self.child.expect("> ")
259 return self.child.before.strip("\t\r\n \0")
260
261 def reg(self, reg=''):
262 output = self.command("reg %s" % reg)
263 matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
264 values = {r: int(v, 0) for r, v in matches}
265 if reg:
266 return values[reg]
267 return values
268
269 def load_image(self, image):
270 output = self.command("load_image %s" % image)
271 if 'invalid ELF file, only 32bits files are supported' in output:
272 raise TestNotApplicable(output)
273
274 class CannotAccess(Exception):
275 def __init__(self, address):
276 Exception.__init__(self)
277 self.address = address
278
279 class Gdb(object):
280 def __init__(self,
281 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
282 self.child = pexpect.spawn(cmd)
283 self.child.logfile = open("gdb.log", "w")
284 self.child.logfile.write("+ %s\n" % cmd)
285 self.wait()
286 self.command("set confirm off")
287 self.command("set width 0")
288 self.command("set height 0")
289 # Force consistency.
290 self.command("set print entry-values no")
291
292 def wait(self):
293 """Wait for prompt."""
294 self.child.expect(r"\(gdb\)")
295
296 def command(self, command, timeout=6000):
297 self.child.sendline(command)
298 self.child.expect("\n", timeout=timeout)
299 self.child.expect(r"\(gdb\)", timeout=timeout)
300 return self.child.before.strip()
301
302 def c(self, wait=True, timeout=-1):
303 if wait:
304 output = self.command("c", timeout=timeout)
305 assert "Continuing" in output
306 return output
307 else:
308 self.child.sendline("c")
309 self.child.expect("Continuing")
310
311 def interrupt(self):
312 self.child.send("\003")
313 self.child.expect(r"\(gdb\)", timeout=6000)
314 return self.child.before.strip()
315
316 def x(self, address, size='w'):
317 output = self.command("x/%s %s" % (size, address))
318 value = int(output.split(':')[1].strip(), 0)
319 return value
320
321 def p_raw(self, obj):
322 output = self.command("p %s" % obj)
323 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
324 if m:
325 raise CannotAccess(int(m.group(1), 0))
326 return output.split('=')[-1].strip()
327
328 def p(self, obj):
329 output = self.command("p/x %s" % obj)
330 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
331 if m:
332 raise CannotAccess(int(m.group(1), 0))
333 value = int(output.split('=')[-1].strip(), 0)
334 return value
335
336 def p_string(self, obj):
337 output = self.command("p %s" % obj)
338 value = shlex.split(output.split('=')[-1].strip())[1]
339 return value
340
341 def stepi(self):
342 output = self.command("stepi")
343 return output
344
345 def load(self):
346 output = self.command("load", timeout=6000)
347 assert "failed" not in output
348 assert "Transfer rate" in output
349
350 def b(self, location):
351 output = self.command("b %s" % location)
352 assert "not defined" not in output
353 assert "Breakpoint" in output
354 return output
355
356 def hbreak(self, location):
357 output = self.command("hbreak %s" % location)
358 assert "not defined" not in output
359 assert "Hardware assisted breakpoint" in output
360 return output
361
362 def run_all_tests(module, target, parsed):
363 good_results = set(('pass', 'not_applicable'))
364
365 start = time.time()
366
367 results = {}
368 count = 0
369
370 global gdb_cmd # pylint: disable=global-statement
371 gdb_cmd = parsed.gdb
372
373 todo = []
374 if (parsed.misa):
375 self.target.misa = parsed.misa
376 else:
377 todo.append(("ExamineTarget", ExamineTarget))
378
379 for name in dir(module):
380 definition = getattr(module, name)
381 if type(definition) == type and hasattr(definition, 'test') and \
382 (not parsed.test or any(test in name for test in parsed.test)):
383 todo.append((name, definition))
384
385 for name, definition in todo:
386 instance = definition(target)
387 result = instance.run()
388 results.setdefault(result, []).append(name)
389 count += 1
390 if result not in good_results and parsed.fail_fast:
391 break
392
393 header("ran %d tests in %.0fs" % (count, time.time() - start), dash=':')
394
395 result = 0
396 for key, value in results.iteritems():
397 print "%d tests returned %s" % (len(value), key)
398 if key not in good_results:
399 result = 1
400 for test in value:
401 print " ", test
402
403 return result
404
405 def add_test_run_options(parser):
406 parser.add_argument("--fail-fast", "-f", action="store_true",
407 help="Exit as soon as any test fails.")
408 parser.add_argument("test", nargs='*',
409 help="Run only tests that are named here.")
410 parser.add_argument("--gdb",
411 help="The command to use to start gdb.")
412 parser.add_argument("--misa", "-m",
413 help="Don't run ExamineTarget, just assume the misa which is specified.")
414
415 def header(title, dash='-'):
416 if title:
417 dashes = dash * (36 - len(title))
418 before = dashes[:len(dashes)/2]
419 after = dashes[len(dashes)/2:]
420 print "%s[ %s ]%s" % (before, title, after)
421 else:
422 print dash * 40
423
424 def print_log(path):
425 header(path)
426 lines = open(path, "r").readlines()
427 if len(lines) > 1000:
428 for l in lines[:500]:
429 sys.stdout.write(l)
430 print "..."
431 for l in lines[-500:]:
432 sys.stdout.write(l)
433 else:
434 for l in lines:
435 sys.stdout.write(l)
436
437 class BaseTest(object):
438 compiled = {}
439
440 def __init__(self, target):
441 self.target = target
442 self.server = None
443 self.target_process = None
444 self.binary = None
445 self.start = 0
446 self.logs = []
447
448 def early_applicable(self):
449 """Return a false value if the test has determined it cannot run
450 without ever needing to talk to the target or server."""
451 # pylint: disable=no-self-use
452 return True
453
454 def setup(self):
455 pass
456
457 def compile(self):
458 compile_args = getattr(self, 'compile_args', None)
459 if compile_args:
460 if compile_args not in BaseTest.compiled:
461 # pylint: disable=star-args
462 BaseTest.compiled[compile_args] = \
463 self.target.compile(*compile_args)
464 self.binary = BaseTest.compiled.get(compile_args)
465
466 def classSetup(self):
467 self.compile()
468 self.target_process = self.target.target()
469 self.server = self.target.server()
470 self.logs.append(self.server.logname)
471
472 def classTeardown(self):
473 del self.server
474 del self.target_process
475
476 def run(self):
477 """
478 If compile_args is set, compile a program and set self.binary.
479
480 Call setup().
481
482 Then call test() and return the result, displaying relevant information
483 if an exception is raised.
484 """
485
486 print "Running", type(self).__name__, "...",
487 sys.stdout.flush()
488
489 if not self.early_applicable():
490 print "not_applicable"
491 return "not_applicable"
492
493 self.start = time.time()
494
495 self.classSetup()
496
497 try:
498 self.setup()
499 result = self.test() # pylint: disable=no-member
500 except TestNotApplicable:
501 result = "not_applicable"
502 except Exception as e: # pylint: disable=broad-except
503 if isinstance(e, TestFailed):
504 result = "fail"
505 else:
506 result = "exception"
507 print "%s in %.2fs" % (result, time.time() - self.start)
508 print "=" * 40
509 if isinstance(e, TestFailed):
510 header("Message")
511 print e.message
512 header("Traceback")
513 traceback.print_exc(file=sys.stdout)
514 for log in self.logs:
515 print_log(log)
516 print "/" * 40
517 return result
518
519 finally:
520 self.classTeardown()
521
522 if not result:
523 result = 'pass'
524 print "%s in %.2fs" % (result, time.time() - self.start)
525 return result
526
527 gdb_cmd = None
528 class GdbTest(BaseTest):
529 def __init__(self, target):
530 BaseTest.__init__(self, target)
531 self.gdb = None
532
533 def classSetup(self):
534 BaseTest.classSetup(self)
535 self.logs.append("gdb.log")
536
537 if gdb_cmd:
538 self.gdb = Gdb(gdb_cmd)
539 else:
540 self.gdb = Gdb()
541
542 if self.binary:
543 self.gdb.command("file %s" % self.binary)
544 if self.target:
545 self.gdb.command("set arch riscv:rv%d" % self.target.xlen)
546 self.gdb.command("set remotetimeout %d" % self.target.timeout_sec)
547 if self.server.port:
548 self.gdb.command(
549 "target extended-remote localhost:%d" % self.server.port)
550
551 # FIXME: OpenOCD doesn't handle PRIV now
552 #self.gdb.p("$priv=3")
553
554 def classTeardown(self):
555 del self.gdb
556 BaseTest.classTeardown(self)
557
558 class ExamineTarget(GdbTest):
559 def test(self):
560 self.target.misa = self.gdb.p("$misa")
561
562 txt = "RV"
563 if (self.target.misa >> 30) == 1:
564 txt += "32"
565 elif (self.target.misa >> 62) == 2:
566 txt += "64"
567 elif (self.target.misa >> 126) == 3:
568 txt += "128"
569 else:
570 txt += "??"
571
572 for i in range(26):
573 if self.target.misa & (1<<i):
574 txt += chr(i + ord('A'))
575 print txt,
576
577 class TestFailed(Exception):
578 def __init__(self, message):
579 Exception.__init__(self)
580 self.message = message
581
582 class TestNotApplicable(Exception):
583 def __init__(self, message):
584 Exception.__init__(self)
585 self.message = message
586
587 def assertEqual(a, b):
588 if a != b:
589 raise TestFailed("%r != %r" % (a, b))
590
591 def assertNotEqual(a, b):
592 if a == b:
593 raise TestFailed("%r == %r" % (a, b))
594
595 def assertIn(a, b):
596 if a not in b:
597 raise TestFailed("%r not in %r" % (a, b))
598
599 def assertNotIn(a, b):
600 if a in b:
601 raise TestFailed("%r in %r" % (a, b))
602
603 def assertGreater(a, b):
604 if not a > b:
605 raise TestFailed("%r not greater than %r" % (a, b))
606
607 def assertLess(a, b):
608 if not a < b:
609 raise TestFailed("%r not less than %r" % (a, b))
610
611 def assertTrue(a):
612 if not a:
613 raise TestFailed("%r is not True" % a)
614
615 def assertRegexpMatches(text, regexp):
616 if not re.search(regexp, text):
617 raise TestFailed("can't find %r in %r" % (regexp, text))