Print log filename at the end of the log.
[riscv-tests.git] / debug / testlib.py
1 import collections
2 import os
3 import os.path
4 import random
5 import re
6 import shlex
7 import subprocess
8 import sys
9 import tempfile
10 import time
11 import traceback
12 import pipes
13
14 import pexpect
15
# When True, the helper classes below announce each temporary log file on
# real_stdout as soon as the file is created.
print_log_names = False
# The stdout stream in effect before a test's output is redirected;
# run_tests() re-binds this right before it swaps sys.stdout for a log file.
real_stdout = sys.stdout
18
19 # Note that gdb comes with its own testsuite. I was unable to figure out how to
20 # run that testsuite against the spike simulator.
21
def find_file(path):
    """Look for `path` under the current directory, then next to this module.

    Returns the first existing candidate, spelled as a path relative to the
    current directory when that spelling is strictly shorter than the joined
    path, otherwise as the joined path. Returns None when nothing exists.
    """
    search_roots = (os.getcwd(), os.path.dirname(__file__))
    for root in search_roots:
        joined = os.path.join(root, path)
        candidate = os.path.relpath(joined)
        # Keep the joined form unless the relative form is actually shorter.
        if len(candidate) >= len(joined):
            candidate = joined
        if os.path.exists(candidate):
            return candidate
    return None
31
32 def compile(args, xlen=32): # pylint: disable=redefined-builtin
33 cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
34 cmd = [cc, "-g"]
35 if xlen == 32:
36 cmd.append("-march=rv32imac")
37 cmd.append("-mabi=ilp32")
38 else:
39 cmd.append("-march=rv64imac")
40 cmd.append("-mabi=lp64")
41 for arg in args:
42 found = find_file(arg)
43 if found:
44 cmd.append(found)
45 else:
46 cmd.append(arg)
47 header("Compile")
48 print "+", " ".join(cmd)
49 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
50 stderr=subprocess.PIPE)
51 stdout, stderr = process.communicate()
52 if process.returncode:
53 print stdout,
54 print stderr,
55 header("")
56 raise Exception("Compile failed!")
57
class Spike(object):
    """A running spike simulator process together with its log file."""

    def __init__(self, target, halted=False, timeout=None, with_jtag_gdb=True,
            isa=None, progbufsize=None):
        """Launch spike running the infinite_loop program.

        target: object providing `harts`, `sim_cmd` and `compile()`; when
            `target.harts` is false the target itself is used as the only
            hart.
        halted, timeout, with_jtag_gdb: forwarded to command().
        isa: ISA string for --isa; derived from the first hart's xlen when
            not given.
        progbufsize: when not None, passed to spike as --progsize.

        With with_jtag_gdb set, waits for spike to report its remote bitbang
        port, stores it in self.port and exports it as REMOTE_BITBANG_PORT.
        Raises Exception when that report never shows up in the log.
        """
        self.process = None
        self.isa = isa
        self.progbufsize = progbufsize

        if target.harts:
            harts = target.harts
        else:
            harts = [target]

        cmd = self.command(target, harts, halted, timeout, with_jtag_gdb)
        self.infinite_loop = target.compile(harts[0],
                "programs/checksum.c", "programs/tiny-malloc.c",
                "programs/infinite_loop.S", "-DDEFINE_MALLOC", "-DDEFINE_FREE")
        cmd.append(self.infinite_loop)
        self.logfile = tempfile.NamedTemporaryFile(prefix="spike-",
                suffix=".log")
        self.logname = self.logfile.name
        if print_log_names:
            real_stdout.write("Temporary spike log: %s\n" % self.logname)
        self.logfile.write("+ %s\n" % " ".join(cmd))
        self.logfile.flush()
        self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                stdout=self.logfile, stderr=self.logfile)

        if with_jtag_gdb:
            self.port = None
            for _ in range(30):
                # Re-read the whole log on every attempt, closing the handle
                # each time instead of leaking one descriptor per poll.
                with open(self.logname) as log:
                    contents = log.read()
                m = re.search(r"Listening for remote bitbang connection on "
                        r"port (\d+).", contents)
                if m:
                    self.port = int(m.group(1))
                    os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
                    break
                time.sleep(0.11)
            if not self.port:
                print_log(self.logname)
                raise Exception("Didn't get spike message about bitbang "
                        "connection")

    def command(self, target, harts, halted, timeout, with_jtag_gdb):
        """Build the spike command line as a list of arguments.

        target.sim_cmd, when set, replaces $RISCV/bin/spike as the base
        command. All harts must share xlen, ram and ram_size. A true
        `timeout` wraps the command in timeout(1); `halted` appends -H;
        `with_jtag_gdb` requests an ephemeral --rbb-port and sets
        REMOTE_BITBANG_HOST in the environment.
        """
        # pylint: disable=no-self-use
        if target.sim_cmd:
            cmd = shlex.split(target.sim_cmd)
        else:
            spike = os.path.expandvars("$RISCV/bin/spike")
            cmd = [spike]

        cmd += ["-p%d" % len(harts)]

        assert len(set(t.xlen for t in harts)) == 1, \
                "All spike harts must have the same XLEN"

        if self.isa:
            isa = self.isa
        else:
            isa = "RV%dG" % harts[0].xlen

        cmd += ["--isa", isa]

        if self.progbufsize is not None:
            cmd += ["--progsize", str(self.progbufsize)]
            cmd += ["--debug-sba", "32"]

        assert len(set(t.ram for t in harts)) == 1, \
                "All spike harts must have the same RAM layout"
        assert len(set(t.ram_size for t in harts)) == 1, \
                "All spike harts must have the same RAM layout"
        cmd += ["-m0x%x:0x%x" % (harts[0].ram, harts[0].ram_size)]

        if timeout:
            cmd = ["timeout", str(timeout)] + cmd

        if halted:
            cmd.append('-H')
        if with_jtag_gdb:
            cmd += ['--rbb-port', '0']
            os.environ['REMOTE_BITBANG_HOST'] = 'localhost'

        return cmd

    def __del__(self):
        """Kill and reap the spike process, if one was started."""
        if self.process:
            try:
                self.process.kill()
                self.process.wait()
            except OSError:
                pass

    def wait(self, *args, **kwargs):
        """Wait for the spike process; arguments go to Popen.wait()."""
        return self.process.wait(*args, **kwargs)
