debug: Don't halt out of reset. It's unrealistic. Use a program which loops (actually...
[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import re
3 import shlex
4 import subprocess
5 import sys
6 import time
7 import traceback
8
9 import pexpect
10
11 # Note that gdb comes with its own testsuite. I was unable to figure out how to
12 # run that testsuite against the spike simulator.
13
14 def find_file(path):
15 for directory in (os.getcwd(), os.path.dirname(__file__)):
16 fullpath = os.path.join(directory, path)
17 if os.path.exists(fullpath):
18 return fullpath
19 return None
20
21 def compile(args, xlen=32): # pylint: disable=redefined-builtin
22 cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
23 cmd = [cc, "-g"]
24 if (xlen == 32):
25 cmd.append("-march=rv32imac")
26 cmd.append("-mabi=ilp32")
27 else:
28 cmd.append("-march=rv64imac")
29 cmd.append("-mabi=lp64")
30 for arg in args:
31 found = find_file(arg)
32 if found:
33 cmd.append(found)
34 else:
35 cmd.append(arg)
36 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
37 stderr=subprocess.PIPE)
38 stdout, stderr = process.communicate()
39 if process.returncode:
40 print
41 header("Compile failed")
42 print "+", " ".join(cmd)
43 print stdout,
44 print stderr,
45 header("")
46 raise Exception("Compile failed!")
47
48 def unused_port():
49 # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
50 import socket
51 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
52 s.bind(("", 0))
53 port = s.getsockname()[1]
54 s.close()
55 return port
56
57 class Spike(object):
58 logname = "spike.log"
59
60 def __init__(self, sim_cmd, binary=None, halted=False, with_jtag_gdb=True,
61 timeout=None, xlen=64):
62 """Launch spike. Return tuple of its process and the port it's running
63 on."""
64 if sim_cmd:
65 cmd = shlex.split(sim_cmd)
66 else:
67 spike = os.path.expandvars("$RISCV/bin/spike")
68 cmd = [spike]
69 if xlen == 32:
70 cmd += ["--isa", "RV32"]
71
72 if timeout:
73 cmd = ["timeout", str(timeout)] + cmd
74
75 if halted:
76 cmd.append('-H')
77 if with_jtag_gdb:
78 cmd += ['--rbb-port', '0']
79 os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
80 cmd.append("-m32")
81 cmd.append('programs/infinite_loop')
82 if binary:
83 cmd.append(binary)
84 logfile = open(self.logname, "w")
85 logfile.write("+ %s\n" % " ".join(cmd))
86 logfile.flush()
87 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
88 stdout=logfile, stderr=logfile)
89
90 if with_jtag_gdb:
91 self.port = None
92 for _ in range(30):
93 m = re.search(r"Listening for remote bitbang connection on "
94 r"port (\d+).", open(self.logname).read())
95 if m:
96 self.port = int(m.group(1))
97 os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
98 break
99 time.sleep(0.11)
100 assert self.port, "Didn't get spike message about bitbang " \
101 "connection"
102
103 def __del__(self):
104 try:
105 self.process.kill()
106 self.process.wait()
107 except OSError:
108 pass
109
110 def wait(self, *args, **kwargs):
111 return self.process.wait(*args, **kwargs)
112
113 class VcsSim(object):
114 def __init__(self, sim_cmd=None, debug=False):
115 if sim_cmd:
116 cmd = shlex.split(simv)
117 else:
118 cmd = ["simv"]
119 cmd += ["+jtag_vpi_enable"]
120 if debug:
121 cmd[0] = cmd[0] + "-debug"
122 cmd += ["+vcdplusfile=output/gdbserver.vpd"]
123 logfile = open("simv.log", "w")
124 logfile.write("+ %s\n" % " ".join(cmd))
125 logfile.flush()
126 listenfile = open("simv.log", "r")
127 listenfile.seek(0, 2)
128 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
129 stdout=logfile, stderr=logfile)
130 done = False
131 while not done:
132 line = listenfile.readline()
133 if not line:
134 time.sleep(1)
135 match = re.match(r"^Listening on port (\d+)$", line)
136 if match:
137 done = True
138 self.port = int(match.group(1))
139 os.environ['JTAG_VPI_PORT'] = str(self.port)
140
141 def __del__(self):
142 try:
143 self.process.kill()
144 self.process.wait()
145 except OSError:
146 pass
147
148 class Openocd(object):
149 logname = "openocd.log"
150
151 def __init__(self, server_cmd=None, config=None, debug=False):
152 if server_cmd:
153 cmd = shlex.split(server_cmd)
154 else:
155 openocd = os.path.expandvars("$RISCV/bin/riscv-openocd")
156 cmd = [openocd]
157 if (debug):
158 cmd.append("-d")
159
160 # This command needs to come before any config scripts on the command
161 # line, since they are executed in order.
162 cmd += [
163 # Tell OpenOCD to bind gdb to an unused, ephemeral port.
164 "--command",
165 "gdb_port 0",
166 # Disable tcl and telnet servers, since they are unused and because
167 # the port numbers will conflict if multiple OpenOCD processes are
168 # running on the same server.
169 "--command",
170 "tcl_port disabled",
171 "--command",
172 "telnet_port disabled",
173 ]
174
175 if config:
176 f = find_file(config)
177 if f is None:
178 print("Unable to read file " + config)
179 exit(1)
180
181 cmd += ["-f", f]
182 if debug:
183 cmd.append("-d")
184
185 logfile = open(Openocd.logname, "w")
186 logfile.write("+ %s\n" % " ".join(cmd))
187 logfile.flush()
188 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
189 stdout=logfile, stderr=logfile)
190
191 # Wait for OpenOCD to have made it through riscv_examine(). When using
192 # OpenOCD to communicate with a simulator this may take a long time,
193 # and gdb will time out when trying to connect if we attempt too early.
194 start = time.time()
195 messaged = False
196 while True:
197 log = open(Openocd.logname).read()
198 if "Ready for Remote Connections" in log:
199 break
200 if not self.process.poll() is None:
201 raise Exception(
202 "OpenOCD exited before completing riscv_examine()")
203 if not messaged and time.time() - start > 1:
204 messaged = True
205 print "Waiting for OpenOCD to examine RISCV core..."
206
207 self.port = self._get_gdb_server_port()
208
209 def _get_gdb_server_port(self):
210 """Get port that OpenOCD's gdb server is listening on."""
211 MAX_ATTEMPTS = 50
212 PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')
213 for _ in range(MAX_ATTEMPTS):
214 with open(os.devnull, 'w') as devnull:
215 try:
216 output = subprocess.check_output([
217 'lsof',
218 '-a', # Take the AND of the following selectors
219 '-p{}'.format(self.process.pid), # Filter on PID
220 '-iTCP', # Filter only TCP sockets
221 ], stderr=devnull)
222 except subprocess.CalledProcessError:
223 output = ""
224 matches = list(PORT_REGEX.finditer(output))
225 matches = [m for m in matches
226 if m.group('port') not in ('6666', '4444')]
227 if len(matches) > 1:
228 print output
229 raise Exception(
230 "OpenOCD listening on multiple ports. Cannot uniquely "
231 "identify gdb server port.")
232 elif matches:
233 [match] = matches
234 return int(match.group('port'))
235 time.sleep(1)
236 raise Exception("Timed out waiting for gdb server to obtain port.")
237
238 def __del__(self):
239 try:
240 self.process.kill()
241 self.process.wait()
242 except OSError:
243 pass
244
245 class OpenocdCli(object):
246 def __init__(self, port=4444):
247 self.child = pexpect.spawn(
248 "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
249 self.child.expect("> ")
250
251 def command(self, cmd):
252 self.child.sendline(cmd)
253 self.child.expect(cmd)
254 self.child.expect("\n")
255 self.child.expect("> ")
256 return self.child.before.strip("\t\r\n \0")
257
258 def reg(self, reg=''):
259 output = self.command("reg %s" % reg)
260 matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
261 values = {r: int(v, 0) for r, v in matches}
262 if reg:
263 return values[reg]
264 return values
265
266 def load_image(self, image):
267 output = self.command("load_image %s" % image)
268 if 'invalid ELF file, only 32bits files are supported' in output:
269 raise TestNotApplicable(output)
270
271 class CannotAccess(Exception):
272 def __init__(self, address):
273 Exception.__init__(self)
274 self.address = address
275
276 class Gdb(object):
277 def __init__(self,
278 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
279 self.child = pexpect.spawn(cmd)
280 self.child.logfile = open("gdb.log", "w")
281 self.child.logfile.write("+ %s\n" % cmd)
282 self.wait()
283 self.command("set confirm off")
284 self.command("set width 0")
285 self.command("set height 0")
286 # Force consistency.
287 self.command("set print entry-values no")
288
289 def wait(self):
290 """Wait for prompt."""
291 self.child.expect(r"\(gdb\)")
292
293 def command(self, command, timeout=6000):
294 self.child.sendline(command)
295 self.child.expect("\n", timeout=timeout)
296 self.child.expect(r"\(gdb\)", timeout=timeout)
297 return self.child.before.strip()
298
299 def c(self, wait=True, timeout=-1):
300 if wait:
301 output = self.command("c", timeout=timeout)
302 assert "Continuing" in output
303 return output
304 else:
305 self.child.sendline("c")
306 self.child.expect("Continuing")
307
308 def interrupt(self):
309 self.child.send("\003")
310 self.child.expect(r"\(gdb\)", timeout=6000)
311 return self.child.before.strip()
312
313 def x(self, address, size='w'):
314 output = self.command("x/%s %s" % (size, address))
315 value = int(output.split(':')[1].strip(), 0)
316 return value
317
318 def p_raw(self, obj):
319 output = self.command("p %s" % obj)
320 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
321 if m:
322 raise CannotAccess(int(m.group(1), 0))
323 return output.split('=')[-1].strip()
324
325 def p(self, obj):
326 output = self.command("p/x %s" % obj)
327 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
328 if m:
329 raise CannotAccess(int(m.group(1), 0))
330 value = int(output.split('=')[-1].strip(), 0)
331 return value
332
333 def p_string(self, obj):
334 output = self.command("p %s" % obj)
335 value = shlex.split(output.split('=')[-1].strip())[1]
336 return value
337
338 def stepi(self):
339 output = self.command("stepi")
340 return output
341
342 def load(self):
343 output = self.command("load", timeout=6000)
344 assert "failed" not in output
345 assert "Transfer rate" in output
346
347 def b(self, location):
348 output = self.command("b %s" % location)
349 assert "not defined" not in output
350 assert "Breakpoint" in output
351 return output
352
353 def hbreak(self, location):
354 output = self.command("hbreak %s" % location)
355 assert "not defined" not in output
356 assert "Hardware assisted breakpoint" in output
357 return output
358
359 def run_all_tests(module, target, parsed):
360 good_results = set(('pass', 'not_applicable'))
361
362 start = time.time()
363
364 results = {}
365 count = 0
366
367 global gdb_cmd # pylint: disable=global-statement
368 gdb_cmd = parsed.gdb
369
370 todo = [("ExamineTarget", ExamineTarget)]
371 for name in dir(module):
372 definition = getattr(module, name)
373 if type(definition) == type and hasattr(definition, 'test') and \
374 (not parsed.test or any(test in name for test in parsed.test)):
375 todo.append((name, definition))
376
377 for name, definition in todo:
378 instance = definition(target)
379 result = instance.run()
380 results.setdefault(result, []).append(name)
381 count += 1
382 if result not in good_results and parsed.fail_fast:
383 break
384
385 header("ran %d tests in %.0fs" % (count, time.time() - start), dash=':')
386
387 result = 0
388 for key, value in results.iteritems():
389 print "%d tests returned %s" % (len(value), key)
390 if key not in good_results:
391 result = 1
392 for test in value:
393 print " ", test
394
395 return result
396
397 def add_test_run_options(parser):
398 parser.add_argument("--fail-fast", "-f", action="store_true",
399 help="Exit as soon as any test fails.")
400 parser.add_argument("test", nargs='*',
401 help="Run only tests that are named here.")
402 parser.add_argument("--gdb",
403 help="The command to use to start gdb.")
404
405 def header(title, dash='-'):
406 if title:
407 dashes = dash * (36 - len(title))
408 before = dashes[:len(dashes)/2]
409 after = dashes[len(dashes)/2:]
410 print "%s[ %s ]%s" % (before, title, after)
411 else:
412 print dash * 40
413
414 def print_log(path):
415 header(path)
416 lines = open(path, "r").readlines()
417 if len(lines) > 1000:
418 for l in lines[:500]:
419 sys.stdout.write(l)
420 print "..."
421 for l in lines[-500:]:
422 sys.stdout.write(l)
423 else:
424 for l in lines:
425 sys.stdout.write(l)
426
427 class BaseTest(object):
428 compiled = {}
429
430 def __init__(self, target):
431 self.target = target
432 self.server = None
433 self.target_process = None
434 self.binary = None
435 self.start = 0
436 self.logs = []
437
438 def early_applicable(self):
439 """Return a false value if the test has determined it cannot run
440 without ever needing to talk to the target or server."""
441 # pylint: disable=no-self-use
442 return True
443
444 def setup(self):
445 pass
446
447 def compile(self):
448 compile_args = getattr(self, 'compile_args', None)
449 if compile_args:
450 if compile_args not in BaseTest.compiled:
451 # pylint: disable=star-args
452 BaseTest.compiled[compile_args] = \
453 self.target.compile(*compile_args)
454 self.binary = BaseTest.compiled.get(compile_args)
455
456 def classSetup(self):
457 self.compile()
458 self.target_process = self.target.target()
459 self.server = self.target.server()
460 self.logs.append(self.server.logname)
461
462 def classTeardown(self):
463 del self.server
464 del self.target_process
465
466 def run(self):
467 """
468 If compile_args is set, compile a program and set self.binary.
469
470 Call setup().
471
472 Then call test() and return the result, displaying relevant information
473 if an exception is raised.
474 """
475
476 print "Running", type(self).__name__, "...",
477 sys.stdout.flush()
478
479 if not self.early_applicable():
480 print "not_applicable"
481 return "not_applicable"
482
483 self.start = time.time()
484
485 self.classSetup()
486
487 try:
488 self.setup()
489 result = self.test() # pylint: disable=no-member
490 except TestNotApplicable:
491 result = "not_applicable"
492 except Exception as e: # pylint: disable=broad-except
493 if isinstance(e, TestFailed):
494 result = "fail"
495 else:
496 result = "exception"
497 print "%s in %.2fs" % (result, time.time() - self.start)
498 print "=" * 40
499 if isinstance(e, TestFailed):
500 header("Message")
501 print e.message
502 header("Traceback")
503 traceback.print_exc(file=sys.stdout)
504 for log in self.logs:
505 print_log(log)
506 print "/" * 40
507 return result
508
509 finally:
510 self.classTeardown()
511
512 if not result:
513 result = 'pass'
514 print "%s in %.2fs" % (result, time.time() - self.start)
515 return result
516
517 gdb_cmd = None
518 class GdbTest(BaseTest):
519 def __init__(self, target):
520 BaseTest.__init__(self, target)
521 self.gdb = None
522
523 def classSetup(self):
524 BaseTest.classSetup(self)
525 self.logs.append("gdb.log")
526
527 if gdb_cmd:
528 self.gdb = Gdb(gdb_cmd)
529 else:
530 self.gdb = Gdb()
531
532 if self.binary:
533 self.gdb.command("file %s" % self.binary)
534 if self.target:
535 self.gdb.command("set arch riscv:rv%d" % self.target.xlen)
536 self.gdb.command("set remotetimeout %d" % self.target.timeout_sec)
537 if self.server.port:
538 self.gdb.command(
539 "target extended-remote localhost:%d" % self.server.port)
540
541 self.gdb.p("$priv=3")
542
543 def classTeardown(self):
544 del self.gdb
545 BaseTest.classTeardown(self)
546
547 class ExamineTarget(GdbTest):
548 def test(self):
549 self.target.misa = self.gdb.p("$misa")
550
551 txt = "RV"
552 if (self.target.misa >> 30) == 1:
553 txt += "32"
554 elif (self.target.misa >> 62) == 2:
555 txt += "64"
556 elif (self.target.misa >> 126) == 3:
557 txt += "128"
558 else:
559 txt += "??"
560
561 for i in range(26):
562 if self.target.misa & (1<<i):
563 txt += chr(i + ord('A'))
564 print txt,
565
566 class TestFailed(Exception):
567 def __init__(self, message):
568 Exception.__init__(self)
569 self.message = message
570
571 class TestNotApplicable(Exception):
572 def __init__(self, message):
573 Exception.__init__(self)
574 self.message = message
575
576 def assertEqual(a, b):
577 if a != b:
578 raise TestFailed("%r != %r" % (a, b))
579
580 def assertNotEqual(a, b):
581 if a == b:
582 raise TestFailed("%r == %r" % (a, b))
583
584 def assertIn(a, b):
585 if a not in b:
586 raise TestFailed("%r not in %r" % (a, b))
587
588 def assertNotIn(a, b):
589 if a in b:
590 raise TestFailed("%r in %r" % (a, b))
591
592 def assertGreater(a, b):
593 if not a > b:
594 raise TestFailed("%r not greater than %r" % (a, b))
595
596 def assertLess(a, b):
597 if not a < b:
598 raise TestFailed("%r not less than %r" % (a, b))
599
600 def assertTrue(a):
601 if not a:
602 raise TestFailed("%r is not True" % a)
603
604 def assertRegexpMatches(text, regexp):
605 if not re.search(regexp, text):
606 raise TestFailed("can't find %r in %r" % (regexp, text))