Let Spike have the default amount of RAM
[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import re
3 import shlex
4 import subprocess
5 import sys
6 import time
7 import traceback
8
9 import pexpect
10
11 # Note that gdb comes with its own testsuite. I was unable to figure out how to
12 # run that testsuite against the spike simulator.
13
def find_file(path):
    """Locate `path` relative to the working directory or to this module.

    The current working directory is tried first, then the directory that
    contains this file. Returns the first joined path that exists on disk, or
    None when neither candidate exists.
    """
    search_roots = (os.getcwd(), os.path.dirname(__file__))
    candidates = (os.path.join(root, path) for root in search_roots)
    for candidate in candidates:
        if os.path.exists(candidate):
            return candidate
    return None
20
21 def compile(args, xlen=32): # pylint: disable=redefined-builtin
22 cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
23 cmd = [cc, "-g"]
24 if (xlen == 32):
25 cmd.append("-march=rv32imac")
26 cmd.append("-mabi=ilp32")
27 else:
28 cmd.append("-march=rv64imac")
29 cmd.append("-mabi=lp64")
30 for arg in args:
31 found = find_file(arg)
32 if found:
33 cmd.append(found)
34 else:
35 cmd.append(arg)
36 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
37 stderr=subprocess.PIPE)
38 stdout, stderr = process.communicate()
39 if process.returncode:
40 print
41 header("Compile failed")
42 print "+", " ".join(cmd)
43 print stdout,
44 print stderr,
45 header("")
46 raise Exception("Compile failed!")
47
def unused_port():
    """Return a TCP port number that was free when this function ran.

    A socket is bound to port 0 and the port number it ended up with is
    reported. The socket is closed before returning, so nothing reserves the
    port afterwards; another process may claim it before the caller does.
    """
    # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
    import socket
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind(("", 0))
        return sock.getsockname()[1]
    finally:
        # Close even when bind()/getsockname() raises, so no socket leaks.
        sock.close()