153
class VcsSim(object):
    """Launch a VCS simulation and wait for its JTAG VPI port."""

    # The log file is created once, when the class is defined; instances
    # re-open it by name.
    logfile = tempfile.NamedTemporaryFile(prefix='simv', suffix='.log')
    logname = logfile.name

    def __init__(self, sim_cmd=None, debug=False, timeout=300):
        """Start the simulator and block until it reports its port.

        sim_cmd: shell-style command line; defaults to "simv".
        debug: run the "-debug" flavour of the binary and dump a VPD file.
        timeout: seconds to wait for the "Listening on port" line.

        Sets self.port and exports it as JTAG_VPI_PORT. Raises RuntimeError
        when the simulator exits early and Exception on timeout.
        """
        if sim_cmd:
            cmd = shlex.split(sim_cmd)
        else:
            cmd = ["simv"]
        cmd += ["+jtag_vpi_enable"]
        if debug:
            cmd[0] = cmd[0] + "-debug"
            cmd += ["+vcdplusfile=output/gdbserver.vpd"]

        logfile = open(self.logname, "w")
        if print_log_names:
            real_stdout.write("Temporary VCS log: %s\n" % self.logname)
        logfile.write("+ %s\n" % " ".join(cmd))
        logfile.flush()

        listenfile = open(self.logname, "r")
        try:
            # Start reading at the current end of the log so that only
            # output produced by this run is examined.
            listenfile.seek(0, 2)
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                    stdout=logfile, stderr=logfile)
            done = False
            start = time.time()
            while not done:
                # Fail if VCS exits early
                exit_code = self.process.poll()
                if exit_code is not None:
                    raise RuntimeError('VCS simulator exited early with '
                            'status %d' % exit_code)

                line = listenfile.readline()
                if not line:
                    time.sleep(1)
                match = re.match(r"^Listening on port (\d+)$", line)
                if match:
                    done = True
                    self.port = int(match.group(1))
                    os.environ['JTAG_VPI_PORT'] = str(self.port)

                if (time.time() - start) > timeout:
                    raise Exception("Timed out waiting for VCS to listen for "
                            "JTAG vpi")
        finally:
            # The reader is only needed while waiting for the port.
            listenfile.close()

    def __del__(self):
        """Kill and reap the simulator; tolerate a half-built instance."""
        try:
            self.process.kill()
            self.process.wait()
        except (OSError, AttributeError):
            # AttributeError: __init__ failed before self.process was set.
            pass
