Make pylint happy.
[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import re
3 import shlex
4 import subprocess
5 import sys
6 import time
7 import traceback
8
9 import pexpect
10
11 # Note that gdb comes with its own testsuite. I was unable to figure out how to
12 # run that testsuite against the spike simulator.
13
14 def find_file(path):
15 for directory in (os.getcwd(), os.path.dirname(__file__)):
16 fullpath = os.path.join(directory, path)
17 if os.path.exists(fullpath):
18 return fullpath
19 return None
20
21 def compile(args, xlen=32): # pylint: disable=redefined-builtin
22 cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
23 cmd = [cc, "-g"]
24 if xlen == 32:
25 cmd.append("-march=rv32imac")
26 cmd.append("-mabi=ilp32")
27 else:
28 cmd.append("-march=rv64imac")
29 cmd.append("-mabi=lp64")
30 for arg in args:
31 found = find_file(arg)
32 if found:
33 cmd.append(found)
34 else:
35 cmd.append(arg)
36 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
37 stderr=subprocess.PIPE)
38 stdout, stderr = process.communicate()
39 if process.returncode:
40 print
41 header("Compile failed")
42 print "+", " ".join(cmd)
43 print stdout,
44 print stderr,
45 header("")
46 raise Exception("Compile failed!")
47
48 def unused_port():
49 # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
50 import socket
51 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
52 s.bind(("", 0))
53 port = s.getsockname()[1]
54 s.close()
55 return port
56
57 class Spike(object):
58 logname = "spike.log"
59
60 def __init__(self, sim_cmd, binary=None, halted=False, with_jtag_gdb=True,
61 timeout=None, xlen=64):
62 """Launch spike. Return tuple of its process and the port it's running
63 on."""
64 if sim_cmd:
65 cmd = shlex.split(sim_cmd)
66 else:
67 spike = os.path.expandvars("$RISCV/bin/spike")
68 cmd = [spike]
69 if xlen == 32:
70 cmd += ["--isa", "RV32G"]
71 else:
72 cmd += ["--isa", "RV64G"]
73
74 if timeout:
75 cmd = ["timeout", str(timeout)] + cmd
76
77 cmd += ["-m0x10000000:0x10000000"]
78
79 if halted:
80 cmd.append('-H')
81 if with_jtag_gdb:
82 cmd += ['--rbb-port', '0']
83 os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
84 cmd.append('programs/infinite_loop')
85 if binary:
86 cmd.append(binary)
87 logfile = open(self.logname, "w")
88 logfile.write("+ %s\n" % " ".join(cmd))
89 logfile.flush()
90 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
91 stdout=logfile, stderr=logfile)
92
93 if with_jtag_gdb:
94 self.port = None
95 for _ in range(30):
96 m = re.search(r"Listening for remote bitbang connection on "
97 r"port (\d+).", open(self.logname).read())
98 if m:
99 self.port = int(m.group(1))
100 os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
101 break
102 time.sleep(0.11)
103 assert self.port, "Didn't get spike message about bitbang " \
104 "connection"
105
106 def __del__(self):
107 try:
108 self.process.kill()
109 self.process.wait()
110 except OSError:
111 pass
112
113 def wait(self, *args, **kwargs):
114 return self.process.wait(*args, **kwargs)
115
116 class VcsSim(object):
117 def __init__(self, sim_cmd=None, debug=False):
118 if sim_cmd:
119 cmd = shlex.split(sim_cmd)
120 else:
121 cmd = ["simv"]
122 cmd += ["+jtag_vpi_enable"]
123 if debug:
124 cmd[0] = cmd[0] + "-debug"
125 cmd += ["+vcdplusfile=output/gdbserver.vpd"]
126 logfile = open("simv.log", "w")
127 logfile.write("+ %s\n" % " ".join(cmd))
128 logfile.flush()
129 listenfile = open("simv.log", "r")
130 listenfile.seek(0, 2)
131 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
132 stdout=logfile, stderr=logfile)
133 done = False
134 while not done:
135 # Fail if VCS exits early
136 exit_code = self.process.poll()
137 if exit_code is not None:
138 raise RuntimeError('VCS simulator exited early with status %d'
139 % exit_code)
140
141 line = listenfile.readline()
142 if not line:
143 time.sleep(1)
144 match = re.match(r"^Listening on port (\d+)$", line)
145 if match:
146 done = True
147 self.port = int(match.group(1))
148 os.environ['JTAG_VPI_PORT'] = str(self.port)
149
150 def __del__(self):
151 try:
152 self.process.kill()
153 self.process.wait()
154 except OSError:
155 pass
156
157 class Openocd(object):
158 logname = "openocd.log"
159
160 def __init__(self, server_cmd=None, config=None, debug=False):
161 if server_cmd:
162 cmd = shlex.split(server_cmd)
163 else:
164 openocd = os.path.expandvars("$RISCV/bin/riscv-openocd")
165 cmd = [openocd]
166 if debug:
167 cmd.append("-d")
168
169 # This command needs to come before any config scripts on the command
170 # line, since they are executed in order.
171 cmd += [
172 # Tell OpenOCD to bind gdb to an unused, ephemeral port.
173 "--command",
174 "gdb_port 0",
175 # Disable tcl and telnet servers, since they are unused and because
176 # the port numbers will conflict if multiple OpenOCD processes are
177 # running on the same server.
178 "--command",
179 "tcl_port disabled",
180 "--command",
181 "telnet_port disabled",
182 ]
183
184 if config:
185 f = find_file(config)
186 if f is None:
187 print "Unable to read file " + config
188 exit(1)
189
190 cmd += ["-f", f]
191 if debug:
192 cmd.append("-d")
193
194 logfile = open(Openocd.logname, "w")
195 logfile.write("+ %s\n" % " ".join(cmd))
196 logfile.flush()
197 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
198 stdout=logfile, stderr=logfile)
199
200 # Wait for OpenOCD to have made it through riscv_examine(). When using
201 # OpenOCD to communicate with a simulator this may take a long time,
202 # and gdb will time out when trying to connect if we attempt too early.
203 start = time.time()
204 messaged = False
205 while True:
206 log = open(Openocd.logname).read()
207 if "Ready for Remote Connections" in log:
208 break
209 if not self.process.poll() is None:
210 raise Exception(
211 "OpenOCD exited before completing riscv_examine()")
212 if not messaged and time.time() - start > 1:
213 messaged = True
214 print "Waiting for OpenOCD to examine RISCV core..."
215
216 self.port = self._get_gdb_server_port()
217
218 def _get_gdb_server_port(self):
219 """Get port that OpenOCD's gdb server is listening on."""
220 MAX_ATTEMPTS = 50
221 PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')
222 for _ in range(MAX_ATTEMPTS):
223 with open(os.devnull, 'w') as devnull:
224 try:
225 output = subprocess.check_output([
226 'lsof',
227 '-a', # Take the AND of the following selectors
228 '-p{}'.format(self.process.pid), # Filter on PID
229 '-iTCP', # Filter only TCP sockets
230 ], stderr=devnull)
231 except subprocess.CalledProcessError:
232 output = ""
233 matches = list(PORT_REGEX.finditer(output))
234 matches = [m for m in matches
235 if m.group('port') not in ('6666', '4444')]
236 if len(matches) > 1:
237 print output
238 raise Exception(
239 "OpenOCD listening on multiple ports. Cannot uniquely "
240 "identify gdb server port.")
241 elif matches:
242 [match] = matches
243 return int(match.group('port'))
244 time.sleep(1)
245 raise Exception("Timed out waiting for gdb server to obtain port.")
246
247 def __del__(self):
248 try:
249 self.process.kill()
250 self.process.wait()
251 except OSError:
252 pass
253
254 class OpenocdCli(object):
255 def __init__(self, port=4444):
256 self.child = pexpect.spawn(
257 "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
258 self.child.expect("> ")
259
260 def command(self, cmd):
261 self.child.sendline(cmd)
262 self.child.expect(cmd)
263 self.child.expect("\n")
264 self.child.expect("> ")
265 return self.child.before.strip("\t\r\n \0")
266
267 def reg(self, reg=''):
268 output = self.command("reg %s" % reg)
269 matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
270 values = {r: int(v, 0) for r, v in matches}
271 if reg:
272 return values[reg]
273 return values
274
275 def load_image(self, image):
276 output = self.command("load_image %s" % image)
277 if 'invalid ELF file, only 32bits files are supported' in output:
278 raise TestNotApplicable(output)
279
280 class CannotAccess(Exception):
281 def __init__(self, address):
282 Exception.__init__(self)
283 self.address = address
284
285 class Gdb(object):
286 def __init__(self,
287 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
288 self.child = pexpect.spawn(cmd)
289 self.child.logfile = open("gdb.log", "w")
290 self.child.logfile.write("+ %s\n" % cmd)
291 self.wait()
292 self.command("set confirm off")
293 self.command("set width 0")
294 self.command("set height 0")
295 # Force consistency.
296 self.command("set print entry-values no")
297
298 def wait(self):
299 """Wait for prompt."""
300 self.child.expect(r"\(gdb\)")
301
302 def command(self, command, timeout=6000):
303 self.child.sendline(command)
304 self.child.expect("\n", timeout=timeout)
305 self.child.expect(r"\(gdb\)", timeout=timeout)
306 return self.child.before.strip()
307
308 def c(self, wait=True, timeout=-1):
309 if wait:
310 output = self.command("c", timeout=timeout)
311 assert "Continuing" in output
312 return output
313 else:
314 self.child.sendline("c")
315 self.child.expect("Continuing")
316
317 def interrupt(self):
318 self.child.send("\003")
319 self.child.expect(r"\(gdb\)", timeout=6000)
320 return self.child.before.strip()
321
322 def x(self, address, size='w'):
323 output = self.command("x/%s %s" % (size, address))
324 value = int(output.split(':')[1].strip(), 0)
325 return value
326
327 def p_raw(self, obj):
328 output = self.command("p %s" % obj)
329 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
330 if m:
331 raise CannotAccess(int(m.group(1), 0))
332 return output.split('=')[-1].strip()
333
334 def p(self, obj):
335 output = self.command("p/x %s" % obj)
336 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
337 if m:
338 raise CannotAccess(int(m.group(1), 0))
339 value = int(output.split('=')[-1].strip(), 0)
340 return value
341
342 def p_string(self, obj):
343 output = self.command("p %s" % obj)
344 value = shlex.split(output.split('=')[-1].strip())[1]
345 return value
346
347 def stepi(self):
348 output = self.command("stepi")
349 return output
350
351 def load(self):
352 output = self.command("load", timeout=6000)
353 assert "failed" not in output
354 assert "Transfer rate" in output
355
356 def b(self, location):
357 output = self.command("b %s" % location)
358 assert "not defined" not in output
359 assert "Breakpoint" in output
360 return output
361
362 def hbreak(self, location):
363 output = self.command("hbreak %s" % location)
364 assert "not defined" not in output
365 assert "Hardware assisted breakpoint" in output
366 return output
367
368 def run_all_tests(module, target, parsed):
369 good_results = set(('pass', 'not_applicable'))
370
371 start = time.time()
372
373 results = {}
374 count = 0
375
376 global gdb_cmd # pylint: disable=global-statement
377 gdb_cmd = parsed.gdb
378
379 todo = []
380 if parsed.misaval:
381 target.misa = int(parsed.misaval, 16)
382 print "Assuming $MISA value of 0x%x. Skipping ExamineTarget." % \
383 target.misa
384 else:
385 todo.append(("ExamineTarget", ExamineTarget))
386
387 for name in dir(module):
388 definition = getattr(module, name)
389 if type(definition) == type and hasattr(definition, 'test') and \
390 (not parsed.test or any(test in name for test in parsed.test)):
391 todo.append((name, definition))
392
393 for name, definition in todo:
394 instance = definition(target)
395 result = instance.run()
396 results.setdefault(result, []).append(name)
397 count += 1
398 if result not in good_results and parsed.fail_fast:
399 break
400
401 header("ran %d tests in %.0fs" % (count, time.time() - start), dash=':')
402
403 result = 0
404 for key, value in results.iteritems():
405 print "%d tests returned %s" % (len(value), key)
406 if key not in good_results:
407 result = 1
408 for test in value:
409 print " ", test
410
411 return result
412
413 def add_test_run_options(parser):
414
415 parser.add_argument("--fail-fast", "-f", action="store_true",
416 help="Exit as soon as any test fails.")
417 parser.add_argument("test", nargs='*',
418 help="Run only tests that are named here.")
419 parser.add_argument("--gdb",
420 help="The command to use to start gdb.")
421 parser.add_argument("--misaval",
422 help="Don't run ExamineTarget, just assume the misa value which is "
423 "specified.")
424
425 def header(title, dash='-'):
426 if title:
427 dashes = dash * (36 - len(title))
428 before = dashes[:len(dashes)/2]
429 after = dashes[len(dashes)/2:]
430 print "%s[ %s ]%s" % (before, title, after)
431 else:
432 print dash * 40
433
434 def print_log(path):
435 header(path)
436 lines = open(path, "r").readlines()
437 if len(lines) > 1000:
438 for l in lines[:500]:
439 sys.stdout.write(l)
440 print "..."
441 for l in lines[-500:]:
442 sys.stdout.write(l)
443 else:
444 for l in lines:
445 sys.stdout.write(l)
446
447 class BaseTest(object):
448 compiled = {}
449
450 def __init__(self, target):
451 self.target = target
452 self.server = None
453 self.target_process = None
454 self.binary = None
455 self.start = 0
456 self.logs = []
457
458 def early_applicable(self):
459 """Return a false value if the test has determined it cannot run
460 without ever needing to talk to the target or server."""
461 # pylint: disable=no-self-use
462 return True
463
464 def setup(self):
465 pass
466
467 def compile(self):
468 compile_args = getattr(self, 'compile_args', None)
469 if compile_args:
470 if compile_args not in BaseTest.compiled:
471 # pylint: disable=star-args
472 BaseTest.compiled[compile_args] = \
473 self.target.compile(*compile_args)
474 self.binary = BaseTest.compiled.get(compile_args)
475
476 def classSetup(self):
477 self.compile()
478 self.target_process = self.target.target()
479 self.server = self.target.server()
480 self.logs.append(self.server.logname)
481
482 def classTeardown(self):
483 del self.server
484 del self.target_process
485
486 def run(self):
487 """
488 If compile_args is set, compile a program and set self.binary.
489
490 Call setup().
491
492 Then call test() and return the result, displaying relevant information
493 if an exception is raised.
494 """
495
496 print "Running", type(self).__name__, "...",
497 sys.stdout.flush()
498
499 if not self.early_applicable():
500 print "not_applicable"
501 return "not_applicable"
502
503 self.start = time.time()
504
505 self.classSetup()
506
507 try:
508 self.setup()
509 result = self.test() # pylint: disable=no-member
510 except TestNotApplicable:
511 result = "not_applicable"
512 except Exception as e: # pylint: disable=broad-except
513 if isinstance(e, TestFailed):
514 result = "fail"
515 else:
516 result = "exception"
517 print "%s in %.2fs" % (result, time.time() - self.start)
518 print "=" * 40
519 if isinstance(e, TestFailed):
520 header("Message")
521 print e.message
522 header("Traceback")
523 traceback.print_exc(file=sys.stdout)
524 for log in self.logs:
525 print_log(log)
526 print "/" * 40
527 return result
528
529 finally:
530 self.classTeardown()
531
532 if not result:
533 result = 'pass'
534 print "%s in %.2fs" % (result, time.time() - self.start)
535 return result
536
537 gdb_cmd = None
538 class GdbTest(BaseTest):
539 def __init__(self, target):
540 BaseTest.__init__(self, target)
541 self.gdb = None
542
543 def classSetup(self):
544 BaseTest.classSetup(self)
545 self.logs.append("gdb.log")
546
547 if gdb_cmd:
548 self.gdb = Gdb(gdb_cmd)
549 else:
550 self.gdb = Gdb()
551
552 if self.binary:
553 self.gdb.command("file %s" % self.binary)
554 if self.target:
555 self.gdb.command("set arch riscv:rv%d" % self.target.xlen)
556 self.gdb.command("set remotetimeout %d" % self.target.timeout_sec)
557 if self.server.port:
558 self.gdb.command(
559 "target extended-remote localhost:%d" % self.server.port)
560
561 # FIXME: OpenOCD doesn't handle PRIV now
562 #self.gdb.p("$priv=3")
563
564 def classTeardown(self):
565 del self.gdb
566 BaseTest.classTeardown(self)
567
568 class ExamineTarget(GdbTest):
569 def test(self):
570 self.target.misa = self.gdb.p("$misa")
571
572 txt = "RV"
573 if (self.target.misa >> 30) == 1:
574 txt += "32"
575 elif (self.target.misa >> 62) == 2:
576 txt += "64"
577 elif (self.target.misa >> 126) == 3:
578 txt += "128"
579 else:
580 txt += "??"
581
582 for i in range(26):
583 if self.target.misa & (1<<i):
584 txt += chr(i + ord('A'))
585 print txt,
586
587 class TestFailed(Exception):
588 def __init__(self, message):
589 Exception.__init__(self)
590 self.message = message
591
592 class TestNotApplicable(Exception):
593 def __init__(self, message):
594 Exception.__init__(self)
595 self.message = message
596
597 def assertEqual(a, b):
598 if a != b:
599 raise TestFailed("%r != %r" % (a, b))
600
601 def assertNotEqual(a, b):
602 if a == b:
603 raise TestFailed("%r == %r" % (a, b))
604
605 def assertIn(a, b):
606 if a not in b:
607 raise TestFailed("%r not in %r" % (a, b))
608
609 def assertNotIn(a, b):
610 if a in b:
611 raise TestFailed("%r in %r" % (a, b))
612
613 def assertGreater(a, b):
614 if not a > b:
615 raise TestFailed("%r not greater than %r" % (a, b))
616
617 def assertLess(a, b):
618 if not a < b:
619 raise TestFailed("%r not less than %r" % (a, b))
620
621 def assertTrue(a):
622 if not a:
623 raise TestFailed("%r is not True" % a)
624
625 def assertRegexpMatches(text, regexp):
626 if not re.search(regexp, text):
627 raise TestFailed("can't find %r in %r" % (regexp, text))