Test mstatus.TW, mstatus.TVM, and mstatus.TSR features
[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import re
3 import shlex
4 import subprocess
5 import sys
6 import time
7 import traceback
8
9 import pexpect
10
11 # Note that gdb comes with its own testsuite. I was unable to figure out how to
12 # run that testsuite against the spike simulator.
13
def find_file(path):
    """Locate `path` relative to the working directory or to this module.

    The current working directory is tried first, then the directory that
    contains this file. Returns the first joined path that exists, or None
    when neither candidate exists.
    """
    search_roots = (os.getcwd(), os.path.dirname(__file__))
    candidates = (os.path.join(root, path) for root in search_roots)
    return next((c for c in candidates if os.path.exists(c)), None)
20
21 def compile(args, xlen=32): # pylint: disable=redefined-builtin
22 cc = os.path.expandvars("$RISCV/bin/riscv%d-unknown-elf-gcc" % xlen)
23 cmd = [cc, "-g"]
24 for arg in args:
25 found = find_file(arg)
26 if found:
27 cmd.append(found)
28 else:
29 cmd.append(arg)
30 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
31 stderr=subprocess.PIPE)
32 stdout, stderr = process.communicate()
33 if process.returncode:
34 print
35 header("Compile failed")
36 print "+", " ".join(cmd)
37 print stdout,
38 print stderr,
39 header("")
40 raise Exception("Compile failed!")
41
def unused_port():
    """Return a TCP port number that was free when this function ran.

    The port is obtained by binding a socket to port 0 and reading back the
    port the OS picked. The socket is closed again before returning, so
    nothing reserves the port afterwards; another process may grab it before
    the caller does.
    """
    # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
    import socket
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind(("", 0))
        return s.getsockname()[1]
    finally:
        # Close even if bind()/getsockname() raised, so the socket can't leak.
        s.close()
50
class Spike(object):
    """Runs spike (with pk) as a child process.

    After construction `process` is the subprocess.Popen object and `port`
    is the gdb port handed to spike, or None when with_gdb is false.
    """
    # Receives the command line and everything spike writes to stdout/stderr.
    logname = "spike.log"

    def __init__(self, cmd, binary=None, halted=False, with_gdb=True,
            timeout=None, xlen=64):
        """Launch spike.

        cmd -- command line used to start spike (split with shlex); a false
            value means plain "spike".
        binary -- optional program appended after "pk".
        halted -- when true, pass -H to spike.
        with_gdb -- when true, pick an unused port and pass it with
            --gdb-port.
        timeout -- when set, run spike under `timeout <timeout>`.
        xlen -- when 32, "--isa RV32" is added to the command line.
        """
        # Defined before anything can fail so that __del__ and callers that
        # read `port` never hit AttributeError.
        self.process = None
        self.port = None

        if cmd:
            cmd = shlex.split(cmd)
        else:
            cmd = ["spike"]
        if xlen == 32:
            cmd += ["--isa", "RV32"]

        if timeout:
            cmd = ["timeout", str(timeout)] + cmd

        if halted:
            cmd.append('-H')
        if with_gdb:
            self.port = unused_port()
            cmd += ['--gdb-port', str(self.port)]
        cmd.append("-m32")
        cmd.append('pk')
        if binary:
            cmd.append(binary)
        logfile = open(self.logname, "w")
        try:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                    stdout=logfile, stderr=logfile)
        finally:
            # The child has its own copy of the descriptor; ours is no
            # longer needed.
            logfile.close()

    def __del__(self):
        """Kill the spike process, if one was started."""
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            # Already gone.
            pass

    def wait(self, *args, **kwargs):
        """Wait for spike to exit; arguments are forwarded to Popen.wait()."""
        return self.process.wait(*args, **kwargs)