206
class Openocd(object):
    """Launch OpenOCD and collect the gdb ports it listens on."""

    # The log file is created once, when the class is defined; instances
    # re-open it by name.
    logfile = tempfile.NamedTemporaryFile(prefix='openocd', suffix='.log')
    logname = logfile.name

    def __init__(self, server_cmd=None, config=None, debug=False, timeout=60):
        """Start OpenOCD and wait until it is ready for gdb.

        server_cmd: shell-style command line; defaults to
            $RISCV/bin/openocd.
        config: config file name, resolved with find_file(); the process
            exits with status 1 when it cannot be found.
        debug: add -d to the command line.
        timeout: seconds start() waits for OpenOCD to come up.

        On success self.process is the running OpenOCD and self.gdb_ports
        lists every gdb port it announced.
        """
        self.timeout = timeout

        if server_cmd:
            cmd = shlex.split(server_cmd)
        else:
            openocd = os.path.expandvars("$RISCV/bin/openocd")
            cmd = [openocd]
            if debug:
                cmd.append("-d")

        # This command needs to come before any config scripts on the command
        # line, since they are executed in order.
        cmd += [
            # Tell OpenOCD to bind gdb to an unused, ephemeral port.
            "--command",
            "gdb_port 0",
            # Disable tcl and telnet servers, since they are unused and because
            # the port numbers will conflict if multiple OpenOCD processes are
            # running on the same server.
            "--command",
            "tcl_port disabled",
            "--command",
            "telnet_port disabled",
        ]

        if config:
            f = find_file(config)
            if f is None:
                print("Unable to read file " + config)
                sys.exit(1)

            cmd += ["-f", f]
        if debug:
            cmd.append("-d")

        logfile = open(Openocd.logname, "w")
        if print_log_names:
            real_stdout.write("Temporary OpenOCD log: %s\n" % Openocd.logname)
        env_entries = ("REMOTE_BITBANG_HOST", "REMOTE_BITBANG_PORT")
        env_entries = [key for key in env_entries if key in os.environ]
        logfile.write("+ %s%s\n" % (
            "".join("%s=%s " % (key, os.environ[key]) for key in env_entries),
            " ".join(map(pipes.quote, cmd))))
        logfile.flush()

        self.gdb_ports = []
        self.process = self.start(cmd, logfile)

    def start(self, cmd, logfile):
        """Spawn `cmd` and follow the log until OpenOCD is ready.

        Every "Listening on port N for gdb connections" line appends N to
        self.gdb_ports. Returns the Popen object once the line containing
        "telnet server disabled" has been seen.

        Raises Exception when OpenOCD exits early or self.timeout seconds
        pass; in that case the log is printed and the child is killed.
        """
        process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                stdout=logfile, stderr=logfile)

        ready = False
        try:
            # Wait for OpenOCD to have made it through riscv_examine(). When
            # using OpenOCD to communicate with a simulator this may take a
            # long time, and gdb will time out when trying to connect if we
            # attempt too early.
            start = time.time()
            messaged = False
            with open(Openocd.logname, "r") as fd:
                while True:
                    line = fd.readline()
                    if line:
                        m = re.search(
                                r"Listening on port (\d+) for gdb connections",
                                line)
                        if m:
                            self.gdb_ports.append(int(m.group(1)))

                        if "telnet server disabled" in line:
                            ready = True
                            return process
                    else:
                        if process.poll() is not None:
                            raise Exception("OpenOCD exited early.")
                        time.sleep(0.1)

                    # Checked on every pass, including passes without new
                    # output, so a silent OpenOCD still hits the timeout.
                    elapsed = time.time() - start
                    if not messaged and elapsed > 1:
                        messaged = True
                        print("Waiting for OpenOCD to start...")
                    if elapsed > self.timeout:
                        raise Exception("Timed out waiting for OpenOCD to "
                                "listen for gdb")

        except Exception:
            print_log(Openocd.logname)
            raise
        finally:
            if not ready:
                # self.process is never assigned on failure, so __del__
                # cannot reap this child; do it here.
                try:
                    process.kill()
                    process.wait()
                except OSError:
                    pass

    def __del__(self):
        """Kill and reap OpenOCD; tolerate a half-built instance."""
        try:
            self.process.kill()
            self.process.wait()
        except (OSError, AttributeError):
            pass