56
class Spike(object):
    """A spike simulator process started for a debug session.

    Attributes:
        process: subprocess.Popen handle of the simulator.
        port: remote bitbang port spike reported in its log, or None when
            the instance was created with with_jtag_gdb false.
    """
    logname = "spike.log"

    def __init__(self, sim_cmd, binary=None, halted=False, with_jtag_gdb=True,
                 timeout=None, xlen=64):
        """Launch spike and, if requested, learn its remote bitbang port.

        Args:
            sim_cmd: command line to use instead of $RISCV/bin/spike when it
                is non-empty; split with shlex.
            binary: optional extra path appended after
                programs/infinite_loop.
            halted: add -H to the command line.
            with_jtag_gdb: add `--rbb-port 0` and wait for spike to log the
                port it is listening on.
            timeout: when set, run the simulator under `timeout <value>`.
            xlen: 32 adds `--isa RV32`.

        Side effects: truncates spike.log, and when with_jtag_gdb is true
        sets REMOTE_BITBANG_HOST and REMOTE_BITBANG_PORT in os.environ.

        Raises:
            AssertionError: spike did not log a bitbang port in time.
        """
        # Always defined, so callers can test it even without JTAG.
        self.port = None

        if sim_cmd:
            cmd = shlex.split(sim_cmd)
        else:
            spike = os.path.expandvars("$RISCV/bin/spike")
            cmd = [spike]
        if xlen == 32:
            cmd += ["--isa", "RV32"]

        if timeout:
            cmd = ["timeout", str(timeout)] + cmd

        if halted:
            cmd.append('-H')
        if with_jtag_gdb:
            cmd += ['--rbb-port', '0']
            os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
        cmd.append('programs/infinite_loop')
        if binary:
            cmd.append(binary)

        # The child keeps its own descriptor for the log, so ours can be
        # closed as soon as the process has been started.
        with open(self.logname, "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                                            stdout=logfile, stderr=logfile)

        if with_jtag_gdb:
            for _ in range(30):
                with open(self.logname) as log:
                    # The literal trailing period must be present; this keeps
                    # a partially written line from yielding a truncated
                    # port number.
                    m = re.search(
                        r"Listening for remote bitbang connection on "
                        r"port (\d+)\.", log.read())
                if m:
                    self.port = int(m.group(1))
                    os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
                    break
                time.sleep(0.11)
            if not self.port:
                # Raised explicitly (not via `assert`) so the check is still
                # performed when Python runs with -O.
                raise AssertionError("Didn't get spike message about bitbang "
                                     "connection")

    def __del__(self):
        """Kill the simulator, if one was started, and reap it."""
        # __init__ may have failed before self.process was assigned.
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            pass

    def wait(self, *args, **kwargs):
        """Forward to the simulator process's wait() and return its result."""
        return self.process.wait(*args, **kwargs)
111
class VcsSim(object):
    """A VCS simulator (simv) process with its JTAG VPI port.

    Attributes:
        process: subprocess.Popen handle of the simulator.
        port: port number the simulator reported with "Listening on port N".
    """

    def __init__(self, sim_cmd=None, debug=False):
        """Start the simulator and block until it announces its port.

        Args:
            sim_cmd: command line to use instead of `simv`; split with shlex.
            debug: append "-debug" to the program name and request a VPD
                dump in output/gdbserver.vpd.

        Side effects: truncates simv.log and sets JTAG_VPI_PORT in
        os.environ.

        Raises:
            Exception: the simulator exited before announcing a port.
        """
        if sim_cmd:
            cmd = shlex.split(sim_cmd)
        else:
            cmd = ["simv"]
        cmd += ["+jtag_vpi_enable"]
        if debug:
            cmd[0] = cmd[0] + "-debug"
            cmd += ["+vcdplusfile=output/gdbserver.vpd"]

        with open("simv.log", "w") as logfile, \
                open("simv.log", "r") as listenfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            # Skip the command line just written; only simulator output is
            # of interest from here on.
            listenfile.seek(0, 2)
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                                            stdout=logfile, stderr=logfile)
            while True:
                line = listenfile.readline()
                if line:
                    match = re.match(r"^Listening on port (\d+)$", line)
                    if match:
                        self.port = int(match.group(1))
                        break
                    continue
                # Nothing new in the log. Give up if the simulator is gone,
                # instead of polling the log forever.
                if self.process.poll() is not None:
                    raise Exception(
                        "simv exited before listening on a port")
                time.sleep(1)
        os.environ['JTAG_VPI_PORT'] = str(self.port)

    def __del__(self):
        """Kill the simulator, if one was started, and reap it."""
        # __init__ may have failed before self.process was assigned.
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            pass
146
class Openocd(object):
    """An OpenOCD server process and the port of its gdb server.

    Attributes:
        process: subprocess.Popen handle of the server.
        port: TCP port the gdb server is listening on.
    """
    logname = "openocd.log"

    def __init__(self, server_cmd=None, config=None, debug=False):
        """Start OpenOCD and wait until it accepts gdb connections.

        Args:
            server_cmd: command line to use instead of
                $RISCV/bin/riscv-openocd; split with shlex.
            config: config script name, resolved with find_file() and passed
                with -f. The process exits with status 1 if it is not found.
            debug: pass -d to OpenOCD.

        Side effects: truncates openocd.log.

        Raises:
            Exception: OpenOCD exited before logging that it is ready, or
                its gdb port could not be determined.
        """
        if server_cmd:
            cmd = shlex.split(server_cmd)
        else:
            openocd = os.path.expandvars("$RISCV/bin/riscv-openocd")
            cmd = [openocd]
        # Added once; repeating the flag later on the command line is
        # redundant.
        if debug:
            cmd.append("-d")

        # This command needs to come before any config scripts on the command
        # line, since they are executed in order.
        cmd += [
            # Tell OpenOCD to bind gdb to an unused, ephemeral port.
            "--command",
            "gdb_port 0",
            # Disable tcl and telnet servers, since they are unused and because
            # the port numbers will conflict if multiple OpenOCD processes are
            # running on the same server.
            "--command",
            "tcl_port disabled",
            "--command",
            "telnet_port disabled",
        ]

        if config:
            f = find_file(config)
            if f is None:
                print("Unable to read file " + config)
                sys.exit(1)

            cmd += ["-f", f]

        # The child keeps its own descriptor for the log, so ours can be
        # closed as soon as the process has been started.
        with open(Openocd.logname, "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                                            stdout=logfile, stderr=logfile)

        # Wait for OpenOCD to have made it through riscv_examine(). When using
        # OpenOCD to communicate with a simulator this may take a long time,
        # and gdb will time out when trying to connect if we attempt too early.
        start = time.time()
        messaged = False
        while True:
            with open(Openocd.logname) as log_reader:
                log = log_reader.read()
            if "Ready for Remote Connections" in log:
                break
            if self.process.poll() is not None:
                raise Exception(
                    "OpenOCD exited before completing riscv_examine()")
            if not messaged and time.time() - start > 1:
                messaged = True
                print("Waiting for OpenOCD to examine RISCV core...")
            # Pause between polls rather than re-reading the log in a tight
            # loop.
            time.sleep(0.1)

        self.port = self._get_gdb_server_port()

    def _get_gdb_server_port(self):
        """Get port that OpenOCD's gdb server is listening on.

        Runs lsof against the server's PID up to 50 times, one second apart,
        and returns the single listening TCP port it reports.

        Raises:
            Exception: more than one candidate port is listening, or no port
                showed up within the allowed attempts.
        """
        MAX_ATTEMPTS = 50
        PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')
        for _ in range(MAX_ATTEMPTS):
            with open(os.devnull, 'w') as devnull:
                try:
                    output = subprocess.check_output([
                        'lsof',
                        '-a', # Take the AND of the following selectors
                        '-p{}'.format(self.process.pid), # Filter on PID
                        '-iTCP', # Filter only TCP sockets
                    ], stderr=devnull)
                except subprocess.CalledProcessError:
                    output = ""
            # Ports 6666 and 4444 are never reported as the gdb port
            # (presumably OpenOCD's default tcl/telnet ports -- confirm).
            matches = [m for m in PORT_REGEX.finditer(output)
                       if m.group('port') not in ('6666', '4444')]
            if len(matches) > 1:
                print(output)
                raise Exception(
                    "OpenOCD listening on multiple ports. Cannot uniquely "
                    "identify gdb server port.")
            elif matches:
                [match] = matches
                return int(match.group('port'))
            time.sleep(1)
        raise Exception("Timed out waiting for gdb server to obtain port.")

    def __del__(self):
        """Kill the server, if one was started, and reap it."""
        # __init__ may have failed before self.process was assigned.
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            pass
243
class OpenocdCli(object):
    """Talks to OpenOCD's command line over a telnet session run by pexpect.

    The session output is also copied to openocd-cli.log by `tee`.
    """

    def __init__(self, port=4444):
        """Open `telnet localhost <port>` and wait for the first prompt."""
        self.child = pexpect.spawn(
            "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
        self.child.expect("> ")

    def command(self, cmd):
        """Send `cmd` and return the text printed before the next prompt.

        The returned text has surrounding whitespace and NUL characters
        stripped.
        """
        self.child.sendline(cmd)
        # expect() interprets its argument as a regular expression. Escape
        # the command so its echo is matched literally even when it contains
        # characters such as '+', '(' or '.'.
        self.child.expect(re.escape(cmd))
        self.child.expect("\n")
        self.child.expect("> ")
        return self.child.before.strip("\t\r\n \0")

    def reg(self, reg=''):
        """Read registers with the `reg` command.

        Lines of the form `name (/bits): 0xHEX` (upper-case hex digits) are
        parsed. With a register name, that register's integer value is
        returned (KeyError if it is not in the output); without one, a dict
        mapping every parsed name to its value is returned.
        """
        output = self.command("reg %s" % reg)
        matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
        values = {r: int(v, 0) for r, v in matches}
        if reg:
            return values[reg]
        return values

    def load_image(self, image):
        """Run `load_image <image>`.

        Raises:
            TestNotApplicable: OpenOCD reported that only 32-bit ELF files
                are supported.
        """
        output = self.command("load_image %s" % image)
        if 'invalid ELF file, only 32bits files are supported' in output:
            raise TestNotApplicable(output)
269
class CannotAccess(Exception):
    """Raised when gdb reports that it cannot access a memory address.

    Attributes:
        address: the address taken from gdb's error message.
    """

    def __init__(self, address):
        # Hand the address to Exception so that `args` is populated; this
        # keeps the exception informative in tracebacks and copyable.
        Exception.__init__(self, address)
        self.address = address

    def __str__(self):
        try:
            where = "0x%x" % self.address
        except TypeError:
            # Not an integer; fall back to its repr.
            where = repr(self.address)
        return "Cannot access memory at address %s" % where
274
class Gdb(object):
    """Drives an interactive gdb process through pexpect.

    The pexpect session log is written to gdb.log.
    """

    def __init__(self,
                 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
        """Spawn gdb, wait for its prompt and apply fixed display settings."""
        self.child = pexpect.spawn(cmd)
        self.child.logfile = open("gdb.log", "w")
        self.child.logfile.write("+ %s\n" % cmd)
        self.wait()
        startup_settings = (
            "set confirm off",
            "set width 0",
            "set height 0",
            # Force consistency.
            "set print entry-values no",
        )
        for setting in startup_settings:
            self.command(setting)

    def wait(self):
        """Wait for prompt."""
        self.child.expect(r"\(gdb\)")

    def command(self, command, timeout=6000):
        """Send one command line and return gdb's output up to the prompt.

        The rest of the echoed line (through its newline) is consumed first;
        the text returned is what follows it, stripped of surrounding
        whitespace.
        """
        session = self.child
        session.sendline(command)
        session.expect("\n", timeout=timeout)
        session.expect(r"\(gdb\)", timeout=timeout)
        return session.before.strip()

    def c(self, wait=True, timeout=-1):
        """Continue execution.

        With `wait` true, block until the next prompt and return the output;
        otherwise only wait for gdb to acknowledge with "Continuing" and
        return None.
        """
        if not wait:
            self.child.sendline("c")
            self.child.expect("Continuing")
            return None
        output = self.command("c", timeout=timeout)
        assert "Continuing" in output
        return output

    def interrupt(self):
        """Send Ctrl-C and return the output printed before the prompt."""
        session = self.child
        session.send("\003")
        session.expect(r"\(gdb\)", timeout=6000)
        return session.before.strip()

    def x(self, address, size='w'):
        """Examine memory with `x/<size> <address>`; return it as an int."""
        fields = self.command("x/%s %s" % (size, address)).split(':')
        return int(fields[1].strip(), 0)

    def _print(self, print_command):
        """Run a print command and return the text after the last '='.

        Raises CannotAccess when gdb reports an inaccessible address.
        """
        output = self.command(print_command)
        failure = re.search(
            "Cannot access memory at address (0x[0-9a-f]+)", output)
        if failure:
            raise CannotAccess(int(failure.group(1), 0))
        return output.split('=')[-1].strip()

    def p_raw(self, obj):
        """Print `obj` and return gdb's textual value."""
        return self._print("p %s" % obj)

    def p(self, obj):
        """Print `obj` in hex (p/x) and return the value as an int."""
        return int(self._print("p/x %s" % obj), 0)

    def p_string(self, obj):
        """Print `obj` and return the second shell-style token of its value.

        For a value printed as `0x1234 "text"` that token is the string.
        """
        shown = self.command("p %s" % obj).split('=')[-1].strip()
        tokens = shlex.split(shown)
        return tokens[1]

    def stepi(self):
        """Execute one instruction and return gdb's output."""
        return self.command("stepi")

    def load(self):
        """Load the current file onto the target; assert that it succeeded."""
        report = self.command("load", timeout=6000)
        assert "failed" not in report
        assert "Transfer rate" in report

    def b(self, location):
        """Set a breakpoint at `location` and return gdb's output."""
        reply = self.command("b %s" % location)
        assert "not defined" not in reply
        assert "Breakpoint" in reply
        return reply

    def hbreak(self, location):
        """Set a hardware breakpoint at `location`; return gdb's output."""
        reply = self.command("hbreak %s" % location)
        assert "not defined" not in reply
        assert "Hardware assisted breakpoint" in reply
        return reply
357
def run_all_tests(module, target, parsed):
    """Run ExamineTarget plus every selected test class found in `module`.

    A module attribute is a test when its type is exactly `type` and it has
    a `test` attribute. When `parsed.test` is non-empty, only classes whose
    name contains one of those strings are run. `parsed.gdb` is stored in
    the module-level gdb_cmd, and `parsed.fail_fast` stops the run after the
    first result that is neither 'pass' nor 'not_applicable'.

    A summary grouped by result is printed. Returns 0 when every result was
    'pass' or 'not_applicable', otherwise 1.
    """
    global gdb_cmd  # pylint: disable=global-statement
    gdb_cmd = parsed.gdb

    good_results = set(('pass', 'not_applicable'))
    start = time.time()

    def selected(name):
        # No filter given means everything is selected.
        return not parsed.test or any(test in name for test in parsed.test)

    todo = [("ExamineTarget", ExamineTarget)]
    for name in dir(module):
        definition = getattr(module, name)
        if type(definition) == type and hasattr(definition, 'test') and \
                selected(name):
            todo.append((name, definition))

    results = {}
    count = 0
    for name, definition in todo:
        outcome = definition(target).run()
        results.setdefault(outcome, []).append(name)
        count += 1
        if outcome not in good_results and parsed.fail_fast:
            break

    header("ran %d tests in %.0fs" % (count, time.time() - start), dash=':')

    exit_status = 0
    for outcome, names in results.items():
        print("%d tests returned %s" % (len(names), outcome))
        if outcome in good_results:
            continue
        # Only list the tests whose outcome counts as a failure.
        exit_status = 1
        for test in names:
            print("%s %s" % (" ", test))

    return exit_status
395
def add_test_run_options(parser):
    """Add the test-run options to an argparse-style `parser`.

    Registers --fail-fast/-f (flag), the positional `test` list (zero or
    more names) and --gdb.
    """
    option_table = (
        (("--fail-fast", "-f"),
         {"action": "store_true",
          "help": "Exit as soon as any test fails."}),
        (("test",),
         {"nargs": '*',
          "help": "Run only tests that are named here."}),
        (("--gdb",),
         {"help": "The command to use to start gdb."}),
    )
    for names, settings in option_table:
        parser.add_argument(*names, **settings)
403
def header(title, dash='-'):
    """Print a separator line, optionally with `title` centred in it.

    With a title, the line is `<dashes>[ title ]<dashes>`, where the dashes
    total 36 minus the title length and any odd one goes on the right. With
    an empty title, a plain line of 40 `dash` characters is printed.
    """
    if title:
        dashes = dash * (36 - len(title))
        # Floor division keeps the slice index an integer even where `/`
        # means true division.
        middle = len(dashes) // 2
        print("%s[ %s ]%s" % (dashes[:middle], title, dashes[middle:]))
    else:
        print(dash * 40)
412
def print_log(path):
    """Print the file at `path` under a header naming it.

    Files longer than 1000 lines are abbreviated to their first 500 lines,
    a "..." marker, and their last 500 lines.
    """
    header(path)
    # Read everything up front and close the file before printing.
    with open(path, "r") as log:
        lines = log.readlines()
    if len(lines) > 1000:
        sys.stdout.writelines(lines[:500])
        print("...")
        sys.stdout.writelines(lines[-500:])
    else:
        sys.stdout.writelines(lines)
425
class BaseTest(object):
    """Lifecycle shared by all tests: compile, start target/server, run.

    Subclasses provide test() and may set `compile_args` (a hashable
    sequence passed to target.compile) and override early_applicable(),
    setup(), classSetup() and classTeardown().
    """
    # Binaries already built, keyed by compile_args and shared by all tests.
    compiled = {}

    def __init__(self, target):
        self.target = target
        self.server = None
        self.target_process = None
        self.binary = None
        self.start = 0
        self.logs = []

    def early_applicable(self):
        """Return a false value if the test has determined it cannot run
        without ever needing to talk to the target or server."""
        # pylint: disable=no-self-use
        return True

    def setup(self):
        """Per-test preparation hook; does nothing by default."""
        pass

    def compile(self):
        """Build (or reuse) the binary for compile_args; set self.binary.

        self.binary is None when the test declares no compile_args.
        """
        compile_args = getattr(self, 'compile_args', None)
        if compile_args:
            if compile_args not in BaseTest.compiled:
                # pylint: disable=star-args
                BaseTest.compiled[compile_args] = \
                        self.target.compile(*compile_args)
        self.binary = BaseTest.compiled.get(compile_args)

    def classSetup(self):
        """Compile, then start the target and the server for this test."""
        self.compile()
        self.target_process = self.target.target()
        self.server = self.target.server()
        self.logs.append(self.server.logname)

    def classTeardown(self):
        """Drop the references to the server and the target process."""
        del self.server
        del self.target_process

    def run(self):
        """
        If compile_args is set, compile a program and set self.binary.

        Call setup().

        Then call test() and return the result, displaying relevant information
        if an exception is raised.

        Returns "not_applicable", "fail", "exception", the truthy value
        returned by test(), or "pass" when test() returns a false value.
        """

        # The trailing space separates the name from whatever is printed
        # next on the same line.
        sys.stdout.write("Running %s ... " % type(self).__name__)
        sys.stdout.flush()

        if not self.early_applicable():
            print("not_applicable")
            return "not_applicable"

        self.start = time.time()

        try:
            # classSetup() is inside the try block so that a failure while
            # starting the target, server or debugger is reported like any
            # other test error and classTeardown() still runs.
            self.classSetup()
            self.setup()
            result = self.test()    # pylint: disable=no-member
        except TestNotApplicable:
            result = "not_applicable"
        except Exception as e: # pylint: disable=broad-except
            if isinstance(e, TestFailed):
                result = "fail"
            else:
                result = "exception"
            print("%s in %.2fs" % (result, time.time() - self.start))
            print("=" * 40)
            if isinstance(e, TestFailed):
                header("Message")
                print(e.message)
            header("Traceback")
            traceback.print_exc(file=sys.stdout)
            for log in self.logs:
                print_log(log)
            print("/" * 40)
            return result

        finally:
            self.classTeardown()

        if not result:
            result = 'pass'
        print("%s in %.2fs" % (result, time.time() - self.start))
        return result
515
# Command line used to start gdb. run_all_tests() overwrites this with the
# value of the --gdb option; while it is false, GdbTest starts Gdb with its
# default command.
gdb_cmd = None
class GdbTest(BaseTest):
    """A BaseTest that also starts gdb and attaches it to the server."""

    def __init__(self, target):
        BaseTest.__init__(self, target)
        self.gdb = None

    def classSetup(self):
        """Run the base setup, then start and configure a gdb session.

        gdb is started with the module-level gdb_cmd when that is set. It is
        then given the compiled binary (if any), the target's architecture
        and remote timeout, and connected to the server's port when the
        server has one. Finally `$priv` is set to 3.
        """
        BaseTest.classSetup(self)
        self.logs.append("gdb.log")

        self.gdb = Gdb(gdb_cmd) if gdb_cmd else Gdb()
        send = self.gdb.command

        if self.binary:
            send("file %s" % self.binary)
        if self.target:
            send("set arch riscv:rv%d" % self.target.xlen)
            send("set remotetimeout %d" % self.target.timeout_sec)
        if self.server.port:
            send("target extended-remote localhost:%d" % self.server.port)

        self.gdb.p("$priv=3")

    def classTeardown(self):
        """Drop the gdb session, then run the base teardown."""
        del self.gdb
        BaseTest.classTeardown(self)
545
546 class ExamineTarget(GdbTest):
547 def test(self):
548 self.target.misa = self.gdb.p("$misa")
549
550 txt = "RV"
551 if (self.target.misa >> 30) == 1:
552 txt += "32"
553 elif (self.target.misa >> 62) == 2:
554 txt += "64"
555 elif (self.target.misa >> 126) == 3:
556 txt += "128"
557 else:
558 txt += "??"
559
560 for i in range(26):
561 if self.target.misa & (1<<i):
562 txt += chr(i + ord('A'))
563 print txt,
564
class TestFailed(Exception):
    """Raised to signal that a test failed.

    Attributes:
        message: text describing the failure.
    """

    def __init__(self, message):
        # Pass the message on so that str(), args and tracebacks carry it.
        Exception.__init__(self, message)
        self.message = message
569
class TestNotApplicable(Exception):
    """Raised to signal that a test does not apply to the current target.

    Attributes:
        message: text explaining why the test does not apply.
    """

    def __init__(self, message):
        # Pass the message on so that str(), args and tracebacks carry it.
        Exception.__init__(self, message)
        self.message = message
574
def assertEqual(a, b):
    """Raise TestFailed, showing both reprs, when `a != b` is true."""
    differs = a != b
    if not differs:
        return
    raise TestFailed("%r != %r" % (a, b))
578
def assertNotEqual(a, b):
    """Raise TestFailed, showing both reprs, when `a == b` is true."""
    same = a == b
    if not same:
        return
    raise TestFailed("%r == %r" % (a, b))
582
def assertIn(a, b):
    """Raise TestFailed when `a` is not contained in `b`."""
    if a in b:
        return
    raise TestFailed("%r not in %r" % (a, b))
586
def assertNotIn(a, b):
    """Raise TestFailed when `a` is contained in `b`."""
    present = a in b
    if not present:
        return
    raise TestFailed("%r in %r" % (a, b))
590
def assertGreater(a, b):
    """Raise TestFailed unless `a > b` is true."""
    if a > b:
        return
    raise TestFailed("%r not greater than %r" % (a, b))
594
def assertLess(a, b):
    """Raise TestFailed unless `a < b` is true."""
    if a < b:
        return
    raise TestFailed("%r not less than %r" % (a, b))
598
def assertTrue(a):
    """Raise TestFailed when `a` is false in a boolean context."""
    if not a:
        # Wrap the value in a one-element tuple: formatting with a bare
        # empty tuple as the right-hand operand would raise TypeError
        # instead of reporting the failure.
        raise TestFailed("%r is not True" % (a,))
602
def assertRegexpMatches(text, regexp):
    """Raise TestFailed when `regexp` matches nowhere in `text`."""
    found = re.search(regexp, text)
    if found:
        return
    raise TestFailed("can't find %r in %r" % (regexp, text))