Make pylint happy.
[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import re
3 import shlex
4 import subprocess
5 import sys
6 import time
7 import traceback
8
9 import pexpect
10
11 # Note that gdb comes with its own testsuite. I was unable to figure out how to
12 # run that testsuite against the spike simulator.
13
def find_file(path):
    """Locate `path` relative to the working directory or to this module.

    The current working directory is tried first, then the directory that
    contains this file.  Returns the first joined path that exists, or None
    when neither location contains `path`.
    """
    search_roots = (os.getcwd(), os.path.dirname(__file__))
    candidates = (os.path.join(root, path) for root in search_roots)
    return next((found for found in candidates if os.path.exists(found)),
            None)
20
def compile(args, xlen=32): # pylint: disable=redefined-builtin
    """Build a test program with the RISC-V cross compiler.

    args -- compiler arguments.  Any argument that find_file() can resolve is
            replaced by the resolved path; everything else is passed through
            unchanged.
    xlen -- selects the riscv<xlen>-unknown-elf-gcc binary under $RISCV/bin.

    Returns None.  When the compiler exits with a non-zero status, the command
    line and the captured compiler output are printed and an Exception is
    raised.
    """
    cc = os.path.expandvars("$RISCV/bin/riscv%d-unknown-elf-gcc" % xlen)
    cmd = [cc, "-g"] + [find_file(arg) or arg for arg in args]
    # universal_newlines makes communicate() hand back text rather than bytes,
    # so the output can be written to sys.stdout on any Python version.
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, universal_newlines=True)
    stdout, stderr = process.communicate()
    if process.returncode:
        print("")
        header("Compile failed")
        print("+ " + " ".join(cmd))
        # Write the captured streams verbatim.  A trailing-comma print would
        # insert a stray separator space whenever one of them is empty.
        sys.stdout.write(stdout)
        sys.stdout.write(stderr)
        header("")
        raise Exception("Compile failed!")
41
def unused_port():
    """Return a TCP port number that was free at the time of the call.

    The port is obtained by binding a socket to port 0 and reading back the
    number the OS assigned.  The socket is closed before returning, so nothing
    reserves the port afterwards; another process may claim it first.
    """
    # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
    import socket
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind(("", 0))
        return s.getsockname()[1]
    finally:
        # Release the socket even when bind()/getsockname() fails.
        s.close()
