Disable another PRIV mention, for now
[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import re
3 import shlex
4 import subprocess
5 import sys
6 import time
7 import traceback
8
9 import pexpect
10
11 # Note that gdb comes with its own testsuite. I was unable to figure out how to
12 # run that testsuite against the spike simulator.
13
def find_file(path):
    """Locate `path` relative to the working directory or to this module.

    The current working directory is tried first, then the directory that
    contains this file. Returns the first joined path that exists, or None
    when neither location has it.
    """
    search_roots = (os.getcwd(), os.path.dirname(__file__))
    candidates = (os.path.join(root, path) for root in search_roots)
    return next((c for c in candidates if os.path.exists(c)), None)
20
def compile(args, xlen=32): # pylint: disable=redefined-builtin
    """Build a test program with the RISC-V cross compiler.

    Runs $RISCV/bin/riscv64-unknown-elf-gcc with -g and the -march/-mabi pair
    for the requested width (rv32imac/ilp32 when xlen is 32, rv64imac/lp64
    otherwise). Each entry of `args` that find_file() can locate is replaced
    by the located path; every other entry is passed through untouched.

    On a non-zero exit status the command line and the compiler's captured
    stdout/stderr are printed and an Exception is raised. Nothing is returned.
    """
    cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
    if xlen == 32:
        arch_flags = ["-march=rv32imac", "-mabi=ilp32"]
    else:
        arch_flags = ["-march=rv64imac", "-mabi=lp64"]
    cmd = [cc, "-g"] + arch_flags
    cmd.extend(find_file(arg) or arg for arg in args)

    process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()
    if process.returncode:
        # Written through sys.stdout rather than trailing-comma print
        # statements: those are Python-2-only and insert a stray space
        # between the two captured streams.
        sys.stdout.write("\n")
        header("Compile failed")
        sys.stdout.write("+ %s\n" % " ".join(cmd))
        for captured in (stdout, stderr):
            if not isinstance(captured, str):
                # The pipes hand back bytes where str is a text type.
                captured = captured.decode(errors="replace")
            sys.stdout.write(captured)
        header("")
        raise Exception("Compile failed!")
47
def unused_port():
    """Return a TCP port number that was free at the time of the call.

    A socket is bound to port 0 so the OS picks an unused port, the chosen
    number is read back, and the socket is closed again. Nothing reserves the
    port afterwards, so another process may grab it before the caller does.
    """
    # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
    import socket
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind(("", 0))
        return s.getsockname()[1]
    finally:
        # Release the socket even when bind()/getsockname() fails.
        s.close()
56
class Spike(object):
    """A running spike simulator process.

    Simulator output is collected in the file named by `logname`. When started
    with with_jtag_gdb, `port` holds the remote-bitbang port spike reported.
    """
    logname = "spike.log"

    def __init__(self, sim_cmd, binary=None, halted=False, with_jtag_gdb=True,
                 timeout=None, xlen=64):
        """Launch spike and, if requested, discover its remote-bitbang port.

        sim_cmd: command line used to start the simulator; when empty,
            $RISCV/bin/spike is used.
        binary: optional extra program appended to the command line.
        halted: pass -H to spike.
        with_jtag_gdb: pass --rbb-port 0, export REMOTE_BITBANG_HOST, then
            poll the log for the port spike picked and export it as
            REMOTE_BITBANG_PORT.
        timeout: when set, the whole command is wrapped in `timeout <value>`.
        xlen: 32 selects --isa RV32G, anything else RV64G.

        Raises AssertionError when the port message never shows up.
        """
        if sim_cmd:
            cmd = shlex.split(sim_cmd)
        else:
            cmd = [os.path.expandvars("$RISCV/bin/spike")]
        cmd += ["--isa", "RV32G" if xlen == 32 else "RV64G"]

        if timeout:
            cmd = ["timeout", str(timeout)] + cmd

        if halted:
            cmd.append('-H')
        if with_jtag_gdb:
            cmd += ['--rbb-port', '0']
            os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
        cmd.append('programs/infinite_loop')
        if binary:
            cmd.append(binary)

        # The child keeps its own copy of the descriptor, so our handle can
        # be closed as soon as the process has been started.
        with open(self.logname, "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                                            stdout=logfile, stderr=logfile)

        if with_jtag_gdb:
            self.port = None
            for _ in range(30):
                with open(self.logname) as log:
                    m = re.search(
                        r"Listening for remote bitbang connection on "
                        r"port (\d+).", log.read())
                if m:
                    self.port = int(m.group(1))
                    os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
                    break
                time.sleep(0.11)
            if not self.port:
                # Raised explicitly (rather than via `assert`) so the check
                # still runs under -O; the exception type is unchanged.
                raise AssertionError("Didn't get spike message about bitbang "
                                     "connection")

    def __del__(self):
        """Kill the simulator, if one was started, and reap it."""
        process = getattr(self, "process", None)
        if process is None:
            # __init__ failed before the process was created.
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            # Best effort: the process is already gone.
            pass

    def wait(self, *args, **kwargs):
        """Wait for the simulator process to exit (see Popen.wait)."""
        return self.process.wait(*args, **kwargs)
113
class VcsSim(object):
    """A running VCS simulation with its JTAG VPI port in `port`."""

    def __init__(self, sim_cmd=None, debug=False):
        """Start the simulator and wait until it announces its port.

        sim_cmd: command line used to start the simulator; defaults to `simv`.
        debug: run the `-debug` flavour of the executable and ask it to write
            output/gdbserver.vpd.

        Output goes to simv.log. The port found there is stored in `self.port`
        and exported as JTAG_VPI_PORT. Raises Exception when the simulator
        exits without ever printing the port line.
        """
        if sim_cmd:
            cmd = shlex.split(sim_cmd)
        else:
            cmd = ["simv"]
        cmd += ["+jtag_vpi_enable"]
        if debug:
            cmd[0] = cmd[0] + "-debug"
            cmd += ["+vcdplusfile=output/gdbserver.vpd"]

        port_line = re.compile(r"^Listening on port (\d+)$")
        with open("simv.log", "w") as logfile, \
                open("simv.log", "r") as listenfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            # Start reading after the header line written above.
            listenfile.seek(0, 2)
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                                            stdout=logfile, stderr=logfile)
            while True:
                line = listenfile.readline()
                if not line:
                    if self.process.poll() is None:
                        time.sleep(1)
                        continue
                    # The simulator is gone. Pick up anything it wrote between
                    # the read above and its exit, then give up instead of
                    # polling a dead process forever.
                    line = listenfile.readline()
                    if not line:
                        raise Exception(
                            "Simulator exited before reporting its port")
                match = port_line.match(line)
                if match:
                    self.port = int(match.group(1))
                    os.environ['JTAG_VPI_PORT'] = str(self.port)
                    break

    def __del__(self):
        """Kill the simulator, if one was started, and reap it."""
        process = getattr(self, "process", None)
        if process is None:
            # __init__ failed before the process was created.
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            # Best effort: the process is already gone.
            pass
148
class Openocd(object):
    """A running OpenOCD server and the port its gdb server listens on.

    Server output is collected in the file named by `logname`; the gdb port
    ends up in `self.port`.
    """
    logname = "openocd.log"

    def __init__(self, server_cmd=None, config=None, debug=False):
        """Start OpenOCD and block until it accepts gdb connections.

        server_cmd: command line used to start the server; defaults to
            $RISCV/bin/riscv-openocd.
        config: name of a config script, looked up with find_file(). The
            process exits with status 1 when it cannot be found.
        debug: pass -d to the server.

        Raises Exception when the server exits before it reports that it is
        ready, or when its gdb port cannot be determined.
        """
        if server_cmd:
            cmd = shlex.split(server_cmd)
        else:
            cmd = [os.path.expandvars("$RISCV/bin/riscv-openocd")]

        # This command needs to come before any config scripts on the command
        # line, since they are executed in order.
        cmd += [
            # Tell OpenOCD to bind gdb to an unused, ephemeral port.
            "--command",
            "gdb_port 0",
            # Disable tcl and telnet servers, since they are unused and because
            # the port numbers will conflict if multiple OpenOCD processes are
            # running on the same server.
            "--command",
            "tcl_port disabled",
            "--command",
            "telnet_port disabled",
        ]

        if config:
            f = find_file(config)
            if f is None:
                print("Unable to read file " + config)
                sys.exit(1)

            cmd += ["-f", f]
        if debug:
            # Added once, here, so it applies to custom server commands too.
            cmd.append("-d")

        # The child keeps its own copy of the descriptor, so our handle can
        # be closed as soon as the process has been started.
        with open(Openocd.logname, "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                                            stdout=logfile, stderr=logfile)

        # Wait for OpenOCD to have made it through riscv_examine(). When using
        # OpenOCD to communicate with a simulator this may take a long time,
        # and gdb will time out when trying to connect if we attempt too early.
        start = time.time()
        messaged = False
        while True:
            with open(Openocd.logname) as logfile:
                log = logfile.read()
            if "Ready for Remote Connections" in log:
                break
            if self.process.poll() is not None:
                raise Exception(
                    "OpenOCD exited before completing riscv_examine()")
            if not messaged and time.time() - start > 1:
                messaged = True
                print("Waiting for OpenOCD to examine RISCV core...")
            # Don't spin at full speed while re-reading the log.
            time.sleep(0.1)

        self.port = self._get_gdb_server_port()

    def _get_gdb_server_port(self):
        """Get port that OpenOCD's gdb server is listening on.

        Asks lsof for the TCP sockets of the server process once a second,
        for up to MAX_ATTEMPTS tries. Raises Exception when more than one
        candidate port is listening or when none shows up in time.
        """
        MAX_ATTEMPTS = 50
        PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')
        for _ in range(MAX_ATTEMPTS):
            with open(os.devnull, 'w') as devnull:
                try:
                    output = subprocess.check_output([
                        'lsof',
                        '-a', # Take the AND of the following selectors
                        '-p{}'.format(self.process.pid), # Filter on PID
                        '-iTCP', # Filter only TCP sockets
                    ], stderr=devnull)
                except subprocess.CalledProcessError:
                    output = ""
            if not isinstance(output, str):
                # check_output hands back bytes where str is a text type.
                output = output.decode(errors="replace")
            # 6666 and 4444 are never treated as the gdb port.
            matches = [m for m in PORT_REGEX.finditer(output)
                       if m.group('port') not in ('6666', '4444')]
            if len(matches) > 1:
                print(output)
                raise Exception(
                    "OpenOCD listening on multiple ports. Cannot uniquely "
                    "identify gdb server port.")
            elif matches:
                [match] = matches
                return int(match.group('port'))
            time.sleep(1)
        raise Exception("Timed out waiting for gdb server to obtain port.")

    def __del__(self):
        """Kill the server, if one was started, and reap it."""
        process = getattr(self, "process", None)
        if process is None:
            # __init__ failed before the process was created.
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            # Best effort: the process is already gone.
            pass
245
class OpenocdCli(object):
    """Talks to OpenOCD's command prompt through a telnet session.

    The session is also copied to openocd-cli.log.
    """

    def __init__(self, port=4444):
        """Open a telnet session to localhost:`port` and wait for a prompt."""
        self.child = pexpect.spawn(
            "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
        self.child.expect("> ")

    def command(self, cmd):
        """Send `cmd` and return the text printed before the next prompt."""
        self.child.sendline(cmd)
        # expect() takes a regular expression; escape the command so that
        # characters such as '+', '*' or '[' in it match the echo literally.
        self.child.expect(re.escape(cmd))
        self.child.expect("\n")
        self.child.expect("> ")
        return self.child.before.strip("\t\r\n \0")

    def reg(self, reg=''):
        """Run `reg` and parse the reply.

        Returns the value of register `reg` when a name is given (KeyError if
        the reply does not list it), otherwise a dict of every register that
        was reported, keyed by name.
        """
        output = self.command("reg %s" % reg)
        matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
        values = {r: int(v, 0) for r, v in matches}
        if reg:
            return values[reg]
        return values

    def load_image(self, image):
        """Run `load_image`; raise TestNotApplicable for unsupported ELFs."""
        output = self.command("load_image %s" % image)
        if 'invalid ELF file, only 32bits files are supported' in output:
            raise TestNotApplicable(output)
271
class CannotAccess(Exception):
    """Signals a "Cannot access memory" reply; `address` is the address."""

    def __init__(self, address):
        # No arguments are handed to Exception, so str() of this is empty.
        super(CannotAccess, self).__init__()
        self.address = address
276
class Gdb(object):
    """Drives an interactive gdb session through pexpect.

    Everything exchanged with gdb is logged to gdb.log.
    """

    def __init__(self,
                 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
        """Spawn `cmd`, wait for its prompt and apply the session settings."""
        self.child = pexpect.spawn(cmd)
        self.child.logfile = open("gdb.log", "w")
        self.child.logfile.write("+ %s\n" % cmd)
        self.wait()
        startup_commands = (
            "set confirm off",
            "set width 0",
            "set height 0",
            # Force consistency.
            "set print entry-values no",
        )
        for startup_command in startup_commands:
            self.command(startup_command)

    def wait(self):
        """Wait for prompt."""
        self.child.expect(r"\(gdb\)")

    def command(self, command, timeout=6000):
        """Send one command line and return gdb's reply, stripped."""
        child = self.child
        child.sendline(command)
        # Skip the rest of the line first, then collect up to the prompt.
        child.expect("\n", timeout=timeout)
        child.expect(r"\(gdb\)", timeout=timeout)
        return child.before.strip()

    def c(self, wait=True, timeout=-1):
        """Continue execution.

        With `wait`, block until the prompt returns and give back the output;
        otherwise return as soon as gdb acknowledges with "Continuing".
        """
        if not wait:
            self.child.sendline("c")
            self.child.expect("Continuing")
            return None
        output = self.command("c", timeout=timeout)
        assert "Continuing" in output
        return output

    def interrupt(self):
        """Send Ctrl-C and return what gdb printed before its prompt."""
        self.child.send("\003")
        self.child.expect(r"\(gdb\)", timeout=6000)
        return self.child.before.strip()

    def x(self, address, size='w'):
        """Examine memory at `address`; return the value as an int."""
        reply = self.command("x/%s %s" % (size, address))
        return int(reply.split(':')[1].strip(), 0)

    def _print_value(self, print_command):
        """Run a print command and return the text after the last '='.

        Raises CannotAccess when gdb reports inaccessible memory.
        """
        reply = self.command(print_command)
        fault = re.search("Cannot access memory at address (0x[0-9a-f]+)",
                          reply)
        if fault:
            raise CannotAccess(int(fault.group(1), 0))
        return reply.split('=')[-1].strip()

    def p_raw(self, obj):
        """Print `obj` and return the value text unparsed."""
        return self._print_value("p %s" % obj)

    def p(self, obj):
        """Print `obj` in hex and return it as an int."""
        return int(self._print_value("p/x %s" % obj), 0)

    def p_string(self, obj):
        """Print `obj` and return the second shell-style token of the value."""
        reply = self.command("p %s" % obj)
        tokens = shlex.split(reply.split('=')[-1].strip())
        return tokens[1]

    def stepi(self):
        """Execute one instruction and return gdb's output."""
        return self.command("stepi")

    def load(self):
        """Load the current file onto the target; assert that it worked."""
        reply = self.command("load", timeout=6000)
        assert "failed" not in reply
        assert "Transfer rate" in reply

    def b(self, location):
        """Set a breakpoint at `location` and return gdb's output."""
        reply = self.command("b %s" % location)
        assert "not defined" not in reply
        assert "Breakpoint" in reply
        return reply

    def hbreak(self, location):
        """Set a hardware breakpoint at `location`; return gdb's output."""
        reply = self.command("hbreak %s" % location)
        assert "not defined" not in reply
        assert "Hardware assisted breakpoint" in reply
        return reply
359
def run_all_tests(module, target, parsed):
    """Run ExamineTarget followed by the selected test classes of `module`.

    A class is selected when it has a `test` attribute and, if `parsed.test`
    names any tests, one of those names is a substring of the class name.
    `parsed.gdb` becomes the module-wide gdb command. With `parsed.fail_fast`
    the run stops after the first result that is neither 'pass' nor
    'not_applicable'.

    Prints a summary and returns 0 when every result was acceptable, else 1.
    """
    global gdb_cmd # pylint: disable=global-statement

    acceptable = set(('pass', 'not_applicable'))
    started = time.time()
    gdb_cmd = parsed.gdb

    queue = [("ExamineTarget", ExamineTarget)]
    for name in dir(module):
        candidate = getattr(module, name)
        if type(candidate) != type or not hasattr(candidate, 'test'):
            continue
        if parsed.test and not any(wanted in name for wanted in parsed.test):
            continue
        queue.append((name, candidate))

    outcomes = {}
    ran = 0
    for name, test_class in queue:
        outcome = test_class(target).run()
        outcomes.setdefault(outcome, []).append(name)
        ran += 1
        if parsed.fail_fast and outcome not in acceptable:
            break

    header("ran %d tests in %.0fs" % (ran, time.time() - started), dash=':')

    exit_code = 0
    for outcome, names in outcomes.items():
        print("%d tests returned %s" % (len(names), outcome))
        if outcome not in acceptable:
            exit_code = 1
            # Only the tests with an unacceptable outcome are listed by name.
            for name in names:
                print("%s %s" % (" ", name))

    return exit_code
397
def add_test_run_options(parser):
    """Register the options that control a test run on `parser`.

    Adds --fail-fast/-f, the positional `test` name list and --gdb.
    """
    option_table = (
        (("--fail-fast", "-f"),
         dict(action="store_true", help="Exit as soon as any test fails.")),
        (("test",),
         dict(nargs='*', help="Run only tests that are named here.")),
        (("--gdb",),
         dict(help="The command to use to start gdb.")),
    )
    for names, settings in option_table:
        parser.add_argument(*names, **settings)
405
def header(title, dash='-'):
    """Print a banner line built from `dash`.

    With a title, the line is `[ title ]` padded on both sides with a total of
    36 - len(title) dash characters (the right side gets the odd one). With an
    empty title, a plain rule of 40 dash characters is printed.
    """
    if title:
        dashes = dash * (36 - len(title))
        # Floor division keeps the slice index an integer.
        half = len(dashes) // 2
        print("%s[ %s ]%s" % (dashes[:half], title, dashes[half:]))
    else:
        print(dash * 40)
414
def print_log(path):
    """Print the file at `path` under a header() banner.

    Files longer than 1000 lines are shortened to their first and last 500
    lines with a "..." line in between.
    """
    header(path)
    # Read through a context manager so the handle is closed right away.
    with open(path, "r") as log:
        lines = log.readlines()
    if len(lines) > 1000:
        sys.stdout.writelines(lines[:500])
        print("...")
        sys.stdout.writelines(lines[-500:])
    else:
        sys.stdout.writelines(lines)
427
class BaseTest(object):
    """Base class for tests: compile, start target and server, run, report.

    Subclasses provide test() and may set a `compile_args` attribute and
    override early_applicable() and setup().
    """
    # Binaries already built, keyed by compile_args, shared by all tests.
    compiled = {}

    def __init__(self, target):
        self.target = target
        self.server = None
        self.target_process = None
        self.binary = None
        self.start = 0
        self.logs = []

    def early_applicable(self):
        """Return a false value if the test has determined it cannot run
        without ever needing to talk to the target or server."""
        # pylint: disable=no-self-use
        return True

    def setup(self):
        """Hook that runs right before test(); does nothing by default."""
        pass

    def compile(self):
        """Build the program named by `compile_args`, at most once per args.

        Sets self.binary to the result of target.compile(), or to None when
        the test has no compile_args.
        """
        compile_args = getattr(self, 'compile_args', None)
        if compile_args and compile_args not in BaseTest.compiled:
            # pylint: disable=star-args
            BaseTest.compiled[compile_args] = \
                self.target.compile(*compile_args)
        self.binary = BaseTest.compiled.get(compile_args)

    def classSetup(self):
        """Compile, then start the target and the server for this test."""
        self.compile()
        self.target_process = self.target.target()
        self.server = self.target.server()
        self.logs.append(self.server.logname)

    def classTeardown(self):
        """Drop the references to the server and the target process."""
        del self.server
        del self.target_process

    def run(self):
        """
        If compile_args is set, compile a program and set self.binary.

        Call setup().

        Then call test() and return the result, displaying relevant information
        if an exception is raised.

        The result is "not_applicable", "fail" (TestFailed was raised),
        "exception" (anything else was raised), whatever true value test()
        returned, or "pass" when test() returned a false value.
        """

        sys.stdout.write("Running %s ... " % type(self).__name__)
        sys.stdout.flush()

        if not self.early_applicable():
            print("not_applicable")
            return "not_applicable"

        self.start = time.time()

        try:
            # classSetup() sits inside the try so that a failure while
            # starting the target or server is reported like any other
            # failure and classTeardown() still runs.
            self.classSetup()
            self.setup()
            result = self.test()    # pylint: disable=no-member
        except TestNotApplicable:
            result = "not_applicable"
        except Exception as e: # pylint: disable=broad-except
            failed = isinstance(e, TestFailed)
            result = "fail" if failed else "exception"
            print("%s in %.2fs" % (result, time.time() - self.start))
            print("=" * 40)
            if failed:
                header("Message")
                print(e.message)
            header("Traceback")
            traceback.print_exc(file=sys.stdout)
            for log in self.logs:
                print_log(log)
            print("/" * 40)
            return result

        finally:
            self.classTeardown()

        if not result:
            result = 'pass'
        print("%s in %.2fs" % (result, time.time() - self.start))
        return result
517
# Command used to start gdb; None selects Gdb's built-in default.
gdb_cmd = None
class GdbTest(BaseTest):
    """A test that additionally gets a gdb session in `self.gdb`."""

    def __init__(self, target):
        BaseTest.__init__(self, target)
        self.gdb = None

    def classSetup(self):
        """Do the base setup, then start gdb and point it at the target."""
        BaseTest.classSetup(self)
        self.logs.append("gdb.log")

        self.gdb = Gdb(gdb_cmd) if gdb_cmd else Gdb()
        session = self.gdb

        if self.binary:
            session.command("file %s" % self.binary)
        if self.target:
            session.command("set arch riscv:rv%d" % self.target.xlen)
            session.command("set remotetimeout %d" % self.target.timeout_sec)
        if self.server.port:
            session.command(
                "target extended-remote localhost:%d" % self.server.port)

        # FIXME: OpenOCD doesn't handle PRIV now
        #self.gdb.p("$priv=3")

    def classTeardown(self):
        """Drop the gdb session, then do the base teardown."""
        del self.gdb
        BaseTest.classTeardown(self)
548
549 class ExamineTarget(GdbTest):
550 def test(self):
551 self.target.misa = self.gdb.p("$misa")
552
553 txt = "RV"
554 if (self.target.misa >> 30) == 1:
555 txt += "32"
556 elif (self.target.misa >> 62) == 2:
557 txt += "64"
558 elif (self.target.misa >> 126) == 3:
559 txt += "128"
560 else:
561 txt += "??"
562
563 for i in range(26):
564 if self.target.misa & (1<<i):
565 txt += chr(i + ord('A'))
566 print txt,
567
class TestFailed(Exception):
    """Raised to fail a test; the explanation is kept in `message`."""

    def __init__(self, message):
        # The message is stored on the instance only; Exception gets no args.
        super(TestFailed, self).__init__()
        self.message = message
572
class TestNotApplicable(Exception):
    """Raised when a test cannot run here; the reason is kept in `message`."""

    def __init__(self, message):
        # The message is stored on the instance only; Exception gets no args.
        super(TestNotApplicable, self).__init__()
        self.message = message
577
def assertEqual(a, b):
    """Raise TestFailed, showing both reprs, when `a != b`."""
    differs = a != b
    if differs:
        raise TestFailed("%r != %r" % (a, b))
581
def assertNotEqual(a, b):
    """Raise TestFailed, showing both reprs, when `a == b`."""
    same = a == b
    if same:
        raise TestFailed("%r == %r" % (a, b))
585
def assertIn(a, b):
    """Raise TestFailed, showing both reprs, when `a` is not in `b`."""
    missing = a not in b
    if missing:
        raise TestFailed("%r not in %r" % (a, b))
589
def assertNotIn(a, b):
    """Raise TestFailed, showing both reprs, when `a` is in `b`."""
    present = a in b
    if present:
        raise TestFailed("%r in %r" % (a, b))
593
def assertGreater(a, b):
    """Raise TestFailed, showing both reprs, unless `a > b` holds."""
    holds = a > b
    if not holds:
        raise TestFailed("%r not greater than %r" % (a, b))
597
def assertLess(a, b):
    """Raise TestFailed, showing both reprs, unless `a < b` holds."""
    holds = a < b
    if not holds:
        raise TestFailed("%r not less than %r" % (a, b))
601
def assertTrue(a):
    """Raise TestFailed, showing the repr of `a`, when `a` is falsy."""
    if not a:
        # Wrap the value in a 1-tuple: a bare tuple operand (for instance the
        # falsy empty tuple) would be unpacked as the format arguments and
        # raise TypeError instead of producing the message.
        raise TestFailed("%r is not True" % (a,))
605
def assertRegexpMatches(text, regexp):
    """Raise TestFailed unless `regexp` matches somewhere in `text`."""
    found = re.search(regexp, text)
    if not found:
        raise TestFailed("can't find %r in %r" % (regexp, text))