305
class OpenocdCli(object):
    """Talk to OpenOCD's telnet command line through pexpect."""

    def __init__(self, port=4444):
        """Open a telnet session to localhost:`port`, tee'd to
        openocd-cli.log, and wait for the first prompt."""
        self.child = pexpect.spawn(
                "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
        self.child.expect("> ")

    def command(self, cmd):
        """Send `cmd` and return the text printed before the next prompt,
        stripped of surrounding whitespace and NUL characters."""
        self.child.sendline(cmd)
        # Match the echoed command literally; as a regular expression, any
        # metacharacter in the command would change what is matched.
        self.child.expect_exact(cmd)
        self.child.expect("\n")
        self.child.expect("> ")
        return self.child.before.strip("\t\r\n \0")

    def reg(self, reg=''):
        """Return the value of register `reg`, or a dict of every register
        parsed from the output when `reg` is empty."""
        output = self.command("reg %s" % reg)
        matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
        values = {r: int(v, 0) for r, v in matches}
        if reg:
            return values[reg]
        return values

    def load_image(self, image):
        """Run load_image; raise TestNotApplicable when OpenOCD rejects the
        file as a non-32-bit ELF."""
        output = self.command("load_image %s" % image)
        if 'invalid ELF file, only 32bits files are supported' in output:
            raise TestNotApplicable(output)
331
class CannotAccess(Exception):
    """gdb reported "Cannot access memory at address ...".

    address: the address from gdb's message, kept on self.address.
    """
    def __init__(self, address):
        # Give the base class a message so str(e) and tracebacks are not
        # empty; fall back to repr() for addresses that are not integers.
        try:
            shown = "0x%x" % address
        except TypeError:
            shown = repr(address)
        Exception.__init__(self, "Cannot access memory at address " + shown)
        self.address = address
336
class CouldNotFetch(Exception):
    """gdb reported that it could not fetch a register.

    regname: name of the register, kept on self.regname.
    explanation: the reason gdb gave, kept on self.explanation.
    """
    def __init__(self, regname, explanation):
        # Give the base class a message so str(e) and tracebacks are not
        # empty.
        Exception.__init__(self, 'Could not fetch register "%s"; %s' %
                (regname, explanation))
        self.regname = regname
        self.explanation = explanation
342
# One row of gdb's "info threads" output, as parsed by Gdb.threads().
Thread = collections.namedtuple(
    'Thread', 'id description target_id name frame')
345
346 class Gdb(object):
347 """A single gdb class which can interact with one or more gdb instances."""
348
349 # pylint: disable=too-many-public-methods
350 # pylint: disable=too-many-instance-attributes
351
352 def __init__(self, ports,
353 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb"),
354 timeout=60, binary=None):
355 assert ports
356
357 self.ports = ports
358 self.cmd = cmd
359 self.timeout = timeout
360 self.binary = binary
361
362 self.stack = []
363 self.harts = {}
364
365 self.logfiles = []
366 self.children = []
367 for port in ports:
368 logfile = tempfile.NamedTemporaryFile(prefix="gdb@%d-" % port,
369 suffix=".log")
370 self.logfiles.append(logfile)
371 if print_log_names:
372 real_stdout.write("Temporary gdb log: %s\n" % logfile.name)
373 child = pexpect.spawn(cmd)
374 child.logfile = logfile
375 child.logfile.write("+ %s\n" % cmd)
376 self.children.append(child)
377 self.active_child = self.children[0]
378
379 def connect(self):
380 for port, child in zip(self.ports, self.children):
381 self.select_child(child)
382 self.wait()
383 self.command("set confirm off")
384 self.command("set width 0")
385 self.command("set height 0")
386 # Force consistency.
387 self.command("set print entry-values no")
388 self.command("set remotetimeout %d" % self.timeout)
389 self.command("target extended-remote localhost:%d" % port)
390 if self.binary:
391 self.command("file %s" % self.binary)
392 threads = self.threads()
393 for t in threads:
394 hartid = None
395 if t.name:
396 m = re.search(r"Hart (\d+)", t.name)
397 if m:
398 hartid = int(m.group(1))
399 if hartid is None:
400 if self.harts:
401 hartid = max(self.harts) + 1
402 else:
403 hartid = 0
404 # solo: True iff this is the only thread on this child
405 self.harts[hartid] = {'child': child,
406 'thread': t,
407 'solo': len(threads) == 1}
408
409 def __del__(self):
410 for child in self.children:
411 del child
412
413 def one_hart_per_gdb(self):
414 return all(h['solo'] for h in self.harts.itervalues())
415
416 def lognames(self):
417 return [logfile.name for logfile in self.logfiles]
418
419 def select_child(self, child):
420 self.active_child = child
421
422 def select_hart(self, hart):
423 h = self.harts[hart.id]
424 self.select_child(h['child'])
425 if not h['solo']:
426 output = self.command("thread %s" % h['thread'].id, timeout=10)
427 assert "Unknown" not in output
428
429 def push_state(self):
430 self.stack.append({
431 'active_child': self.active_child
432 })
433
434 def pop_state(self):
435 state = self.stack.pop()
436 self.active_child = state['active_child']
437
438 def wait(self):
439 """Wait for prompt."""
440 self.active_child.expect(r"\(gdb\)")
441
442 def command(self, command, timeout=6000):
443 """timeout is in seconds"""
444 self.active_child.sendline(command)
445 self.active_child.expect("\n", timeout=timeout)
446 self.active_child.expect(r"\(gdb\)", timeout=timeout)
447 return self.active_child.before.strip()
448
449 def global_command(self, command):
450 """Execute this command on every gdb that we control."""
451 with PrivateState(self):
452 for child in self.children:
453 self.select_child(child)
454 self.command(command)
455
456 def c(self, wait=True, timeout=-1, async=False):
457 """
458 Dumb c command.
459 In RTOS mode, gdb will resume all harts.
460 In multi-gdb mode, this command will just go to the current gdb, so
461 will only resume one hart.
462 """
463 if async:
464 async = "&"
465 else:
466 async = ""
467 if wait:
468 output = self.command("c%s" % async, timeout=timeout)
469 assert "Continuing" in output
470 return output
471 else:
472 self.active_child.sendline("c%s" % async)
473 self.active_child.expect("Continuing")
474
475 def c_all(self):
476 """
477 Resume every hart.
478
479 This function works fine when using multiple gdb sessions, but the
480 caller must be careful when using it nonetheless. gdb's behavior is to
481 not set breakpoints until just before the hart is resumed, and then
482 clears them as soon as the hart halts. That means that you can't set
483 one software breakpoint, and expect multiple harts to hit it. It's
484 possible that the first hart completes set/run/halt/clear before the
485 second hart even gets to resume, so it will never hit the breakpoint.
486 """
487 with PrivateState(self):
488 for child in self.children:
489 child.sendline("c")
490 child.expect("Continuing")
491
492 # Now wait for them all to halt
493 for child in self.children:
494 child.expect(r"\(gdb\)")
495
496 def interrupt(self):
497 self.active_child.send("\003")
498 self.active_child.expect(r"\(gdb\)", timeout=6000)
499 return self.active_child.before.strip()
500
501 def x(self, address, size='w'):
502 output = self.command("x/%s %s" % (size, address))
503 value = int(output.split(':')[1].strip(), 0)
504 return value
505
506 def p_raw(self, obj):
507 output = self.command("p %s" % obj)
508 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
509 if m:
510 raise CannotAccess(int(m.group(1), 0))
511 return output.split('=')[-1].strip()
512
513 def parse_string(self, text):
514 text = text.strip()
515 if text.startswith("{") and text.endswith("}"):
516 inner = text[1:-1]
517 return [self.parse_string(t) for t in inner.split(", ")]
518 elif text.startswith('"') and text.endswith('"'):
519 return text[1:-1]
520 else:
521 return int(text, 0)
522
523 def p(self, obj, fmt="/x"):
524 output = self.command("p%s %s" % (fmt, obj))
525 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
526 if m:
527 raise CannotAccess(int(m.group(1), 0))
528 m = re.search(r"Could not fetch register \"(\w+)\"; (.*)$", output)
529 if m:
530 raise CouldNotFetch(m.group(1), m.group(2))
531 rhs = output.split('=')[-1]
532 return self.parse_string(rhs)
533
534 def p_string(self, obj):
535 output = self.command("p %s" % obj)
536 value = shlex.split(output.split('=')[-1].strip())[1]
537 return value
538
539 def stepi(self):
540 output = self.command("stepi", timeout=60)
541 return output
542
543 def load(self):
544 output = self.command("load", timeout=6000)
545 assert "failed" not in output
546 assert "Transfer rate" in output
547
548 def b(self, location):
549 output = self.command("b %s" % location)
550 assert "not defined" not in output
551 assert "Breakpoint" in output
552 return output
553
554 def hbreak(self, location):
555 output = self.command("hbreak %s" % location)
556 assert "not defined" not in output
557 assert "Hardware assisted breakpoint" in output
558 return output
559
560 def threads(self):
561 output = self.command("info threads")
562 threads = []
563 for line in output.splitlines():
564 m = re.match(
565 r"[\s\*]*(\d+)\s*"
566 r"(Remote target|Thread (\d+)\s*\(Name: ([^\)]+))"
567 r"\s*(.*)", line)
568 if m:
569 threads.append(Thread(*m.groups()))
570 assert threads
571 #>>>if not threads:
572 #>>> threads.append(Thread('1', '1', 'Default', '???'))
573 return threads
574
575 def thread(self, thread):
576 return self.command("thread %s" % thread.id)
577
578 def where(self):
579 return self.command("where 1")
580
class PrivateState(object):
    """Context manager that saves a Gdb's state on entry and restores it on
    exit, so the body may switch children freely."""

    def __init__(self, gdb):
        self.gdb = gdb

    def __enter__(self):
        # Nothing is returned: "with PrivateState(gdb) as x" binds None.
        self.gdb.push_state()

    def __exit__(self, _type, _value, _traceback):
        # Returning None lets any exception from the body propagate.
        self.gdb.pop_state()
590
def run_all_tests(module, target, parsed):
    """Collect the tests defined in `module`, run them and report.

    module: every class in it that has a `test` attribute is a candidate;
        when parsed.test is non-empty only classes whose name contains one
        of those strings are kept.
    target: the target under test; its harts get their misa from
        parsed.misaval when given. ExamineTarget is queued once if some hart
        has no misa value.
    parsed: parsed command line options (logs, gdb, misaval, test, ...).

    Returns the value of print_results() for the collected results.
    """
    global gdb_cmd  # pylint: disable=global-statement

    if not os.path.exists(parsed.logs):
        os.makedirs(parsed.logs)

    overall_start = time.time()

    gdb_cmd = parsed.gdb

    todo = []
    examine_queued = False
    for hart in target.harts:
        if parsed.misaval:
            hart.misa = int(parsed.misaval, 16)
            print("Using $misa from command line: 0x%x" % hart.misa)
        elif hart.misa:
            print("Using $misa from hart definition: 0x%x" % hart.misa)
        elif not examine_queued:
            todo.append(("ExamineTarget", ExamineTarget, None))
            examine_queued = True

    for name in dir(module):
        candidate = getattr(module, name)
        if not isinstance(candidate, type):
            continue
        if not hasattr(candidate, 'test'):
            continue
        if parsed.test and not any(wanted in name for wanted in parsed.test):
            continue
        todo.append((name, candidate, None))

    results, count = run_tests(parsed, target, todo)

    elapsed = time.time() - overall_start
    header("ran %d tests in %.0fs" % (count, elapsed), dash=':')

    return print_results(results)
624
# Results that do not count as a failure.
good_results = set(('pass', 'not_applicable'))
def run_tests(parsed, target, todo):
    """Run each (name, definition, hart) entry of `todo` with its own log.

    While a test runs, sys.stdout is redirected into a log file under
    parsed.logs; the previous stream is kept in the module-level
    real_stdout. With parsed.print_failures the log of every failing test is
    echoed afterwards, and with parsed.fail_fast the loop stops at the first
    failure.

    Returns (results, count): results maps each result string to a list of
    (name, log_name) pairs and count is the number of tests that were run.
    """
    global real_stdout  # pylint: disable=global-statement

    results = {}
    count = 0

    for name, definition, hart in todo:
        log_name = os.path.join(parsed.logs, "%s-%s-%s.log" %
                (time.strftime("%Y%m%d-%H%M%S"), type(target).__name__, name))
        log_fd = open(log_name, 'w')
        try:
            print("[%s] Starting > %s" % (name, log_name))
            instance = definition(target, hart)
            sys.stdout.flush()
            log_fd.write("Test: %s\n" % name)
            log_fd.write("Target: %s\n" % type(target).__name__)
            start = time.time()
            real_stdout = sys.stdout
            sys.stdout = log_fd
            try:
                result = instance.run()
                log_fd.write("Result: %s\n" % result)
                log_fd.write("Logfile: %s\n" % log_name)
            finally:
                sys.stdout = real_stdout
                log_fd.write("Time elapsed: %.2fs\n" % (time.time() - start))
                log_fd.flush()
        finally:
            # Close the log whatever happened, so a long run does not
            # accumulate one open file per test.
            log_fd.close()
        print("[%s] %s in %.2fs" % (name, result, time.time() - start))
        if result not in good_results and parsed.print_failures:
            with open(log_name) as failed_log:
                sys.stdout.write(failed_log.read())
        sys.stdout.flush()
        results.setdefault(result, []).append((name, log_name))
        count += 1
        if result not in good_results and parsed.fail_fast:
            break

    return results, count
661
def print_results(results):
    """Print a summary of `results` and return a process exit status.

    results: dict mapping a result string to a list of (name, log_name).
    For every result outside good_results the affected tests and their logs
    are listed. Returns 1 if any such result exists, otherwise 0.
    """
    exit_status = 0
    for outcome, tests in results.items():
        print("%d tests returned %s" % (len(tests), outcome))
        if outcome in good_results:
            continue
        exit_status = 1
        for name, log_name in tests:
            print(" %s > %s" % (name, log_name))

    return exit_status
672
def add_test_run_options(parser):
    """Register the options shared by all test runners on `parser`, an
    argparse-style object with an add_argument() method."""
    options = (
        (("--logs",), {
            "default": "logs",
            "help": "Store logs in the specified directory."}),
        (("--fail-fast", "-f"), {
            "action": "store_true",
            "help": "Exit as soon as any test fails."}),
        (("--print-failures",), {
            "action": "store_true",
            "help": "When a test fails, print the log file to stdout."}),
        (("--print-log-names", "--pln"), {
            "action": "store_true",
            "help": "Print names of temporary log files as soon as they are "
                    "created."}),
        (("test",), {
            "nargs": '*',
            "help": "Run only tests that are named here."}),
        (("--gdb",), {
            "help": "The command to use to start gdb."}),
        (("--misaval",), {
            "help": "Don't run ExamineTarget, just assume the misa value "
                    "which is specified."}),
    )
    for flags, settings in options:
        parser.add_argument(*flags, **settings)
690
def header(title, dash='-', length=78):
    """Print a separator line that is `length` characters wide.

    With a title the line reads "<dashes>[ title ]<dashes>", any odd dash
    going to the right-hand side; without one it is `dash` repeated. A title
    too long for `length` is printed without padding.
    """
    if title:
        dashes = dash * (length - 4 - len(title))
        # Floor division keeps the slice index an integer even when "/"
        # means true division.
        middle = len(dashes) // 2
        before = dashes[:middle]
        after = dashes[middle:]
        print("%s[ %s ]%s" % (before, title, after))
    else:
        print(dash * length)
699
def print_log(path):
    """Print the file at `path` to stdout under a header naming it,
    followed by one empty line."""
    header(path)
    # The context manager closes the file even if writing to stdout fails.
    with open(path, "r") as log:
        for line in log:
            sys.stdout.write(line)
    print("")
705
class BaseTest(object):
    """Base class for tests: compiles the program, brings up the target and
    its server, runs test() and turns the outcome into a result string."""

    # Binaries already built, keyed by compile_args and shared by all tests.
    # NOTE(review): the key ignores the hart the binary was built for.
    compiled = {}

    def __init__(self, target, hart=None):
        """Remember the target and choose the hart the test runs on."""
        self.target = target
        if hart:
            self.hart = hart
        else:
            self.hart = random.choice(target.harts)
            # NOTE(review): this marked line overrides the random choice
            # above; it looks like a debugging leftover -- confirm before
            # removing, since it decides which hart every test uses.
            self.hart = target.harts[-1] #<<<
        self.server = None
        self.target_process = None
        self.binary = None
        self.start = 0
        self.logs = []

    def early_applicable(self):
        """Return a false value if the test has determined it cannot run
        without ever needing to talk to the target or server."""
        # pylint: disable=no-self-use
        return True

    def setup(self):
        """Hook called after classSetup() and before test()."""
        pass

    def compile(self):
        """Build the program named by self.compile_args (if the subclass
        defines any) and store the result in self.binary."""
        compile_args = getattr(self, 'compile_args', None)
        if compile_args:
            if compile_args not in BaseTest.compiled:
                BaseTest.compiled[compile_args] = \
                        self.target.compile(self.hart, *compile_args)
        self.binary = BaseTest.compiled.get(compile_args)

    def classSetup(self):
        """Compile, create the target process and start the server.

        Every log file name that becomes available is added to self.logs.
        If starting the server fails, the logs gathered so far are printed
        and the exception is re-raised.
        """
        self.compile()
        self.target_process = self.target.create()
        if self.target_process:
            self.logs.append(self.target_process.logname)
        try:
            self.server = self.target.server()
            self.logs.append(self.server.logname)
        except Exception:
            for log in self.logs:
                print_log(log)
            raise

    def classTeardown(self):
        """Drop the server and the target process."""
        del self.server
        del self.target_process

    def postMortem(self):
        """Hook called after a failed test, before the logs are printed."""
        pass

    def run(self):
        """
        If compile_args is set, compile a program and set self.binary.

        Call setup().

        Then call test() and return the result, displaying relevant information
        if an exception is raised.
        """

        sys.stdout.flush()

        if not self.early_applicable():
            return "not_applicable"

        self.start = time.time()

        try:
            self.classSetup()
            self.setup()
            result = self.test()    # pylint: disable=no-member
        except TestNotApplicable:
            result = "not_applicable"
        except Exception as e: # pylint: disable=broad-except
            if isinstance(e, TestFailed):
                result = "fail"
                header("Message")
                print(e.message)
            else:
                result = "exception"
            header("Traceback")
            traceback.print_exc(file=sys.stdout)
            try:
                self.postMortem()
            except Exception as post_exc:  # pylint: disable=broad-except
                header("postMortem Exception")
                print(post_exc)
                traceback.print_exc(file=sys.stdout)
            return result

        finally:
            # Tear down even if printing a log fails; otherwise the target
            # and server processes would outlive the test.
            try:
                for log in self.logs:
                    print_log(log)
                header("End of logs")
            finally:
                self.classTeardown()

        if not result:
            result = 'pass'
        return result
809
# Command used to start gdb. run_all_tests() overwrites it with the --gdb
# option; while it is unset GdbTest lets Gdb use its default command.
gdb_cmd = None
class GdbTest(BaseTest):
    """A BaseTest that also drives the target through gdb (self.gdb)."""

    def __init__(self, target, hart=None):
        BaseTest.__init__(self, target, hart=hart)
        self.gdb = None

    def classSetup(self):
        """Bring up target and server, then start gdb, connect it, apply the
        target's gdb_setup commands and select this test's hart."""
        BaseTest.classSetup(self)

        options = dict(timeout=self.target.timeout_sec, binary=self.binary)
        if gdb_cmd:
            self.gdb = Gdb(self.server.gdb_ports, gdb_cmd, **options)
        else:
            # No override given: Gdb falls back to its own default command.
            self.gdb = Gdb(self.server.gdb_ports, **options)

        self.logs += self.gdb.lognames()
        self.gdb.connect()

        self.gdb.global_command("set remotetimeout %d" %
                self.target.timeout_sec)

        for setup_cmd in self.target.gdb_setup:
            self.gdb.command(setup_cmd)

        self.gdb.select_hart(self.hart)

        # FIXME: OpenOCD doesn't handle PRIV now
        #self.gdb.p("$priv=3")

    def postMortem(self):
        """After a failure, halt the target and dump disassembly and
        registers into the gdb log."""
        if self.gdb:
            self.gdb.interrupt()
            self.gdb.command("disassemble")
            self.gdb.command("info registers all", timeout=10)

    def classTeardown(self):
        """Drop gdb first, then the server and the target."""
        del self.gdb
        BaseTest.classTeardown(self)
850
class GdbSingleHartTest(GdbTest):
    """A GdbTest that parks every hart other than the one under test."""

    def classSetup(self):
        GdbTest.classSetup(self)

        # Park all harts that we're not using in a safe place.
        unused = [hart for hart in self.target.harts if hart != self.hart]
        for hart in unused:
            self.gdb.select_hart(hart)
            self.gdb.p("$pc=loop_forever")
        self.gdb.select_hart(self.hart)
861
862 class ExamineTarget(GdbTest):
863 def test(self):
864 for hart in self.target.harts:
865 self.gdb.select_hart(hart)
866
867 hart.misa = self.gdb.p("$misa")
868
869 txt = "RV"
870 misa_xlen = 0
871 if ((hart.misa & 0xFFFFFFFF) >> 30) == 1:
872 misa_xlen = 32
873 elif ((hart.misa & 0xFFFFFFFFFFFFFFFF) >> 62) == 2:
874 misa_xlen = 64
875 elif ((hart.misa & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF) >> 126) == 3:
876 misa_xlen = 128
877 else:
878 raise TestFailed("Couldn't determine XLEN from $misa (0x%x)" %
879 self.hart.misa)
880
881 if misa_xlen != hart.xlen:
882 raise TestFailed("MISA reported XLEN of %d but we were "\
883 "expecting XLEN of %d\n" % (misa_xlen, hart.xlen))
884
885 txt += ("%d" % misa_xlen)
886
887 for i in range(26):
888 if hart.misa & (1<<i):
889 txt += chr(i + ord('A'))
890 print txt,
891
class TestFailed(Exception):
    """Raised by a test (or an assert helper) to report a failure.

    message: human readable reason, kept on self.message.
    """
    def __init__(self, message):
        # Pass the message on so str(e) and tracebacks show it too.
        Exception.__init__(self, message)
        self.message = message
896
class TestNotApplicable(Exception):
    """Raised by a test that finds out it cannot run on this target.

    message: human readable reason, kept on self.message.
    """
    def __init__(self, message):
        # Pass the message on so str(e) and tracebacks show it too.
        Exception.__init__(self, message)
        self.message = message
901
def assertEqual(a, b):
    """Raise TestFailed when `a != b` is true."""
    differs = a != b
    if differs:
        raise TestFailed("%r != %r" % (a, b))
905
def assertNotEqual(a, b):
    """Raise TestFailed when `a == b` is true."""
    same = a == b
    if same:
        raise TestFailed("%r == %r" % (a, b))
909
def assertIn(a, b):
    """Raise TestFailed unless `a` is contained in `b`."""
    found = a in b
    if not found:
        raise TestFailed("%r not in %r" % (a, b))
913
def assertNotIn(a, b):
    """Raise TestFailed when `a` is contained in `b`."""
    found = a in b
    if found:
        raise TestFailed("%r in %r" % (a, b))
917
def assertGreater(a, b):
    """Raise TestFailed unless `a > b` is true."""
    if a > b:
        return
    raise TestFailed("%r not greater than %r" % (a, b))
921
def assertLess(a, b):
    """Raise TestFailed unless `a < b` is true."""
    if a < b:
        return
    raise TestFailed("%r not less than %r" % (a, b))
925
def assertTrue(a):
    """Raise TestFailed when `a` is false."""
    if not a:
        # Wrap the value in a tuple: a bare tuple on the right of "%" would
        # be taken as the argument list (so "()" raised TypeError).
        raise TestFailed("%r is not True" % (a,))
929
def assertRegexpMatches(text, regexp):
    """Raise TestFailed unless `regexp` is found somewhere in `text`."""
    hit = re.search(regexp, text)
    if hit is None:
        raise TestFailed("can't find %r in %r" % (regexp, text))