92
class VcsSim(object):
    """Runs a simv simulator binary and learns the port it listens on.

    All simulator output goes to simv.log. After construction `port` holds
    the number parsed from the "Listening on port N" line, and the same
    value is exported in the JTAG_VPI_PORT environment variable.
    """

    def __init__(self, simv=None, debug=False):
        """Start the simulator and block until it announces its port.

        simv -- command line for the simulator (split with shlex); defaults
            to "simv".
        debug -- when true, run "<simv>-debug" instead and add
            +vcdplusfile=output/gdbserver.vpd.

        Raises Exception if the simulator exits without announcing a port.
        """
        # Set first so __del__ is safe even if startup fails part way.
        self.process = None

        if simv:
            cmd = shlex.split(simv)
        else:
            cmd = ["simv"]
        cmd += ["+jtag_vpi_enable"]
        if debug:
            cmd[0] = cmd[0] + "-debug"
            cmd += ["+vcdplusfile=output/gdbserver.vpd"]
        logfile = open("simv.log", "w")
        listenfile = None
        try:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            # Second handle on the same file, positioned at the end, so only
            # output produced by the simulator is scanned.
            listenfile = open("simv.log", "r")
            listenfile.seek(0, 2)
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                    stdout=logfile, stderr=logfile)
            self.port = self._wait_for_port(listenfile)
        finally:
            logfile.close()
            if listenfile is not None:
                listenfile.close()
        os.environ['JTAG_VPI_PORT'] = str(self.port)

    def _wait_for_port(self, listenfile):
        """Scan simulator output until the listening port is announced."""
        while True:
            # Sample the exit status before reading: if the process was
            # already gone and there is still nothing to read, no further
            # output can ever arrive.
            exited = self.process.poll() is not None
            line = listenfile.readline()
            if line:
                match = re.match(r"^Listening on port (\d+)$", line)
                if match:
                    return int(match.group(1))
            elif exited:
                raise Exception(
                        "simv exited before announcing its listening port")
            else:
                time.sleep(0.1)

    def __del__(self):
        """Kill the simulator process, if one was started."""
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            # Already gone.
            pass
127
class Openocd(object):
    """Runs OpenOCD as a child process and discovers its gdb server port.

    The command line and all OpenOCD output go to `logname`. After
    construction `port` holds the TCP port the gdb server listens on.
    """
    logname = "openocd.log"

    def __init__(self, cmd=None, config=None, debug=False):
        """Start OpenOCD and block until it has examined the target.

        cmd -- command line used to start OpenOCD (split with shlex);
            defaults to "openocd".
        config -- optional config script; looked up with find_file() and
            passed with -f.
        debug -- when true, pass -d to OpenOCD.

        Raises Exception if OpenOCD exits before "Examined RISCV core"
        appears in its log, or if the gdb port cannot be determined.
        """
        # Set first so __del__ is safe even if startup fails part way.
        self.process = None

        if cmd:
            cmd = shlex.split(cmd)
        else:
            cmd = ["openocd"]
        if config:
            cmd += ["-f", find_file(config)]
        if debug:
            cmd.append("-d")

        # This command needs to come before any config scripts on the command
        # line, since they are executed in order.
        cmd[1:1] = [
            # Tell OpenOCD to bind gdb to an unused, ephemeral port.
            "--command",
            "gdb_port 0",
            # Disable tcl and telnet servers, since they are unused and because
            # the port numbers will conflict if multiple OpenOCD processes are
            # running on the same server.
            "--command",
            "tcl_port disabled",
            "--command",
            "telnet_port disabled",
        ]

        logfile = open(Openocd.logname, "w")
        try:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                    stdout=logfile, stderr=logfile)
        finally:
            # The child has its own copy of the descriptor.
            logfile.close()

        # Wait for OpenOCD to have made it through riscv_examine(). When using
        # OpenOCD to communicate with a simulator this may take a long time,
        # and gdb will time out when trying to connect if we attempt too early.
        start = time.time()
        messaged = False
        while True:
            with open(Openocd.logname) as log_reader:
                log = log_reader.read()
            if "Examined RISCV core" in log:
                break
            if self.process.poll() is not None:
                raise Exception(
                        "OpenOCD exited before completing riscv_examine()")
            if not messaged and time.time() - start > 1:
                messaged = True
                print("Waiting for OpenOCD to examine RISCV core...")
            # Don't spin at full speed re-reading the log.
            time.sleep(0.1)

        self.port = self._get_gdb_server_port()

    def _get_gdb_server_port(self):
        """Get port that OpenOCD's gdb server is listening on.

        Asks lsof for the TCP sockets of the OpenOCD process up to
        MAX_ATTEMPTS times, 0.1s apart. Raises Exception if more than one
        candidate port is found or if none shows up in time.
        """
        MAX_ATTEMPTS = 50
        PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')
        with open(os.devnull, 'w') as devnull:
            for _ in range(MAX_ATTEMPTS):
                try:
                    output = subprocess.check_output([
                        'lsof',
                        '-a', # Take the AND of the following selectors
                        '-p{}'.format(self.process.pid), # Filter on PID
                        '-iTCP', # Filter only TCP sockets
                    ], stderr=devnull)
                except subprocess.CalledProcessError:
                    output = ""
                # 6666 and 4444 are ignored; presumably OpenOCD's default
                # tcl/telnet ports -- TODO confirm.
                matches = [m for m in PORT_REGEX.finditer(output)
                        if m.group('port') not in ('6666', '4444')]
                if len(matches) > 1:
                    print(output)
                    raise Exception(
                        "OpenOCD listening on multiple ports. Cannot uniquely "
                        "identify gdb server port.")
                elif matches:
                    [match] = matches
                    return int(match.group('port'))
                time.sleep(0.1)
        raise Exception("Timed out waiting for gdb server to obtain port.")

    def __del__(self):
        """Kill the OpenOCD process, if one was started."""
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            # Already gone.
            pass
