Merge pull request #47 from riscv/debug-0.13
[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import re
3 import shlex
4 import subprocess
5 import sys
6 import time
7 import traceback
8
9 import pexpect
10
11 # Note that gdb comes with its own testsuite. I was unable to figure out how to
12 # run that testsuite against the spike simulator.
13
def find_file(path):
    """Locate path relative to the working directory or to this module.

    The current working directory is tried first, then the directory that
    contains this file. Returns the first candidate that exists, or None
    when neither does.
    """
    search_dirs = (os.getcwd(), os.path.dirname(__file__))
    candidates = [os.path.join(base, path) for base in search_dirs]
    for candidate in candidates:
        if os.path.exists(candidate):
            return candidate
    return None
20
21 def compile(args, xlen=32): # pylint: disable=redefined-builtin
22 cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
23 cmd = [cc, "-g"]
24 if (xlen == 32):
25 cmd.append("-march=rv32imac")
26 cmd.append("-mabi=ilp32")
27 else:
28 cmd.append("-march=rv64imac")
29 cmd.append("-mabi=lp64")
30 for arg in args:
31 found = find_file(arg)
32 if found:
33 cmd.append(found)
34 else:
35 cmd.append(arg)
36 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
37 stderr=subprocess.PIPE)
38 stdout, stderr = process.communicate()
39 if process.returncode:
40 print
41 header("Compile failed")
42 print "+", " ".join(cmd)
43 print stdout,
44 print stderr,
45 header("")
46 raise Exception("Compile failed!")
47
def unused_port():
    """Return a TCP port number that was unused when this was called.

    The port is obtained by binding a socket to port 0 and reading back the
    number the OS assigned. The socket is closed again before returning (even
    if binding fails), so nothing reserves the port afterwards.
    """
    # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
    import socket
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind(("", 0))
        return s.getsockname()[1]
    finally:
        s.close()
56
class Spike(object):
    """A spike simulator subprocess whose output goes to `logname`."""

    logname = "spike.log"

    def __init__(self, sim_cmd, binary=None, halted=False, with_jtag_gdb=True,
                 timeout=None, xlen=64):
        """Launch spike and, if requested, find its remote bitbang port.

        sim_cmd -- command line (split with shlex) to use instead of
            $RISCV/bin/spike; a false value selects the default.
        binary -- extra program argument appended to the command line.
        halted -- pass -H to the simulator.
        with_jtag_gdb -- pass "--rbb-port 0", then poll the log for the port
            spike reports and publish it as self.port and in the
            REMOTE_BITBANG_HOST / REMOTE_BITBANG_PORT environment variables.
        timeout -- if set, run the simulator under `timeout <value>`.
        xlen -- 32 selects --isa RV32G, anything else RV64G.

        Raises AssertionError if the bitbang port is not reported in time.
        """
        if sim_cmd:
            cmd = shlex.split(sim_cmd)
        else:
            spike = os.path.expandvars("$RISCV/bin/spike")
            cmd = [spike]
        if xlen == 32:
            cmd += ["--isa", "RV32G"]
        else:
            cmd += ["--isa", "RV64G"]

        if timeout:
            cmd = ["timeout", str(timeout)] + cmd

        cmd += ["-m0x10000000:0x10000000"]

        if halted:
            cmd.append('-H')
        if with_jtag_gdb:
            cmd += ['--rbb-port', '0']
            os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
        # NOTE(review): taken to be unconditional (not part of the
        # with_jtag_gdb branch) -- confirm against the repository copy.
        cmd.append('programs/infinite_loop')
        if binary:
            cmd.append(binary)

        with open(self.logname, "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            # The child gets its own copy of the descriptor, so our handle
            # can be closed as soon as the process has been started.
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                                            stdout=logfile, stderr=logfile)

        if with_jtag_gdb:
            self.port = None
            for _ in range(30):
                with open(self.logname) as log:
                    m = re.search(
                        r"Listening for remote bitbang connection on "
                        r"port (\d+).", log.read())
                if m:
                    self.port = int(m.group(1))
                    os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
                    break
                time.sleep(0.11)
            if not self.port:
                raise AssertionError("Didn't get spike message about bitbang "
                                     "connection")

    def __del__(self):
        """Kill the simulator, if one was started, and reap it."""
        # __init__ may have failed before self.process was assigned.
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            pass

    def wait(self, *args, **kwargs):
        """Forward to the simulator process's wait() and return its result."""
        return self.process.wait(*args, **kwargs)