50
class Spike(object):
    """A spike simulator child process whose output goes to `logname`."""
    logname = "spike.log"

    def __init__(self, cmd, binary=None, halted=False, with_jtag_gdb=True,
            timeout=None, xlen=64):
        """Launch spike.

        cmd -- command line used to start spike (split with shlex); a false
               value selects "spike -l".
        binary -- optional program appended after "pk".
        halted -- append -H to the command line.
        with_jtag_gdb -- request a remote bitbang port (--rbb-port 0), export
               REMOTE_BITBANG_HOST/REMOTE_BITBANG_PORT and store the port
               number in self.port.
        timeout -- when set, run the command under timeout(1) with this value.
        xlen -- 32 adds "--isa RV32".

        The child is available as self.process.  An AssertionError is raised
        if the bitbang port message does not show up in the log.
        """
        if cmd:
            cmd = shlex.split(cmd)
        else:
            cmd = ["spike", "-l"]
        if xlen == 32:
            cmd += ["--isa", "RV32"]

        if timeout:
            cmd = ["timeout", str(timeout)] + cmd

        if halted:
            cmd.append('-H')
        if with_jtag_gdb:
            cmd += ['--rbb-port', '0']
            os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
        cmd.append("-m32")
        cmd.append('pk')
        if binary:
            cmd.append(binary)
        # The child gets its own copy of the log descriptor, so our handle can
        # be closed as soon as the process has been spawned.
        with open(self.logname, "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                    stdout=logfile, stderr=logfile)

        if with_jtag_gdb:
            self.port = None
            for _ in range(30):
                with open(self.logname) as log:
                    contents = log.read()
                m = re.search(r"Listening for remote bitbang connection on "
                        r"port (\d+).", contents)
                if m:
                    self.port = int(m.group(1))
                    os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
                    break
                time.sleep(0.11)
            assert self.port, "Didn't get spike message about bitbang " \
                    "connection"

    def __del__(self):
        """Kill the child process, if one was ever started."""
        # __init__ may have failed before self.process was assigned.
        process = getattr(self, 'process', None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            pass

    def wait(self, *args, **kwargs):
        """Wait for the child to exit; arguments go to Popen.wait()."""
        return self.process.wait(*args, **kwargs)
105
class VcsSim(object):
    """A VCS simulator (simv) child process that logs to simv.log."""

    def __init__(self, simv=None, debug=False):
        """Start the simulator and wait for it to announce its port.

        simv -- command line used to start the simulator (split with shlex);
                defaults to "simv".
        debug -- run the "-debug" variant of the executable and add
                 +vcdplusfile=output/gdbserver.vpd.

        The port is stored in self.port and exported as JTAG_VPI_PORT.  An
        Exception is raised if the simulator exits without announcing one.
        """
        if simv:
            cmd = shlex.split(simv)
        else:
            cmd = ["simv"]
        cmd += ["+jtag_vpi_enable"]
        if debug:
            cmd[0] = cmd[0] + "-debug"
            cmd += ["+vcdplusfile=output/gdbserver.vpd"]
        with open("simv.log", "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            with open("simv.log", "r") as listenfile:
                # Start reading after the command-line header written above.
                listenfile.seek(0, 2)
                self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                        stdout=logfile, stderr=logfile)
                self.port = self._wait_for_port(listenfile)
        os.environ['JTAG_VPI_PORT'] = str(self.port)

    def _wait_for_port(self, listenfile):
        """Follow the log until the port announcement; return the port."""
        exited = False
        while True:
            line = listenfile.readline()
            if line:
                match = re.match(r"^Listening on port (\d+)$", line)
                if match:
                    return int(match.group(1))
                continue
            if exited:
                raise Exception(
                        "simv exited before announcing its JTAG VPI port")
            if self.process.poll() is not None:
                # Read once more before giving up: the simulator may have
                # written the announcement just before it exited.
                exited = True
            else:
                time.sleep(0.1)

    def __del__(self):
        """Kill the child process, if one was ever started."""
        # __init__ may have failed before self.process was assigned.
        process = getattr(self, 'process', None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            pass
140
class Openocd(object):
    """An OpenOCD child process whose output goes to `logname`."""
    logname = "openocd.log"

    def __init__(self, cmd=None, config=None, debug=False):
        """Start OpenOCD and wait until it has examined the RISC-V core.

        cmd -- command line used to start OpenOCD (split with shlex);
               defaults to "openocd -d".
        config -- optional config file name, resolved with find_file() and
               passed with -f.
        debug -- append an extra -d.

        The gdb server port is stored in self.port.  An Exception is raised
        if the config file cannot be found, if OpenOCD exits before the
        "Examined RISCV core" message appears in the log, or if the gdb
        server port cannot be determined.
        """
        if cmd:
            cmd = shlex.split(cmd)
        else:
            cmd = ["openocd", "-d"]

        # This command needs to come before any config scripts on the command
        # line, since they are executed in order.
        cmd += [
            # Tell OpenOCD to bind gdb to an unused, ephemeral port.
            "--command",
            "gdb_port 0",
            # Disable tcl and telnet servers, since they are unused and because
            # the port numbers will conflict if multiple OpenOCD processes are
            # running on the same server.
            "--command",
            "tcl_port disabled",
            "--command",
            "telnet_port disabled",
        ]

        if config:
            config_path = find_file(config)
            if config_path is None:
                # Fail here with a clear message rather than with a TypeError
                # when the command line is joined below.
                raise Exception(
                        "Couldn't find OpenOCD config file %r" % config)
            cmd += ["-f", config_path]
        if debug:
            cmd.append("-d")

        # The child gets its own copy of the log descriptor, so our handle can
        # be closed as soon as the process has been spawned.
        with open(Openocd.logname, "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                    stdout=logfile, stderr=logfile)

        # Wait for OpenOCD to have made it through riscv_examine(). When using
        # OpenOCD to communicate with a simulator this may take a long time,
        # and gdb will time out when trying to connect if we attempt too early.
        start = time.time()
        messaged = False
        while True:
            with open(Openocd.logname) as log_reader:
                log = log_reader.read()
            if "Examined RISCV core" in log:
                break
            if self.process.poll() is not None:
                raise Exception(
                        "OpenOCD exited before completing riscv_examine()")
            if not messaged and time.time() - start > 1:
                messaged = True
                print("Waiting for OpenOCD to examine RISCV core...")
            # Don't spin at full speed re-reading the log.
            time.sleep(0.1)

        self.port = self._get_gdb_server_port()

    def _get_gdb_server_port(self):
        """Get port that OpenOCD's gdb server is listening on.

        lsof is polled up to MAX_ATTEMPTS times for TCP sockets owned by the
        OpenOCD process.  Ports 6666 and 4444 are ignored.  Raises an
        Exception when more than one candidate port is listed or when none
        shows up in time.
        """
        MAX_ATTEMPTS = 50
        PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')
        for _ in range(MAX_ATTEMPTS):
            with open(os.devnull, 'w') as devnull:
                try:
                    output = subprocess.check_output([
                        'lsof',
                        '-a', # Take the AND of the following selectors
                        '-p{}'.format(self.process.pid), # Filter on PID
                        '-iTCP', # Filter only TCP sockets
                    ], stderr=devnull)
                except subprocess.CalledProcessError:
                    output = ""
            if not isinstance(output, str):
                # check_output() returned bytes; the regex needs text.
                output = output.decode()
            matches = [m for m in PORT_REGEX.finditer(output)
                    if m.group('port') not in ('6666', '4444')]
            if len(matches) > 1:
                print(output)
                raise Exception(
                    "OpenOCD listening on multiple ports. Cannot uniquely "
                    "identify gdb server port.")
            elif matches:
                [match] = matches
                return int(match.group('port'))
            time.sleep(0.1)
        raise Exception("Timed out waiting for gdb server to obtain port.")

    def __del__(self):
        """Kill the child process, if one was ever started."""
        # __init__ may have failed before self.process was assigned.
        process = getattr(self, 'process', None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            pass
229
class OpenocdCli(object):
    """Talk to OpenOCD's telnet command line through pexpect.

    The session output is also copied to openocd-cli.log by tee.
    """

    def __init__(self, port=4444):
        """Connect to localhost:`port` and wait for the first "> " prompt."""
        self.child = pexpect.spawn(
                "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
        self.child.expect("> ")

    def command(self, cmd):
        """Send `cmd` and return the text printed before the next prompt."""
        self.child.sendline(cmd)
        # The echoed command is literal text, not a pattern.  expect() would
        # interpret characters such as "$", "(" or "." in it as regex syntax.
        self.child.expect_exact(cmd)
        self.child.expect("\n")
        self.child.expect("> ")
        return self.child.before.strip("\t\r\n \0")

    def reg(self, reg=''):
        """Read registers with the "reg" command.

        Returns the integer value of `reg` when a name is given, otherwise a
        dict mapping every register name found in the output to its value.
        """
        output = self.command("reg %s" % reg)
        matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
        values = {r: int(v, 0) for r, v in matches}
        if reg:
            return values[reg]
        return values

    def load_image(self, image):
        """Run "load_image"; raise TestNotApplicable for unsupported ELFs."""
        output = self.command("load_image %s" % image)
        if 'invalid ELF file, only 32bits files are supported' in output:
            raise TestNotApplicable(output)
255
class CannotAccess(Exception):
    """Raised when gdb reports that it cannot access a memory address.

    The offending address is available as the `address` attribute.
    """

    def __init__(self, address):
        super(CannotAccess, self).__init__()
        self.address = address
260
class Gdb(object):
    """Drive an interactive gdb session through pexpect.

    Everything exchanged with gdb is logged to gdb.log.
    """

    def __init__(self,
            cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
        """Spawn `cmd`, wait for the first prompt and apply base settings."""
        self.child = pexpect.spawn(cmd)
        self.child.logfile = open("gdb.log", "w")
        self.child.logfile.write("+ %s\n" % cmd)
        self.wait()
        # "print entry-values no" is there to force consistency.
        for setting in ("confirm off", "width 0", "height 0",
                "print entry-values no"):
            self.command("set %s" % setting)

    @staticmethod
    def _value_text(output):
        """Return the stripped text that follows the last '=' in `output`."""
        return output.rpartition('=')[2].strip()

    @staticmethod
    def _check_access(output):
        """Raise CannotAccess if `output` reports an inaccessible address."""
        m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
        if m:
            raise CannotAccess(int(m.group(1), 0))

    def wait(self):
        """Wait for prompt."""
        self.child.expect(r"\(gdb\)")

    def command(self, command, timeout=-1):
        """Send `command`; return what gdb printed before the next prompt."""
        child = self.child
        child.sendline(command)
        child.expect("\n", timeout=timeout)
        child.expect(r"\(gdb\)", timeout=timeout)
        return child.before.strip()

    def c(self, wait=True):
        """Continue execution.

        With `wait`, block until the next prompt and return gdb's output.
        Otherwise return None as soon as gdb has printed "Continuing".
        """
        if not wait:
            self.child.sendline("c")
            self.child.expect("Continuing")
            return None
        output = self.command("c")
        assert "Continuing" in output
        return output

    def interrupt(self):
        """Send Ctrl-C and return the output printed before the prompt."""
        self.child.send("\003")
        self.child.expect(r"\(gdb\)", timeout=60)
        return self.child.before.strip()

    def x(self, address, size='w'):
        """Examine memory at `address`; return the value as an int."""
        output = self.command("x/%s %s" % (size, address))
        after_colon = output.split(':')[1]
        return int(after_colon.strip(), 0)

    def p_raw(self, obj):
        """Print `obj`; return the text after the last '=' unparsed."""
        output = self.command("p %s" % obj)
        self._check_access(output)
        return self._value_text(output)

    def p(self, obj):
        """Print `obj` in hex (p/x); return the value as an int."""
        output = self.command("p/x %s" % obj)
        self._check_access(output)
        return int(self._value_text(output), 0)

    def p_string(self, obj):
        """Print `obj`; return the second shell-style token of the value."""
        output = self.command("p %s" % obj)
        tokens = shlex.split(self._value_text(output))
        return tokens[1]

    def stepi(self):
        """Execute "stepi" and return gdb's output."""
        return self.command("stepi")

    def load(self):
        """Run "load" (60 second timeout) and assert that it succeeded."""
        output = self.command("load", timeout=60)
        assert "failed" not in output
        assert "Transfer rate" in output

    def b(self, location):
        """Set a breakpoint at `location`; return gdb's output."""
        output = self.command("b %s" % location)
        assert "not defined" not in output
        assert "Breakpoint" in output
        return output

    def hbreak(self, location):
        """Set a hardware breakpoint at `location`; return gdb's output."""
        output = self.command("hbreak %s" % location)
        assert "not defined" not in output
        assert "Hardware assisted breakpoint" in output
        return output
343
def run_all_tests(module, target, parsed):
    """Run ExamineTarget followed by the selected test classes of `module`.

    A class in `module` is selected when it has a `test` attribute and, if
    parsed.test is non-empty, one of those strings occurs in the class name.
    parsed.gdb is stored in the module-level gdb_cmd.  With parsed.fail_fast
    the run stops after the first result other than 'pass' or
    'not_applicable'.

    Prints a summary grouped by result and returns 0 when every result was
    'pass' or 'not_applicable', otherwise 1.
    """
    global gdb_cmd # pylint: disable=global-statement

    good_results = set(('pass', 'not_applicable'))
    start = time.time()
    gdb_cmd = parsed.gdb

    def wanted(name):
        return not parsed.test or any(test in name for test in parsed.test)

    todo = [("ExamineTarget", ExamineTarget)]
    for name in dir(module):
        definition = getattr(module, name)
        if type(definition) == type and hasattr(definition, 'test') \
                and wanted(name):
            todo.append((name, definition))

    results = {}
    count = 0
    for name, definition in todo:
        outcome = definition(target).run()
        results.setdefault(outcome, []).append(name)
        count += 1
        if parsed.fail_fast and outcome not in good_results:
            break

    header("ran %d tests in %.0fs" % (count, time.time() - start), dash=':')

    exit_code = 0
    for outcome, names in results.items():
        print("%d tests returned %s" % (len(names), outcome))
        if outcome in good_results:
            continue
        exit_code = 1
        for test in names:
            print("   %s" % test)

    return exit_code
381
def add_test_run_options(parser):
    """Register the test-run command line options on an argparse parser.

    Adds --fail-fast/-f, the positional `test` list and --gdb.
    """
    option_specs = (
        (("--fail-fast", "-f"),
            dict(action="store_true",
                help="Exit as soon as any test fails.")),
        (("test",),
            dict(nargs='*',
                help="Run only tests that are named here.")),
        (("--gdb",),
            dict(help="The command to use to start gdb.")),
    )
    for names, settings in option_specs:
        parser.add_argument(*names, **settings)
389
def header(title, dash='-'):
    """Print a separator line, optionally with a title in the middle.

    With an empty title, `dash` is printed 40 times.  Otherwise `dash` is
    repeated (36 - len(title)) times, split in two, and "[ title ]" is placed
    between the halves; the second half gets the extra character when the
    count is odd.
    """
    if not title:
        print(dash * 40)
        return
    dashes = dash * (36 - len(title))
    # Floor division keeps the split point an integer even where "/" is true
    # division; a float slice index raises TypeError.
    middle = len(dashes) // 2
    print("%s[ %s ]%s" % (dashes[:middle], title, dashes[middle:]))
398
def print_log(path):
    """Print the log file at `path` under a header line.

    Files longer than 1000 lines are abbreviated to their first and last 500
    lines with "..." in between.
    """
    header(path)
    # Close the file deterministically instead of leaking the handle.
    with open(path, "r") as log:
        lines = log.readlines()
    if len(lines) > 1000:
        sys.stdout.writelines(lines[:500])
        print("...")
        sys.stdout.writelines(lines[-500:])
    else:
        sys.stdout.writelines(lines)
411
class BaseTest(object):
    """Base class for tests: compiles, launches target and server, runs.

    Subclasses provide test() and may set a `compile_args` attribute and
    override setup() and early_applicable().
    """
    # Build results shared by all tests, keyed by compile_args.
    compiled = {}

    def __init__(self, target):
        self.target = target
        self.server = None
        self.target_process = None
        self.binary = None
        self.start = 0
        self.logs = []

    def early_applicable(self):
        """Return a false value if the test has determined it cannot run
        without ever needing to talk to the target or server."""
        # pylint: disable=no-self-use
        return True

    def setup(self):
        """Hook called after classSetup() and before test()."""
        pass

    def compile(self):
        """Build the program described by compile_args, if there is one.

        The result of self.target.compile() is cached per compile_args value
        and stored in self.binary.  self.binary is None when the test has no
        compile_args.
        """
        compile_args = getattr(self, 'compile_args', None)
        if compile_args:
            if compile_args not in BaseTest.compiled:
                # pylint: disable=star-args
                BaseTest.compiled[compile_args] = \
                        self.target.compile(*compile_args)
        self.binary = BaseTest.compiled.get(compile_args)

    def classSetup(self):
        """Compile, start the target and the server, register the server log."""
        self.compile()
        self.target_process = self.target.target()
        self.server = self.target.server()
        self.logs.append(self.server.logname)

    def classTeardown(self):
        """Drop the references to the server and the target process."""
        del self.server
        del self.target_process

    def run(self):
        """
        If compile_args is set, compile a program and set self.binary.

        Call setup().

        Then call test() and return the result, displaying relevant information
        if an exception is raised.

        Returns "not_applicable", "fail", "exception", the value returned by
        test(), or 'pass' when test() returned a false value.
        """

        sys.stdout.write("Running %s ... " % type(self).__name__)
        sys.stdout.flush()

        if not self.early_applicable():
            print("not_applicable")
            return "not_applicable"

        self.start = time.time()

        try:
            # classSetup() runs inside the try block so that a target or
            # server that fails to start is reported like any other failure
            # (with logs) and classTeardown() still runs.
            self.classSetup()
            self.setup()
            result = self.test()    # pylint: disable=no-member
        except TestNotApplicable:
            result = "not_applicable"
        except Exception as e: # pylint: disable=broad-except
            if isinstance(e, TestFailed):
                result = "fail"
            else:
                result = "exception"
            print("%s in %.2fs" % (result, time.time() - self.start))
            print("=" * 40)
            if isinstance(e, TestFailed):
                header("Message")
                print(e.message)
            header("Traceback")
            traceback.print_exc(file=sys.stdout)
            for log in self.logs:
                print_log(log)
            print("/" * 40)
            return result

        finally:
            self.classTeardown()

        if not result:
            result = 'pass'
        print("%s in %.2fs" % (result, time.time() - self.start))
        return result
501
# Command used to start gdb; None selects the default of the Gdb class.
gdb_cmd = None
class GdbTest(BaseTest):
    """A test that additionally has a gdb session attached to the server."""

    def __init__(self, target):
        super(GdbTest, self).__init__(target)
        self.gdb = None

    def classSetup(self):
        """Start gdb, load the binary and connect to the server."""
        super(GdbTest, self).classSetup()
        self.logs.append("gdb.log")

        self.gdb = Gdb(gdb_cmd) if gdb_cmd else Gdb()
        session = self.gdb

        if self.binary:
            session.command("file %s" % self.binary)
        if self.target:
            session.command("set arch riscv:rv%d" % self.target.xlen)
            session.command(
                    "set remotetimeout %d" % self.target.timeout_sec)
        if self.server.port:
            session.command(
                    "target extended-remote localhost:%d" % self.server.port)

        session.p("$priv=3")

    def classTeardown(self):
        """Drop the gdb session, then do the base class teardown."""
        del self.gdb
        super(GdbTest, self).classTeardown()
531
532 class ExamineTarget(GdbTest):
533 def test(self):
534 self.target.misa = self.gdb.p("$misa")
535
536 txt = "RV"
537 if (self.target.misa >> 30) == 1:
538 txt += "32"
539 elif (self.target.misa >> 62) == 2:
540 txt += "64"
541 elif (self.target.misa >> 126) == 3:
542 txt += "128"
543 else:
544 txt += "??"
545
546 for i in range(26):
547 if self.target.misa & (1<<i):
548 txt += chr(i + ord('A'))
549 print txt,
550
class TestFailed(Exception):
    """Raised when a check made by a test does not hold.

    The explanation is available as the `message` attribute.
    """

    def __init__(self, message):
        # Pass the text on to Exception as well, so that str(e) and printed
        # tracebacks show it instead of an empty description.
        Exception.__init__(self, message)
        self.message = message
555
class TestNotApplicable(Exception):
    """Raised when a test finds out that it does not apply.

    The reason is available as the `message` attribute.
    """

    def __init__(self, message):
        # Pass the text on to Exception as well, so that str(e) and printed
        # tracebacks show it instead of an empty description.
        Exception.__init__(self, message)
        self.message = message
560
def assertEqual(a, b):
    """Raise TestFailed if `a != b`."""
    differ = a != b
    if differ:
        message = "%r != %r" % (a, b)
        raise TestFailed(message)
564
def assertNotEqual(a, b):
    """Raise TestFailed if `a == b`."""
    same = a == b
    if same:
        message = "%r == %r" % (a, b)
        raise TestFailed(message)
568
def assertIn(a, b):
    """Raise TestFailed unless `a in b`."""
    if a in b:
        return
    raise TestFailed("%r not in %r" % (a, b))
572
def assertNotIn(a, b):
    """Raise TestFailed if `a in b`."""
    present = a in b
    if present:
        message = "%r in %r" % (a, b)
        raise TestFailed(message)
576
def assertGreater(a, b):
    """Raise TestFailed unless `a > b`."""
    if a > b:
        return
    raise TestFailed("%r not greater than %r" % (a, b))
580
def assertLess(a, b):
    """Raise TestFailed unless `a < b`."""
    if a < b:
        return
    raise TestFailed("%r not less than %r" % (a, b))
584
def assertTrue(a):
    """Raise TestFailed if `a` is false."""
    if not a:
        # Wrap the value in a one-element tuple: with a bare "% a", a tuple
        # argument such as () would be unpacked as the format arguments and
        # raise TypeError instead of reporting the failure.
        raise TestFailed("%r is not True" % (a,))
588
def assertRegexpMatches(text, regexp):
    """Raise TestFailed unless `regexp` matches somewhere in `text`."""
    found = re.search(regexp, text)
    if found:
        return
    raise TestFailed("can't find %r in %r" % (regexp, text))