215
class OpenocdCli(object):
    """Talks to OpenOCD's telnet command line through pexpect.

    The session is piped through tee, so a transcript ends up in
    openocd-cli.log.
    """
    def __init__(self, port=4444):
        """Connect to localhost:<port> with telnet and wait for a prompt."""
        self.child = pexpect.spawn(
                "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
        self.child.expect("> ")

    def command(self, cmd):
        """Send one command and return its output with whitespace stripped."""
        self.child.sendline(cmd)
        # Skip the echo of the command. It has to be matched literally:
        # commands may contain characters that are special in a regex.
        self.child.expect_exact(cmd)
        self.child.expect("\n")
        self.child.expect("> ")
        return self.child.before.strip("\t\r\n \0")

    def reg(self, reg=''):
        """Read registers with the "reg" command.

        With a register name, return that register's value (KeyError if it
        is not in the output). Without one, return a dict mapping every
        reported register name to its value.
        """
        output = self.command("reg %s" % reg)
        matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
        values = {r: int(v, 0) for r, v in matches}
        if reg:
            return values[reg]
        return values

    def load_image(self, image):
        """Run "load_image <image>".

        Raises TestNotApplicable if OpenOCD reports that it only supports
        32-bit ELF files.
        """
        output = self.command("load_image %s" % image)
        if 'invalid ELF file, only 32bits files are supported' in output:
            raise TestNotApplicable(output)
241
class CannotAccess(Exception):
    """Raised when gdb reports "Cannot access memory at address ...".

    The address is kept in `address`; no message is handed to the base
    Exception.
    """
    def __init__(self, address):
        super(CannotAccess, self).__init__()
        self.address = address
246
class Gdb(object):
    """Drives an interactive gdb session through pexpect.

    Everything exchanged with gdb is logged to gdb.log.
    """
    def __init__(self,
            cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
        """Spawn gdb with `cmd`, wait for its prompt and apply settings."""
        self.child = pexpect.spawn(cmd)
        self.child.logfile = open("gdb.log", "w")
        self.child.logfile.write("+ %s\n" % cmd)
        self.wait()
        startup_settings = (
            "set confirm off",
            "set width 0",
            "set height 0",
            # Force consistency.
            "set print entry-values no",
        )
        for setting in startup_settings:
            self.command(setting)

    @staticmethod
    def _raise_if_inaccessible(output):
        """Raise CannotAccess if gdb's output reports unreadable memory."""
        m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
        if m:
            raise CannotAccess(int(m.group(1), 0))

    def wait(self):
        """Wait for prompt."""
        self.child.expect(r"\(gdb\)")

    def command(self, command, timeout=-1):
        """Send `command` and return what gdb printed before the next prompt.

        The rest of the line carrying the echoed command is skipped first.
        `timeout` is handed to both pexpect expect() calls.
        """
        child = self.child
        child.sendline(command)
        child.expect("\n", timeout=timeout)
        child.expect(r"\(gdb\)", timeout=timeout)
        return child.before.strip()

    def c(self, wait=True):
        """Continue execution.

        With wait true, block until the next prompt and return gdb's
        output. Otherwise only wait for gdb to acknowledge with
        "Continuing" and return None.
        """
        if not wait:
            self.child.sendline("c")
            self.child.expect("Continuing")
            return None
        output = self.command("c")
        assert "Continuing" in output
        return output

    def interrupt(self):
        """Send Ctrl-C and return the output seen before the next prompt."""
        self.child.send("\003")
        self.child.expect(r"\(gdb\)", timeout=60)
        return self.child.before.strip()

    def x(self, address, size='w'):
        """Examine memory with x/<size> and return the value as an int."""
        output = self.command("x/%s %s" % (size, address))
        return int(output.split(':')[1].strip(), 0)

    def p_raw(self, obj):
        """Print `obj` and return the text after the last '=' unparsed.

        Raises CannotAccess if gdb could not read the memory involved.
        """
        output = self.command("p %s" % obj)
        self._raise_if_inaccessible(output)
        return output.split('=')[-1].strip()

    def p(self, obj):
        """Print `obj` in hex (p/x) and return it as an int.

        Raises CannotAccess if gdb could not read the memory involved.
        """
        output = self.command("p/x %s" % obj)
        self._raise_if_inaccessible(output)
        return int(output.split('=')[-1].strip(), 0)

    def p_string(self, obj):
        """Print `obj` and return the second shell-style token of the value."""
        output = self.command("p %s" % obj)
        tokens = shlex.split(output.split('=')[-1].strip())
        return tokens[1]

    def stepi(self):
        """Execute one instruction and return gdb's output."""
        return self.command("stepi")

    def load(self):
        """Run "load" (60s timeout) and assert that the transfer succeeded."""
        output = self.command("load", timeout=60)
        assert "failed" not in output
        assert "Transfer rate" in output

    def b(self, location):
        """Set a breakpoint at `location`; return gdb's output."""
        output = self.command("b %s" % location)
        assert "not defined" not in output
        assert "Breakpoint" in output
        return output

    def hbreak(self, location):
        """Set a hardware breakpoint at `location`; return gdb's output."""
        output = self.command("hbreak %s" % location)
        assert "not defined" not in output
        assert "Hardware assisted breakpoint" in output
        return output
329
def run_all_tests(module, target, parsed):
    """Run ExamineTarget followed by the selected tests found in `module`.

    module -- object whose attributes are scanned for test classes: every
        class that has a `test` attribute.
    target -- passed to each test class constructor.
    parsed -- parsed command line options; `gdb`, `test` and `fail_fast`
        are read (see add_test_run_options()). When `test` is non-empty,
        only classes whose name contains one of those strings are run.

    Prints a summary and returns 0 if every test returned 'pass' or
    'not_applicable', otherwise 1.
    """
    good_results = set(('pass', 'not_applicable'))

    start = time.time()

    results = {}
    count = 0

    global gdb_cmd  # pylint: disable=global-statement
    gdb_cmd = parsed.gdb

    todo = [("ExamineTarget", ExamineTarget)]
    for name in dir(module):
        definition = getattr(module, name)
        if isinstance(definition, type) and hasattr(definition, 'test') and \
                (not parsed.test or any(test in name for test in parsed.test)):
            todo.append((name, definition))

    for name, definition in todo:
        instance = definition(target)
        result = instance.run()
        results.setdefault(result, []).append(name)
        count += 1
        if result not in good_results and parsed.fail_fast:
            break

    header("ran %d tests in %.0fs" % (count, time.time() - start), dash=':')

    result = 0
    # items() rather than iteritems(): works on Python 2 and Python 3.
    for key, value in results.items():
        print("%d tests returned %s" % (len(value), key))
        if key not in good_results:
            result = 1
            # Name the tests behind every bad result.
            for test in value:
                print("  %s" % test)

    return result
367
def add_test_run_options(parser):
    """Add the test-run options to an argparse-style `parser`.

    Registers --fail-fast/-f (flag), the positional `test` list (zero or
    more names) and --gdb.
    """
    option_specs = (
        (("--fail-fast", "-f"),
            dict(action="store_true",
                help="Exit as soon as any test fails.")),
        (("test",),
            dict(nargs='*',
                help="Run only tests that are named here.")),
        (("--gdb",),
            dict(help="The command to use to start gdb.")),
    )
    for flags, settings in option_specs:
        parser.add_argument(*flags, **settings)
375
def header(title, dash='-'):
    """Print a separator line, with `title` centred in it when given.

    With a title the line is "<dashes>[ title ]<dashes>", using
    36 - len(title) dash characters in total; if that number is odd the
    extra one goes after the title. Without a title the line is 40 dash
    characters.
    """
    if title:
        dashes = dash * (36 - len(title))
        # Floor division: a slice index must be an int, also where "/" is
        # true division.
        half = len(dashes) // 2
        print("%s[ %s ]%s" % (dashes[:half], title, dashes[half:]))
    else:
        print(dash * 40)
384
def print_log(path):
    """Print the file at `path` to stdout under a header naming it.

    Files of more than 1000 lines are abbreviated to their first and last
    500 lines, separated by a "..." line.
    """
    header(path)
    # Read everything up front so the handle is closed before printing.
    with open(path, "r") as log:
        lines = log.readlines()
    if len(lines) > 1000:
        sys.stdout.writelines(lines[:500])
        print("...")
        sys.stdout.writelines(lines[-500:])
    else:
        sys.stdout.writelines(lines)
397
class BaseTest(object):
    """Base class for one test that runs against a target.

    Subclasses provide test() and may set `compile_args`, override setup()
    and override early_applicable(). run() drives the whole life cycle.
    """
    # Results of target.compile(), shared by all tests and keyed by
    # compile_args, so each distinct program is only built once.
    compiled = {}

    def __init__(self, target):
        self.target = target
        self.server = None
        self.target_process = None
        self.binary = None
        self.start = 0
        # Paths of the log files printed when the test does not pass.
        self.logs = []

    def early_applicable(self):
        """Return a false value if the test has determined it cannot run
        without ever needing to talk to the target or server."""
        # pylint: disable=no-self-use
        return True

    def setup(self):
        """Hook called after classSetup() and before test()."""
        pass

    def compile(self):
        """Build the program described by compile_args, if any.

        Sets self.binary to the (cached) result of target.compile(), or to
        None when the test has no compile_args.
        """
        compile_args = getattr(self, 'compile_args', None)
        if compile_args:
            if compile_args not in BaseTest.compiled:
                # pylint: disable=star-args
                BaseTest.compiled[compile_args] = \
                        self.target.compile(*compile_args)
        self.binary = BaseTest.compiled.get(compile_args)

    def classSetup(self):
        """Compile the program, then start the target and the server."""
        self.compile()
        self.target_process = self.target.target()
        self.server = self.target.server()
        self.logs.append(self.server.logname)

    def classTeardown(self):
        """Drop the references to the server and the target process."""
        del self.server
        del self.target_process

    def run(self):
        """
        If compile_args is set, compile a program and set self.binary.

        Call setup().

        Then call test() and return the result, displaying relevant information
        if an exception is raised.

        The result is one of "not_applicable", "fail" (TestFailed was
        raised), "exception" (anything else was raised), whatever true value
        test() returned, or "pass" when test() returned a false value.
        """

        sys.stdout.write("Running %s ... " % type(self).__name__)
        sys.stdout.flush()

        if not self.early_applicable():
            print("not_applicable")
            return "not_applicable"

        self.start = time.time()

        try:
            # classSetup() is inside the try so that a failure while
            # starting the target or server is reported like any other test
            # error and classTeardown() still runs.
            self.classSetup()
            self.setup()
            result = self.test()    # pylint: disable=no-member
        except TestNotApplicable:
            result = "not_applicable"
        except Exception as e: # pylint: disable=broad-except
            if isinstance(e, TestFailed):
                result = "fail"
            else:
                result = "exception"
            print("%s in %.2fs" % (result, time.time() - self.start))
            print("=" * 40)
            if isinstance(e, TestFailed):
                header("Message")
                print(e.message)
            header("Traceback")
            traceback.print_exc(file=sys.stdout)
            for log in self.logs:
                print_log(log)
            print("/" * 40)
            return result

        finally:
            self.classTeardown()

        if not result:
            result = 'pass'
        print("%s in %.2fs" % (result, time.time() - self.start))
        return result
487
# Command used to start gdb. run_all_tests() sets it from the parsed --gdb
# option; while it is None (or empty) GdbTest uses Gdb's default command.
gdb_cmd = None
class GdbTest(BaseTest):
    """A BaseTest that also starts gdb and attaches it to the server."""
    def __init__(self, target):
        BaseTest.__init__(self, target)
        self.gdb = None

    def classSetup(self):
        """Do the BaseTest setup, then start and configure gdb.

        gdb is started with the module-level gdb_cmd when that is set. It
        then loads the compiled binary (if any), is told the target's
        architecture and remote timeout, connects to the server's port and
        finally sets $priv to 3.
        """
        BaseTest.classSetup(self)
        self.logs.append("gdb.log")

        self.gdb = Gdb(gdb_cmd) if gdb_cmd else Gdb()
        gdb = self.gdb

        if self.binary:
            gdb.command("file %s" % self.binary)

        target = self.target
        if target:
            gdb.command("set arch riscv:rv%d" % target.xlen)
            gdb.command("set remotetimeout %d" % target.timeout_sec)

        port = self.server.port
        if port:
            gdb.command("target extended-remote localhost:%d" % port)

        gdb.p("$priv=3")

    def classTeardown(self):
        """Drop the gdb session, then do the BaseTest teardown."""
        del self.gdb
        BaseTest.classTeardown(self)
517
518 class ExamineTarget(GdbTest):
519 def test(self):
520 self.target.misa = self.gdb.p("$misa")
521
522 txt = "RV"
523 if (self.target.misa >> 30) == 1:
524 txt += "32"
525 elif (self.target.misa >> 62) == 2:
526 txt += "64"
527 elif (self.target.misa >> 126) == 3:
528 txt += "128"
529 else:
530 txt += "??"
531
532 for i in range(26):
533 if self.target.misa & (1<<i):
534 txt += chr(i + ord('A'))
535 print txt,
536
class TestFailed(Exception):
    """Raised to report that a test failed; `message` says why."""
    def __init__(self, message):
        # Hand the message to Exception as well, so that str(e) and printed
        # tracebacks show it instead of an empty string.
        Exception.__init__(self, message)
        self.message = message
541
class TestNotApplicable(Exception):
    """Raised when a test cannot be run in this setup; `message` says why."""
    def __init__(self, message):
        # Hand the message to Exception as well, so that str(e) and printed
        # tracebacks show it instead of an empty string.
        Exception.__init__(self, message)
        self.message = message
546
def assertEqual(a, b):
    """Raise TestFailed if `a != b`."""
    mismatch = a != b
    if mismatch:
        message = "%r != %r" % (a, b)
        raise TestFailed(message)
550
def assertNotEqual(a, b):
    """Raise TestFailed if `a == b`."""
    same = a == b
    if same:
        message = "%r == %r" % (a, b)
        raise TestFailed(message)
554
def assertIn(a, b):
    """Raise TestFailed unless `a in b`."""
    if a in b:
        return
    message = "%r not in %r" % (a, b)
    raise TestFailed(message)
558
def assertNotIn(a, b):
    """Raise TestFailed if `a in b`."""
    present = a in b
    if present:
        message = "%r in %r" % (a, b)
        raise TestFailed(message)
562
def assertGreater(a, b):
    """Raise TestFailed unless `a > b`."""
    if a > b:
        return
    message = "%r not greater than %r" % (a, b)
    raise TestFailed(message)
566
def assertLess(a, b):
    """Raise TestFailed unless `a < b`."""
    if a < b:
        return
    message = "%r not less than %r" % (a, b)
    raise TestFailed(message)
570
def assertTrue(a):
    """Raise TestFailed if `a` is false."""
    if not a:
        # Wrap `a` in a one-element tuple: a bare tuple operand (such as the
        # empty tuple) would be taken as the argument list of the % operator
        # and raise TypeError instead of TestFailed.
        raise TestFailed("%r is not True" % (a,))
574
def assertRegexpMatches(text, regexp):
    """Raise TestFailed unless `regexp` matches somewhere in `text`."""
    if re.search(regexp, text):
        return
    message = "can't find %r in %r" % (regexp, text)
    raise TestFailed(message)