115
class VcsSim(object):
    """A VCS simulator subprocess whose output goes to simv.log."""

    def __init__(self, sim_cmd=None, debug=False):
        """Start the simulator and wait until it reports its listening port.

        sim_cmd -- command line (split with shlex) to use instead of "simv".
        debug -- append "-debug" to the executable name and request a VPD
            dump.

        The port is stored in self.port and exported as JTAG_VPI_PORT.
        Raises Exception if the simulator exits without reporting a port.
        """
        if sim_cmd:
            cmd = shlex.split(sim_cmd)
        else:
            cmd = ["simv"]
        cmd += ["+jtag_vpi_enable"]
        if debug:
            cmd[0] = cmd[0] + "-debug"
            cmd += ["+vcdplusfile=output/gdbserver.vpd"]

        with open("simv.log", "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            with open("simv.log", "r") as listenfile:
                # Skip what we just wrote; only simulator output matters.
                listenfile.seek(0, 2)
                self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                                                stdout=logfile, stderr=logfile)
                self.port = self._wait_for_port(listenfile)
        os.environ['JTAG_VPI_PORT'] = str(self.port)

    def _wait_for_port(self, listenfile):
        """Read listenfile until a "Listening on port N" line; return N."""
        exited = False
        while True:
            line = listenfile.readline()
            if line:
                match = re.match(r"^Listening on port (\d+)$", line)
                if match:
                    return int(match.group(1))
                continue
            if exited:
                # The process was already gone on the previous pass and the
                # log has been drained since, so the line will never arrive.
                raise Exception("simv exited before listening on a port")
            exited = self.process.poll() is not None
            if not exited:
                time.sleep(1)

    def __del__(self):
        """Kill the simulator, if one was started, and reap it."""
        # __init__ may have failed before self.process was assigned.
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            pass
150
class Openocd(object):
    """An OpenOCD subprocess whose output goes to `logname`."""

    logname = "openocd.log"

    def __init__(self, server_cmd=None, config=None, debug=False):
        """Start OpenOCD and wait until its gdb server port is known.

        server_cmd -- command line (split with shlex) to use instead of
            $RISCV/bin/riscv-openocd.
        config -- config script, located with find_file() and passed via -f.
            The process exits with status 1 if it cannot be found.
        debug -- pass -d to OpenOCD.

        The gdb server port is stored in self.port. Raises Exception if
        OpenOCD exits before logging "Ready for Remote Connections".
        """
        if server_cmd:
            cmd = shlex.split(server_cmd)
        else:
            openocd = os.path.expandvars("$RISCV/bin/riscv-openocd")
            cmd = [openocd]

        # This command needs to come before any config scripts on the command
        # line, since they are executed in order.
        cmd += [
            # Tell OpenOCD to bind gdb to an unused, ephemeral port.
            "--command",
            "gdb_port 0",
            # Disable tcl and telnet servers, since they are unused and because
            # the port numbers will conflict if multiple OpenOCD processes are
            # running on the same server.
            "--command",
            "tcl_port disabled",
            "--command",
            "telnet_port disabled",
        ]

        if config:
            f = find_file(config)
            if f is None:
                print("Unable to read file " + config)
                sys.exit(1)

            cmd += ["-f", f]
        # Passed once; the flag used to be appended in two places.
        if debug:
            cmd.append("-d")

        with open(Openocd.logname, "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            # The child gets its own copy of the descriptor, so our handle
            # can be closed as soon as the process has been started.
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                                            stdout=logfile, stderr=logfile)

        # Wait for OpenOCD to have made it through riscv_examine(). When using
        # OpenOCD to communicate with a simulator this may take a long time,
        # and gdb will time out when trying to connect if we attempt too early.
        start = time.time()
        messaged = False
        while True:
            with open(Openocd.logname) as log_file:
                log = log_file.read()
            if "Ready for Remote Connections" in log:
                break
            if self.process.poll() is not None:
                raise Exception(
                    "OpenOCD exited before completing riscv_examine()")
            if not messaged and time.time() - start > 1:
                messaged = True
                print("Waiting for OpenOCD to examine RISCV core...")
            # Poll at a modest rate instead of spinning on the log file.
            time.sleep(0.1)

        self.port = self._get_gdb_server_port()

    def _get_gdb_server_port(self):
        """Get port that OpenOCD's gdb server is listening on.

        Runs lsof against the OpenOCD process up to 50 times, one second
        apart, looking for a listening TCP port other than 6666 and 4444.
        Raises Exception if several such ports are found or none shows up.
        """
        MAX_ATTEMPTS = 50
        PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')
        for _ in range(MAX_ATTEMPTS):
            with open(os.devnull, 'w') as devnull:
                try:
                    output = subprocess.check_output([
                        'lsof',
                        '-a', # Take the AND of the following selectors
                        '-p{}'.format(self.process.pid), # Filter on PID
                        '-iTCP', # Filter only TCP sockets
                    ], stderr=devnull, universal_newlines=True)
                except subprocess.CalledProcessError:
                    output = ""
            matches = [m for m in PORT_REGEX.finditer(output)
                       if m.group('port') not in ('6666', '4444')]
            if len(matches) > 1:
                print(output)
                raise Exception(
                    "OpenOCD listening on multiple ports. Cannot uniquely "
                    "identify gdb server port.")
            elif matches:
                [match] = matches
                return int(match.group('port'))
            time.sleep(1)
        raise Exception("Timed out waiting for gdb server to obtain port.")

    def __del__(self):
        """Kill OpenOCD, if it was started, and reap it."""
        # __init__ may have failed before self.process was assigned.
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            pass
247
class OpenocdCli(object):
    """Drives OpenOCD's telnet command line through pexpect."""

    def __init__(self, port=4444):
        """Connect with telnet to localhost:port and wait for the prompt.

        The session is also copied to openocd-cli.log via tee.
        """
        self.child = pexpect.spawn(
            "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
        self.child.expect("> ")

    def command(self, cmd):
        """Send cmd and return the text printed before the next prompt."""
        self.child.sendline(cmd)
        # The echoed command is matched literally: commands may contain
        # characters that would be special in a regular expression.
        self.child.expect_exact(cmd)
        self.child.expect("\n")
        self.child.expect("> ")
        return self.child.before.strip("\t\r\n \0")

    def reg(self, reg=''):
        """Run "reg <reg>" and parse the "name (/bits): 0xVALUE" lines.

        Returns the integer value of reg when a name is given (KeyError if
        it is not in the output), otherwise a dict of every parsed register.
        """
        output = self.command("reg %s" % reg)
        matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
        values = {r: int(v, 0) for r, v in matches}
        if reg:
            return values[reg]
        return values

    def load_image(self, image):
        """Run "load_image <image>".

        Raises TestNotApplicable when OpenOCD reports that only 32-bit ELF
        files are supported.
        """
        output = self.command("load_image %s" % image)
        if 'invalid ELF file, only 32bits files are supported' in output:
            raise TestNotApplicable(output)
273
class CannotAccess(Exception):
    """Signals that gdb could not access memory at `address`."""

    def __init__(self, address):
        super(CannotAccess, self).__init__()
        # The offending address, as an integer supplied by the raiser.
        self.address = address
278
class Gdb(object):
    """Drives an interactive gdb session through pexpect."""

    def __init__(self,
                 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
        """Spawn cmd, log the session to gdb.log and apply basic settings."""
        self.child = pexpect.spawn(cmd)
        self.child.logfile = open("gdb.log", "w")
        self.child.logfile.write("+ %s\n" % cmd)
        self.wait()
        self.command("set confirm off")
        self.command("set width 0")
        self.command("set height 0")
        # Force consistency.
        self.command("set print entry-values no")

    def wait(self):
        """Wait for prompt."""
        self.child.expect(r"\(gdb\)")

    def command(self, command, timeout=6000):
        """Send command; return the output that precedes the next prompt."""
        self.child.sendline(command)
        self.child.expect("\n", timeout=timeout)
        self.child.expect(r"\(gdb\)", timeout=timeout)
        return self.child.before.strip()

    @staticmethod
    def _check_access(output):
        """Raise CannotAccess if output reports an inaccessible address."""
        m = re.search(r"Cannot access memory at address (0x[0-9a-f]+)", output)
        if m:
            raise CannotAccess(int(m.group(1), 0))

    def c(self, wait=True, timeout=-1):
        """Issue "c".

        With wait true, block until the next prompt and return the output.
        Otherwise return None as soon as gdb prints "Continuing". timeout is
        handed to pexpect in both cases.
        """
        if wait:
            output = self.command("c", timeout=timeout)
            assert "Continuing" in output
            return output
        self.child.sendline("c")
        self.child.expect("Continuing", timeout=timeout)
        return None

    def interrupt(self):
        """Send Ctrl-C and return the output that precedes the prompt."""
        self.child.send("\003")
        self.child.expect(r"\(gdb\)", timeout=6000)
        return self.child.before.strip()

    def x(self, address, size='w'):
        """Examine memory with "x/<size> <address>"; return it as an int."""
        output = self.command("x/%s %s" % (size, address))
        value = int(output.split(':')[1].strip(), 0)
        return value

    def p_raw(self, obj):
        """Print obj and return the text after the last '=' unparsed.

        Raises CannotAccess if gdb reports inaccessible memory.
        """
        output = self.command("p %s" % obj)
        self._check_access(output)
        return output.split('=')[-1].strip()

    def p(self, obj):
        """Print obj with p/x and return the value as an int.

        Raises CannotAccess if gdb reports inaccessible memory.
        """
        output = self.command("p/x %s" % obj)
        self._check_access(output)
        value = int(output.split('=')[-1].strip(), 0)
        return value

    def p_string(self, obj):
        """Print obj and return the second shell-style token of the result.

        Raises CannotAccess if gdb reports inaccessible memory, like p() and
        p_raw() do.
        """
        output = self.command("p %s" % obj)
        self._check_access(output)
        value = shlex.split(output.split('=')[-1].strip())[1]
        return value

    def stepi(self):
        """Execute "stepi" and return gdb's output."""
        output = self.command("stepi")
        return output

    def load(self):
        """Execute "load", asserting that gdb reports a transfer rate."""
        output = self.command("load", timeout=6000)
        assert "failed" not in output
        assert "Transfer rate" in output

    def b(self, location):
        """Set a breakpoint at location and return gdb's output."""
        output = self.command("b %s" % location)
        assert "not defined" not in output
        assert "Breakpoint" in output
        return output

    def hbreak(self, location):
        """Set a hardware breakpoint at location and return gdb's output."""
        output = self.command("hbreak %s" % location)
        assert "not defined" not in output
        assert "Hardware assisted breakpoint" in output
        return output
361
def run_all_tests(module, target, parsed):
    """Run ExamineTarget followed by the selected test classes of module.

    A class in module is selected when it has a `test` attribute and, if
    parsed.test is non-empty, when one of those strings occurs in its name.
    Each selected class is instantiated with target and run(). parsed.gdb
    is stored in the module-level gdb_cmd first, and parsed.fail_fast stops
    the run after the first result other than pass/not_applicable.

    Prints a summary and returns 0 if every result was pass or
    not_applicable, otherwise 1.
    """
    good_results = set(('pass', 'not_applicable'))

    start = time.time()

    results = {}
    count = 0

    global gdb_cmd # pylint: disable=global-statement
    gdb_cmd = parsed.gdb

    todo = [("ExamineTarget", ExamineTarget)]
    for name in dir(module):
        definition = getattr(module, name)
        if isinstance(definition, type) and hasattr(definition, 'test') and \
                (not parsed.test or any(test in name for test in parsed.test)):
            todo.append((name, definition))

    for name, definition in todo:
        instance = definition(target)
        result = instance.run()
        results.setdefault(result, []).append(name)
        count += 1
        if result not in good_results and parsed.fail_fast:
            break

    header("ran %d tests in %.0fs" % (count, time.time() - start), dash=':')

    result = 0
    for key, value in results.items():
        print("%d tests returned %s" % (len(value), key))
        if key not in good_results:
            result = 1
            # Name the tests behind every unsuccessful result.
            for test in value:
                print("  %s" % test)

    return result
399
def add_test_run_options(parser):
    """Add the --fail-fast/-f, test and --gdb arguments to parser."""
    options = (
        (("--fail-fast", "-f"),
         dict(action="store_true", help="Exit as soon as any test fails.")),
        (("test",),
         dict(nargs='*', help="Run only tests that are named here.")),
        (("--gdb",),
         dict(help="The command to use to start gdb.")),
    )
    for names, settings in options:
        parser.add_argument(*names, **settings)
407
def header(title, dash='-'):
    """Print a rule made of dash characters, optionally framing title.

    With a title, "[ title ]" is printed with (36 - len(title)) dashes split
    around it, the extra one going after the title when the count is odd.
    Without a title, a plain rule of 40 dashes is printed.
    """
    if title:
        dashes = dash * (36 - len(title))
        # Floor division keeps the slice index an integer on Python 3 too.
        half = len(dashes) // 2
        print("%s[ %s ]%s" % (dashes[:half], title, dashes[half:]))
    else:
        print(dash * 40)
416
def print_log(path):
    """Print the file at path to stdout below a header naming it.

    Files longer than 1000 lines are abbreviated to their first and last
    500 lines with a "..." line in between.
    """
    header(path)
    with open(path, "r") as log:
        lines = log.readlines()
    if len(lines) > 1000:
        sys.stdout.writelines(lines[:500])
        print("...")
        sys.stdout.writelines(lines[-500:])
    else:
        sys.stdout.writelines(lines)
429
class BaseTest(object):
    """Base class for tests: builds the program, starts the target and the
    debug server, runs test() and reports the outcome."""

    # Build results shared by all tests, keyed by compile_args.
    compiled = {}

    def __init__(self, target):
        self.target = target
        self.server = None
        self.target_process = None
        self.binary = None
        self.start = 0
        self.logs = []

    def early_applicable(self):
        """Return a false value if the test has determined it cannot run
        without ever needing to talk to the target or server."""
        # pylint: disable=no-self-use
        return True

    def setup(self):
        """Hook run after classSetup() and before test(); no-op here."""
        pass

    def compile(self):
        """Build the program named by self.compile_args, at most once.

        The result of target.compile() is cached in BaseTest.compiled and
        stored in self.binary (None when there are no compile_args).
        """
        compile_args = getattr(self, 'compile_args', None)
        if compile_args and compile_args not in BaseTest.compiled:
            # pylint: disable=star-args
            BaseTest.compiled[compile_args] = \
                self.target.compile(*compile_args)
        self.binary = BaseTest.compiled.get(compile_args)

    def classSetup(self):
        """Compile, then start the target and the server and note its log."""
        self.compile()
        self.target_process = self.target.target()
        self.server = self.target.server()
        self.logs.append(self.server.logname)

    def classTeardown(self):
        """Drop the references to the server and the target process."""
        # Rebinding (rather than del) releases the objects just the same but
        # stays safe if setup failed early or teardown runs twice.
        self.server = None
        self.target_process = None

    def run(self):
        """
        If compile_args is set, compile a program and set self.binary.

        Call setup().

        Then call test() and return the result, displaying relevant information
        if an exception is raised.

        Returns "not_applicable", "fail" (TestFailed was raised), "exception"
        (any other exception), the value returned by test(), or "pass" when
        test() returns a false value. classTeardown() always runs once
        classSetup() has been attempted.
        """

        sys.stdout.write("Running %s ... " % type(self).__name__)
        sys.stdout.flush()

        if not self.early_applicable():
            print("not_applicable")
            return "not_applicable"

        self.start = time.time()

        try:
            # classSetup() is inside the try block so that a failure to build
            # or to start the target/server is reported like any other test
            # error, with logs, instead of aborting the whole run.
            self.classSetup()
            self.setup()
            result = self.test()    # pylint: disable=no-member
        except TestNotApplicable:
            result = "not_applicable"
        except Exception as e: # pylint: disable=broad-except
            failed = isinstance(e, TestFailed)
            result = "fail" if failed else "exception"
            print("%s in %.2fs" % (result, time.time() - self.start))
            print("=" * 40)
            if failed:
                header("Message")
                print(e.message)
            header("Traceback")
            traceback.print_exc(file=sys.stdout)
            for log in self.logs:
                print_log(log)
            print("/" * 40)
            return result

        finally:
            self.classTeardown()

        if not result:
            result = 'pass'
        print("%s in %.2fs" % (result, time.time() - self.start))
        return result
519
# Command used to start gdb; None selects Gdb's own default. Assigned by
# run_all_tests() from the parsed command line.
gdb_cmd = None
class GdbTest(BaseTest):
    """A BaseTest that also opens a gdb session connected to the server."""

    def __init__(self, target):
        BaseTest.__init__(self, target)
        self.gdb = None

    def classSetup(self):
        """Do the base setup, then start gdb and point it at the server."""
        BaseTest.classSetup(self)
        self.logs.append("gdb.log")

        self.gdb = Gdb(gdb_cmd) if gdb_cmd else Gdb()
        send = self.gdb.command

        if self.binary:
            send("file %s" % self.binary)
        if self.target:
            send("set arch riscv:rv%d" % self.target.xlen)
            send("set remotetimeout %d" % self.target.timeout_sec)
        if self.server.port:
            send("target extended-remote localhost:%d" % self.server.port)

        # FIXME: OpenOCD doesn't handle PRIV now
        #self.gdb.p("$priv=3")

    def classTeardown(self):
        """Drop the gdb session before the base class teardown runs."""
        del self.gdb
        BaseTest.classTeardown(self)
550
551 class ExamineTarget(GdbTest):
552 def test(self):
553 self.target.misa = self.gdb.p("$misa")
554
555 txt = "RV"
556 if (self.target.misa >> 30) == 1:
557 txt += "32"
558 elif (self.target.misa >> 62) == 2:
559 txt += "64"
560 elif (self.target.misa >> 126) == 3:
561 txt += "128"
562 else:
563 txt += "??"
564
565 for i in range(26):
566 if self.target.misa & (1<<i):
567 txt += chr(i + ord('A'))
568 print txt,
569
class TestFailed(Exception):
    """Raised by the assert* helpers when a test check does not hold."""

    def __init__(self, message):
        # Hand the message to Exception as well so that str(e) and printed
        # tracebacks show it instead of an empty string.
        Exception.__init__(self, message)
        self.message = message
574
class TestNotApplicable(Exception):
    """Raised when a test finds out that it cannot run in this setup."""

    def __init__(self, message):
        # Hand the message to Exception as well so that str(e) and printed
        # tracebacks show it instead of an empty string.
        Exception.__init__(self, message)
        self.message = message
579
def assertEqual(a, b):
    """Raise TestFailed when a != b."""
    if not a != b:
        return
    raise TestFailed("%r != %r" % (a, b))
583
def assertNotEqual(a, b):
    """Raise TestFailed when a == b."""
    if not a == b:
        return
    raise TestFailed("%r == %r" % (a, b))
587
def assertIn(a, b):
    """Raise TestFailed when a is not contained in b."""
    if a in b:
        return
    raise TestFailed("%r not in %r" % (a, b))
591
def assertNotIn(a, b):
    """Raise TestFailed when a is contained in b."""
    if a not in b:
        return
    raise TestFailed("%r in %r" % (a, b))
595
def assertGreater(a, b):
    """Raise TestFailed unless a > b."""
    if a > b:
        return
    raise TestFailed("%r not greater than %r" % (a, b))
599
def assertLess(a, b):
    """Raise TestFailed unless a < b."""
    if a < b:
        return
    raise TestFailed("%r not less than %r" % (a, b))
603
def assertTrue(a):
    """Raise TestFailed when a is false in a boolean context."""
    if not a:
        # Wrap the value in a 1-tuple: a bare tuple on the right-hand side
        # of % would be unpacked as the list of format arguments.
        raise TestFailed("%r is not True" % (a,))
607
def assertRegexpMatches(text, regexp):
    """Raise TestFailed when regexp cannot be found anywhere in text."""
    if re.search(regexp, text):
        return
    raise TestFailed("can't find %r in %r" % (regexp, text))