Test debugging with/without a program buffer
[riscv-tests.git] / debug / testlib.py
1 import collections
2 import os
3 import os.path
4 import random
5 import re
6 import shlex
7 import subprocess
8 import sys
9 import tempfile
10 import time
11 import traceback
12 import pipes
13
14 import pexpect
15
# When true, every helper that creates a temporary log file writes the file's
# path to real_stdout as soon as the file exists.
print_log_names = False
# The stdout captured at import time. run_tests() rebinds this to the live
# sys.stdout just before it redirects sys.stdout into a per-test log file.
real_stdout = sys.stdout
18
19 # Note that gdb comes with its own testsuite. I was unable to figure out how to
20 # run that testsuite against the spike simulator.
21
def find_file(path):
    """Locate path relative to the working directory or to this module.

    The current working directory is tried first, then the directory that
    contains this file. For each candidate the cwd-relative spelling is used
    when it is strictly shorter than the joined path; otherwise the joined
    path is used as is.

    Returns the first spelling that exists on disk, or None when neither
    location contains path.
    """
    search_roots = (os.getcwd(), os.path.dirname(__file__))
    for root in search_roots:
        joined = os.path.join(root, path)
        relative = os.path.relpath(joined)
        # Prefer the relative form only when it actually saves characters.
        candidate = relative if len(relative) < len(joined) else joined
        if os.path.exists(candidate):
            return candidate
    return None
31
32 def compile(args, xlen=32): # pylint: disable=redefined-builtin
33 cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
34 cmd = [cc, "-g"]
35 if xlen == 32:
36 cmd.append("-march=rv32imac")
37 cmd.append("-mabi=ilp32")
38 else:
39 cmd.append("-march=rv64imac")
40 cmd.append("-mabi=lp64")
41 for arg in args:
42 found = find_file(arg)
43 if found:
44 cmd.append(found)
45 else:
46 cmd.append(arg)
47 header("Compile")
48 print "+", " ".join(cmd)
49 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
50 stderr=subprocess.PIPE)
51 stdout, stderr = process.communicate()
52 if process.returncode:
53 print stdout,
54 print stderr,
55 header("")
56 raise Exception("Compile failed!")
57
class Spike(object):
    """A running spike simulator process and its temporary log file."""

    def __init__(self, target, halted=False, timeout=None, with_jtag_gdb=True,
            isa=None, progbufsize=None):
        """Compile the idle program, launch spike, and find its bitbang port.

        target        -- target description; its harts attribute (or the
                         target itself when harts is empty) describes the
                         harts to simulate, and its compile() builds the
                         program spike is started with.
        halted        -- pass -H to spike.
        timeout       -- when set, spike is run under timeout(1) with this
                         value.
        with_jtag_gdb -- ask spike for a remote-bitbang port and wait for it
                         to be announced in the log.
        isa           -- explicit --isa string; defaults to "RV<xlen>G".
        progbufsize   -- when not None, passed as --progsize.

        When with_jtag_gdb is set, self.port holds the announced port and
        REMOTE_BITBANG_PORT is exported; an Exception is raised (after
        dumping the log) if spike never announces a port.
        """
        self.process = None
        self.isa = isa
        self.progbufsize = progbufsize

        if target.harts:
            harts = target.harts
        else:
            harts = [target]

        cmd = self.command(target, harts, halted, timeout, with_jtag_gdb)
        self.infinite_loop = target.compile(harts[0],
                "programs/checksum.c", "programs/tiny-malloc.c",
                "programs/infinite_loop.S", "-DDEFINE_MALLOC", "-DDEFINE_FREE")
        cmd.append(self.infinite_loop)
        self.logfile = tempfile.NamedTemporaryFile(prefix="spike-",
                suffix=".log")
        self.logname = self.logfile.name
        if print_log_names:
            real_stdout.write("Temporary spike log: %s\n" % self.logname)
        self.logfile.write("+ %s\n" % " ".join(cmd))
        self.logfile.flush()
        self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                stdout=self.logfile, stderr=self.logfile)

        if with_jtag_gdb:
            self.port = None
            port_text = self._find_bitbang_port()
            if port_text is not None:
                self.port = int(port_text)
                os.environ['REMOTE_BITBANG_PORT'] = port_text
            if not self.port:
                print_log(self.logname)
                raise Exception("Didn't get spike message about bitbang "
                        "connection")

    def _find_bitbang_port(self):
        """Poll the spike log for the bitbang announcement.

        Returns the port digits as a string, or None if the message has not
        appeared after 30 polls spaced 0.11s apart.
        """
        for _ in range(30):
            # Re-read the whole log on every poll, closing it each time so the
            # loop does not accumulate open file handles.
            with open(self.logname) as log:
                contents = log.read()
            m = re.search(r"Listening for remote bitbang connection on "
                    r"port (\d+).", contents)
            if m:
                return m.group(1)
            time.sleep(0.11)
        return None

    def command(self, target, harts, halted, timeout, with_jtag_gdb):
        """Build and return the spike command line as a list of strings.

        Side effect: when with_jtag_gdb is set, REMOTE_BITBANG_HOST is
        exported as 'localhost'. All harts must agree on xlen, ram and
        ram_size; this is enforced with assertions.
        """
        if target.sim_cmd:
            cmd = shlex.split(target.sim_cmd)
        else:
            spike = os.path.expandvars("$RISCV/bin/spike")
            cmd = [spike]

        cmd += ["-p%d" % len(harts)]

        assert len(set(t.xlen for t in harts)) == 1, \
                "All spike harts must have the same XLEN"

        if self.isa:
            isa = self.isa
        else:
            isa = "RV%dG" % harts[0].xlen

        cmd += ["--isa", isa]

        if self.progbufsize is not None:
            cmd += ["--progsize", str(self.progbufsize)]
            cmd += ["--debug-sba", "32"]

        assert len(set(t.ram for t in harts)) == 1, \
                "All spike harts must have the same RAM layout"
        assert len(set(t.ram_size for t in harts)) == 1, \
                "All spike harts must have the same RAM layout"
        cmd += ["-m0x%x:0x%x" % (harts[0].ram, harts[0].ram_size)]

        if timeout:
            cmd = ["timeout", str(timeout)] + cmd

        if halted:
            cmd.append('-H')
        if with_jtag_gdb:
            # Port 0 lets spike pick the port; __init__ reads the choice back
            # out of the log.
            cmd += ['--rbb-port', '0']
            os.environ['REMOTE_BITBANG_HOST'] = 'localhost'

        return cmd

    def __del__(self):
        """Kill the simulator, if one was started."""
        if self.process:
            try:
                self.process.kill()
                self.process.wait()
            except OSError:
                # Best effort: the process is already gone.
                pass

    def wait(self, *args, **kwargs):
        """Wait for the spike process; arguments go to Popen.wait()."""
        return self.process.wait(*args, **kwargs)
153
class VcsSim(object):
    """A running VCS simulation exposing a JTAG VPI port."""

    # The log file is created once, when the class body executes, and is
    # therefore shared by every instance.
    logfile = tempfile.NamedTemporaryFile(prefix='simv', suffix='.log')
    logname = logfile.name

    def __init__(self, sim_cmd=None, debug=False, timeout=300):
        """Start the simulator and wait for it to announce its port.

        sim_cmd -- shell-style command line; defaults to "simv".
        debug   -- run the "-debug" variant of the binary and request a VPD
                   dump.
        timeout -- seconds to wait for the "Listening on port" line.

        On success self.port holds the port and JTAG_VPI_PORT is exported.
        Raises RuntimeError if the simulator exits first, and Exception on
        timeout.
        """
        if sim_cmd:
            cmd = shlex.split(sim_cmd)
        else:
            cmd = ["simv"]
        cmd += ["+jtag_vpi_enable"]
        if debug:
            cmd[0] = cmd[0] + "-debug"
            cmd += ["+vcdplusfile=output/gdbserver.vpd"]

        logfile = open(self.logname, "w")
        if print_log_names:
            real_stdout.write("Temporary VCS log: %s\n" % self.logname)
        logfile.write("+ %s\n" % " ".join(cmd))
        logfile.flush()

        # The reader is only needed while waiting for the port, so it is
        # closed as soon as that wait ends (successfully or not).
        with open(self.logname, "r") as listenfile:
            # Skip anything already in the file; only new output matters.
            listenfile.seek(0, 2)
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                    stdout=logfile, stderr=logfile)
            self._wait_for_port(listenfile, timeout)

    def _wait_for_port(self, listenfile, timeout):
        """Read simulator output until the port line appears; set self.port."""
        start = time.time()
        while True:
            # Fail if VCS exits early
            exit_code = self.process.poll()
            if exit_code is not None:
                raise RuntimeError('VCS simulator exited early with status %d'
                                   % exit_code)

            line = listenfile.readline()
            if not line:
                time.sleep(1)
            match = re.match(r"^Listening on port (\d+)$", line)
            if match:
                self.port = int(match.group(1))
                os.environ['JTAG_VPI_PORT'] = str(self.port)
                # A port that has been found is a success even if the deadline
                # passed while reading this line.
                return

            if (time.time() - start) > timeout:
                raise Exception("Timed out waiting for VCS to listen for JTAG "
                        "vpi")

    def __del__(self):
        """Kill the simulator; tolerate it never having been started."""
        try:
            self.process.kill()
            self.process.wait()
        except (OSError, AttributeError):
            # OSError: process already gone. AttributeError: __init__ failed
            # before self.process was assigned.
            pass
206
class Openocd(object):
    """A running OpenOCD server and the gdb ports it announced."""

    # The log file is created once, when the class body executes, and is
    # therefore shared by every instance.
    logfile = tempfile.NamedTemporaryFile(prefix='openocd', suffix='.log')
    logname = logfile.name

    def __init__(self, server_cmd=None, config=None, debug=False, timeout=60):
        """Build the OpenOCD command line and start the server.

        server_cmd -- shell-style command line; defaults to
                      $RISCV/bin/openocd.
        config     -- configuration script, located with find_file(); the
                      program exits with status 1 if it cannot be found.
        debug      -- append -d to the command line.
        timeout    -- seconds start() waits for OpenOCD to come up.

        On success self.gdb_ports lists every gdb port OpenOCD announced and
        self.process is the server process.
        """
        self.timeout = timeout

        if server_cmd:
            cmd = shlex.split(server_cmd)
        else:
            openocd = os.path.expandvars("$RISCV/bin/openocd")
            cmd = [openocd]
            if debug:
                cmd.append("-d")

        # This command needs to come before any config scripts on the command
        # line, since they are executed in order.
        cmd += [
            # Tell OpenOCD to bind gdb to an unused, ephemeral port.
            "--command",
            "gdb_port 0",
            # Disable tcl and telnet servers, since they are unused and because
            # the port numbers will conflict if multiple OpenOCD processes are
            # running on the same server.
            "--command",
            "tcl_port disabled",
            "--command",
            "telnet_port disabled",
        ]

        if config:
            f = find_file(config)
            if f is None:
                print("Unable to read file " + config)
                sys.exit(1)

            cmd += ["-f", f]
        if debug:
            cmd.append("-d")

        logfile = open(Openocd.logname, "w")
        if print_log_names:
            real_stdout.write("Temporary OpenOCD log: %s\n" % Openocd.logname)
        env_entries = ("REMOTE_BITBANG_HOST", "REMOTE_BITBANG_PORT")
        env_entries = [key for key in env_entries if key in os.environ]
        logfile.write("+ %s%s\n" % (
            "".join("%s=%s " % (key, os.environ[key]) for key in env_entries),
            " ".join(map(pipes.quote, cmd))))
        logfile.flush()

        self.gdb_ports = []
        self.process = self.start(cmd, logfile)

    def start(self, cmd, logfile):
        """Launch cmd and block until OpenOCD is ready for gdb.

        Output goes to logfile; readiness is detected by following
        Openocd.logname. Every announced gdb port is appended to
        self.gdb_ports. Returns the Popen object once the "telnet server
        disabled" line is seen.

        Raises Exception if OpenOCD exits or self.timeout seconds pass first;
        in that case the log is printed and the process is killed.
        """
        process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                stdout=logfile, stderr=logfile)

        try:
            # Wait for OpenOCD to have made it through riscv_examine(). When
            # using OpenOCD to communicate with a simulator this may take a
            # long time, and gdb will time out when trying to connect if we
            # attempt too early.
            start = time.time()
            messaged = False
            with open(Openocd.logname, "r") as fd:
                while True:
                    line = fd.readline()
                    if line:
                        m = re.search(
                            r"Listening on port (\d+) for gdb connections",
                            line)
                        if m:
                            self.gdb_ports.append(int(m.group(1)))

                        if "telnet server disabled" in line:
                            return process
                    else:
                        if process.poll() is not None:
                            raise Exception("OpenOCD exited early.")
                        time.sleep(0.1)

                    # Checked on every pass, including idle ones, so a server
                    # that produces no output still hits the timeout.
                    elapsed = time.time() - start
                    if not messaged and elapsed > 1:
                        messaged = True
                        print("Waiting for OpenOCD to start...")
                    if elapsed > self.timeout:
                        raise Exception("Timed out waiting for OpenOCD to "
                                "listen for gdb")

        except Exception:
            print_log(Openocd.logname)
            # self.process is never assigned on this path, so __del__ cannot
            # clean up; stop the server here. Done in a helper so its own
            # exception handling cannot replace the exception re-raised below.
            self._reap(process)
            raise

    @staticmethod
    def _reap(process):
        """Kill process and collect its exit status, ignoring a dead one."""
        try:
            process.kill()
            process.wait()
        except OSError:
            # Best effort: the process is already gone.
            pass

    def __del__(self):
        """Kill the server; tolerate it never having been started."""
        try:
            self.process.kill()
            self.process.wait()
        except (OSError, AttributeError):
            pass
305
class OpenocdCli(object):
    """Interactive OpenOCD session driven through telnet and pexpect."""

    def __init__(self, port=4444):
        """Open a telnet session to localhost:port and wait for the prompt.

        The session is also copied to openocd-cli.log via tee.
        """
        self.child = pexpect.spawn(
                "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
        self.child.expect("> ")

    def command(self, cmd):
        """Send cmd and return the text printed before the next prompt."""
        self.child.sendline(cmd)
        # expect() treats its argument as a regular expression; escape the
        # echoed command so characters such as ( ) [ ] + * $ match literally.
        self.child.expect(re.escape(cmd))
        self.child.expect("\n")
        self.child.expect("> ")
        return self.child.before.strip("\t\r\n \0")

    def reg(self, reg=''):
        """Read registers with the "reg" command.

        With a register name, return that register's value as an int
        (KeyError if it is absent from the output). Without one, return a
        dict mapping every parsed register name to its value.
        """
        output = self.command("reg %s" % reg)
        matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
        values = {r: int(v, 0) for r, v in matches}
        if reg:
            return values[reg]
        return values

    def load_image(self, image):
        """Run load_image; raise TestNotApplicable for unsupported ELF files."""
        output = self.command("load_image %s" % image)
        if 'invalid ELF file, only 32bits files are supported' in output:
            raise TestNotApplicable(output)
331
class CannotAccess(Exception):
    """gdb reported "Cannot access memory at address ...".

    The offending address is available as the address attribute and is also
    part of the exception message, so it shows up in tracebacks.
    """
    def __init__(self, address):
        try:
            where = "0x%x" % address
        except TypeError:
            # Not an integer; fall back to its repr rather than fail while
            # constructing the exception.
            where = repr(address)
        Exception.__init__(self,
                "Cannot access memory at address %s" % where)
        self.address = address
336
class CouldNotFetch(Exception):
    """gdb reported that it could not fetch a register.

    regname and explanation hold the two parts of gdb's message; both are
    also included in the exception message, so they show up in tracebacks.
    """
    def __init__(self, regname, explanation):
        Exception.__init__(self, 'Could not fetch register "%s"; %s' %
                (regname, explanation))
        self.regname = regname
        self.explanation = explanation
342
# One row of gdb's "info threads" output as parsed by Gdb.threads(). Each field
# is the corresponding regex capture: a string, or None when that group did
# not take part in the match.
Thread = collections.namedtuple('Thread', ('id', 'description', 'target_id',
    'name', 'frame'))
345
346 class Gdb(object):
347 """A single gdb class which can interact with one or more gdb instances."""
348
349 # pylint: disable=too-many-public-methods
350 # pylint: disable=too-many-instance-attributes
351
352 def __init__(self, ports,
353 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb"),
354 timeout=60, binary=None):
355 assert ports
356
357 self.ports = ports
358 self.cmd = cmd
359 self.timeout = timeout
360 self.binary = binary
361
362 self.stack = []
363 self.harts = {}
364
365 self.logfiles = []
366 self.children = []
367 for port in ports:
368 logfile = tempfile.NamedTemporaryFile(prefix="gdb@%d-" % port,
369 suffix=".log")
370 self.logfiles.append(logfile)
371 if print_log_names:
372 real_stdout.write("Temporary gdb log: %s\n" % logfile.name)
373 child = pexpect.spawn(cmd)
374 child.logfile = logfile
375 child.logfile.write("+ %s\n" % cmd)
376 self.children.append(child)
377 self.active_child = self.children[0]
378
379 def connect(self):
380 for port, child in zip(self.ports, self.children):
381 self.select_child(child)
382 self.wait()
383 self.command("set confirm off")
384 self.command("set width 0")
385 self.command("set height 0")
386 # Force consistency.
387 self.command("set print entry-values no")
388 self.command("set remotetimeout %d" % self.timeout)
389 self.command("target extended-remote localhost:%d" % port)
390 if self.binary:
391 self.command("file %s" % self.binary)
392 threads = self.threads()
393 for t in threads:
394 hartid = None
395 if t.name:
396 m = re.search(r"Hart (\d+)", t.name)
397 if m:
398 hartid = int(m.group(1))
399 if hartid is None:
400 if self.harts:
401 hartid = max(self.harts) + 1
402 else:
403 hartid = 0
404 # solo: True iff this is the only thread on this child
405 self.harts[hartid] = {'child': child,
406 'thread': t,
407 'solo': len(threads) == 1}
408
409 def __del__(self):
410 for child in self.children:
411 del child
412
413 def one_hart_per_gdb(self):
414 return all(h['solo'] for h in self.harts.itervalues())
415
416 def lognames(self):
417 return [logfile.name for logfile in self.logfiles]
418
419 def select_child(self, child):
420 self.active_child = child
421
422 def select_hart(self, hart):
423 h = self.harts[hart.id]
424 self.select_child(h['child'])
425 if not h['solo']:
426 output = self.command("thread %s" % h['thread'].id, timeout=10)
427 assert "Unknown" not in output
428
429 def push_state(self):
430 self.stack.append({
431 'active_child': self.active_child
432 })
433
434 def pop_state(self):
435 state = self.stack.pop()
436 self.active_child = state['active_child']
437
438 def wait(self):
439 """Wait for prompt."""
440 self.active_child.expect(r"\(gdb\)")
441
442 def command(self, command, timeout=6000):
443 """timeout is in seconds"""
444 self.active_child.sendline(command)
445 self.active_child.expect("\n", timeout=timeout)
446 self.active_child.expect(r"\(gdb\)", timeout=timeout)
447 return self.active_child.before.strip()
448
449 def global_command(self, command):
450 """Execute this command on every gdb that we control."""
451 with PrivateState(self):
452 for child in self.children:
453 self.select_child(child)
454 self.command(command)
455
456 def c(self, wait=True, timeout=-1, async=False):
457 """
458 Dumb c command.
459 In RTOS mode, gdb will resume all harts.
460 In multi-gdb mode, this command will just go to the current gdb, so
461 will only resume one hart.
462 """
463 if async:
464 async = "&"
465 else:
466 async = ""
467 if wait:
468 output = self.command("c%s" % async, timeout=timeout)
469 assert "Continuing" in output
470 return output
471 else:
472 self.active_child.sendline("c%s" % async)
473 self.active_child.expect("Continuing")
474
475 def c_all(self):
476 """
477 Resume every hart.
478
479 This function works fine when using multiple gdb sessions, but the
480 caller must be careful when using it nonetheless. gdb's behavior is to
481 not set breakpoints until just before the hart is resumed, and then
482 clears them as soon as the hart halts. That means that you can't set
483 one software breakpoint, and expect multiple harts to hit it. It's
484 possible that the first hart completes set/run/halt/clear before the
485 second hart even gets to resume, so it will never hit the breakpoint.
486 """
487 with PrivateState(self):
488 for child in self.children:
489 child.sendline("c")
490 child.expect("Continuing")
491
492 # Now wait for them all to halt
493 for child in self.children:
494 child.expect(r"\(gdb\)")
495
496 def interrupt(self):
497 self.active_child.send("\003")
498 self.active_child.expect(r"\(gdb\)", timeout=6000)
499 return self.active_child.before.strip()
500
501 def x(self, address, size='w'):
502 output = self.command("x/%s %s" % (size, address))
503 value = int(output.split(':')[1].strip(), 0)
504 return value
505
506 def p_raw(self, obj):
507 output = self.command("p %s" % obj)
508 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
509 if m:
510 raise CannotAccess(int(m.group(1), 0))
511 return output.split('=')[-1].strip()
512
513 def parse_string(self, text):
514 text = text.strip()
515 if text.startswith("{") and text.endswith("}"):
516 inner = text[1:-1]
517 return [self.parse_string(t) for t in inner.split(", ")]
518 elif text.startswith('"') and text.endswith('"'):
519 return text[1:-1]
520 else:
521 return int(text, 0)
522
523 def p(self, obj, fmt="/x"):
524 output = self.command("p%s %s" % (fmt, obj))
525 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
526 if m:
527 raise CannotAccess(int(m.group(1), 0))
528 m = re.search(r"Could not fetch register \"(\w+)\"; (.*)$", output)
529 if m:
530 raise CouldNotFetch(m.group(1), m.group(2))
531 rhs = output.split('=')[-1]
532 return self.parse_string(rhs)
533
534 def p_string(self, obj):
535 output = self.command("p %s" % obj)
536 value = shlex.split(output.split('=')[-1].strip())[1]
537 return value
538
539 def stepi(self):
540 output = self.command("stepi", timeout=60)
541 return output
542
543 def load(self):
544 output = self.command("load", timeout=6000)
545 assert "failed" not in output
546 assert "Transfer rate" in output
547
548 def b(self, location):
549 output = self.command("b %s" % location)
550 assert "not defined" not in output
551 assert "Breakpoint" in output
552 return output
553
554 def hbreak(self, location):
555 output = self.command("hbreak %s" % location)
556 assert "not defined" not in output
557 assert "Hardware assisted breakpoint" in output
558 return output
559
560 def threads(self):
561 output = self.command("info threads")
562 threads = []
563 for line in output.splitlines():
564 m = re.match(
565 r"[\s\*]*(\d+)\s*"
566 r"(Remote target|Thread (\d+)\s*\(Name: ([^\)]+))"
567 r"\s*(.*)", line)
568 if m:
569 threads.append(Thread(*m.groups()))
570 assert threads
571 #>>>if not threads:
572 #>>> threads.append(Thread('1', '1', 'Default', '???'))
573 return threads
574
575 def thread(self, thread):
576 return self.command("thread %s" % thread.id)
577
578 def where(self):
579 return self.command("where 1")
580
class PrivateState(object):
    """Context manager that saves a gdb wrapper's state and restores it.

    Entering calls push_state() on the wrapped object; leaving calls
    pop_state(), whether or not the body raised. Exceptions are not
    suppressed, and the `with` statement binds None.
    """

    def __init__(self, gdb):
        self.gdb = gdb

    def __enter__(self):
        self.gdb.push_state()
        return None

    def __exit__(self, _type, _value, _traceback):
        self.gdb.pop_state()
        # Returning None lets any exception from the body propagate.
        return None
590
def run_all_tests(module, target, parsed):
    """Collect the tests defined in module, run them, and report.

    module -- every class in it that has a `test` attribute is a candidate;
              when parsed.test is non-empty only classes whose name contains
              one of those strings are kept.
    target -- target description whose harts are examined for a known misa.
    parsed -- parsed command-line options (logs, gdb, misaval, test, ...).

    ExamineTarget is queued first (once) if some hart has no misa value from
    either the command line or its definition. Returns the value of
    print_results(): 0 if every result was good, 1 otherwise.
    """
    global gdb_cmd  # pylint: disable=global-statement

    if not os.path.exists(parsed.logs):
        os.makedirs(parsed.logs)

    began = time.time()

    gdb_cmd = parsed.gdb

    todo = []
    examine_queued = False
    for hart in target.harts:
        if parsed.misaval:
            hart.misa = int(parsed.misaval, 16)
            print("Using $misa from command line: 0x%x" % hart.misa)
        elif hart.misa:
            print("Using $misa from hart definition: 0x%x" % hart.misa)
        elif not examine_queued:
            todo.append(("ExamineTarget", ExamineTarget, None))
            examine_queued = True

    for name in dir(module):
        candidate = getattr(module, name)
        if not isinstance(candidate, type):
            continue
        if not hasattr(candidate, 'test'):
            continue
        if parsed.test and not any(wanted in name for wanted in parsed.test):
            continue
        todo.append((name, candidate, None))

    results, count = run_tests(parsed, target, todo)

    elapsed = time.time() - began
    header("ran %d tests in %.0fs" % (count, elapsed), dash=':')

    return print_results(results)
624
# Results that do not count as failures.
good_results = set(('pass', 'not_applicable'))
def run_tests(parsed, target, todo):
    """Run each queued test with stdout redirected into its own log file.

    parsed -- options object; logs (directory), print_failures and fail_fast
              are read.
    target -- passed to each test's constructor; its type name is part of
              the log file name.
    todo   -- iterable of (name, definition, hart); definition(target, hart)
              must return an object with a run() method.

    Returns (results, count): results maps each result string to a list of
    (name, log_name) pairs and count is the number of tests that ran. With
    fail_fast set, the loop stops after the first result that is not in
    good_results.
    """
    global real_stdout  # pylint: disable=global-statement

    results = {}
    count = 0

    for name, definition, hart in todo:
        log_name = os.path.join(parsed.logs, "%s-%s-%s.log" %
                (time.strftime("%Y%m%d-%H%M%S"), type(target).__name__, name))
        # The with-block guarantees the log is closed after every test
        # instead of leaving one open handle behind per test.
        with open(log_name, 'w') as log_fd:
            print("[%s] Starting > %s" % (name, log_name))
            instance = definition(target, hart)
            sys.stdout.flush()
            log_fd.write("Test: %s\n" % name)
            log_fd.write("Target: %s\n" % type(target).__name__)
            start = time.time()
            real_stdout = sys.stdout
            sys.stdout = log_fd
            try:
                result = instance.run()
                log_fd.write("Result: %s\n" % result)
            finally:
                sys.stdout = real_stdout
                log_fd.write("Time elapsed: %.2fs\n" % (time.time() - start))
                log_fd.flush()
        print("[%s] %s in %.2fs" % (name, result, time.time() - start))
        if result not in good_results and parsed.print_failures:
            with open(log_name) as finished_log:
                sys.stdout.write(finished_log.read())
        sys.stdout.flush()
        results.setdefault(result, []).append((name, log_name))
        count += 1
        if result not in good_results and parsed.fail_fast:
            break

    return results, count
660
def print_results(results):
    """Print a summary per result kind; return a process exit status.

    results maps a result string to a list of (name, log_name) pairs.
    Returns 1 if any result kind is outside good_results, otherwise 0.
    """
    exit_status = 0
    for outcome, tests in results.items():
        print("%d tests returned %s" % (len(tests), outcome))
        if outcome not in good_results:
            exit_status = 1
        for name, log_name in tests:
            print(" %s > %s" % (name, log_name))

    return exit_status
671
def add_test_run_options(parser):
    """Register the options shared by all test runners on parser.

    parser is expected to provide argparse's add_argument() interface. The
    options are added in the order listed below.
    """
    options = (
        (("--logs",), dict(
            default="logs",
            help="Store logs in the specified directory.")),
        (("--fail-fast", "-f"), dict(
            action="store_true",
            help="Exit as soon as any test fails.")),
        (("--print-failures",), dict(
            action="store_true",
            help="When a test fails, print the log file to stdout.")),
        (("--print-log-names", "--pln"), dict(
            action="store_true",
            help="Print names of temporary log files as soon as they are "
            "created.")),
        (("test",), dict(
            nargs='*',
            help="Run only tests that are named here.")),
        (("--gdb",), dict(
            help="The command to use to start gdb.")),
        (("--misaval",), dict(
            help="Don't run ExamineTarget, just assume the misa value which is "
            "specified.")),
    )
    for names, settings in options:
        parser.add_argument(*names, **settings)
689
def header(title, dash='-', length=78):
    """Print a separator line, with title embedded in brackets if given.

    With an empty title the line is just dash repeated length times.
    Otherwise the title is printed as "[ title ]" with the filler split
    around it; when the filler has odd length the extra character goes after
    the title.
    """
    if not title:
        print(dash * length)
        return
    filler = dash * (length - 4 - len(title))
    middle = len(filler) // 2
    print("%s[ %s ]%s" % (filler[:middle], title, filler[middle:]))
698
def print_log(path):
    """Print the file at path to stdout under a header naming it.

    A blank line is printed after the contents. The file is closed before
    returning.
    """
    header(path)
    with open(path, "r") as log:
        for line in log:
            sys.stdout.write(line)
    print("")
704
class BaseTest(object):
    """Common life cycle for a test: compile, start target/server, run."""

    # Binaries already built, keyed by the compile_args tuple; shared by all
    # test instances so each distinct program is compiled only once.
    compiled = {}

    def __init__(self, target, hart=None):
        """Remember the target and pick the hart to test.

        When no hart is given, one of target.harts is chosen at random.
        """
        self.target = target
        if hart:
            self.hart = hart
        else:
            self.hart = random.choice(target.harts)
        self.server = None
        self.target_process = None
        self.binary = None
        self.start = 0
        self.logs = []

    def early_applicable(self):
        """Return a false value if the test has determined it cannot run
        without ever needing to talk to the target or server."""
        # pylint: disable=no-self-use
        return True

    def setup(self):
        """Hook that run() calls after classSetup(); does nothing here."""
        pass

    def compile(self):
        """Build the program named by self.compile_args, if there is one.

        self.binary is set to the (possibly cached) result, or to None when
        the test defines no compile_args.
        """
        compile_args = getattr(self, 'compile_args', None)
        if compile_args:
            if compile_args not in BaseTest.compiled:
                BaseTest.compiled[compile_args] = \
                        self.target.compile(self.hart, *compile_args)
        self.binary = BaseTest.compiled.get(compile_args)

    def classSetup(self):
        """Compile, then create the target process and the debug server.

        Every log file name obtained is appended to self.logs. If creating
        the server fails, the logs collected so far are printed and the
        exception is re-raised.
        """
        self.compile()
        self.target_process = self.target.create()
        if self.target_process:
            self.logs.append(self.target_process.logname)
        try:
            self.server = self.target.server()
            self.logs.append(self.server.logname)
        except Exception:
            for log in self.logs:
                print_log(log)
            raise

    def classTeardown(self):
        """Drop the references to the server and the target process."""
        del self.server
        del self.target_process

    def postMortem(self):
        """Hook called after a failed test; does nothing here."""
        pass

    def run(self):
        """
        If compile_args is set, compile a program and set self.binary.

        Call setup().

        Then call test() and return the result, displaying relevant information
        if an exception is raised.

        The return value is "not_applicable", "fail" (TestFailed was raised),
        "exception" (anything else was raised), whatever truthy value test()
        returned, or "pass" when test() returned a false value.
        """

        sys.stdout.flush()

        if not self.early_applicable():
            return "not_applicable"

        self.start = time.time()

        try:
            self.classSetup()
            self.setup()
            result = self.test()    # pylint: disable=no-member
        except TestNotApplicable:
            result = "not_applicable"
        except Exception as e: # pylint: disable=broad-except
            if isinstance(e, TestFailed):
                result = "fail"
                header("Message")
                print(e.message)
            else:
                result = "exception"
            header("Traceback")
            traceback.print_exc(file=sys.stdout)
            try:
                self.postMortem()
            except Exception as post_error:  # pylint: disable=broad-except
                # A failing post-mortem must not hide the original result.
                header("postMortem Exception")
                print(post_error)
                traceback.print_exc(file=sys.stdout)
            return result

        finally:
            # Runs on every path above, including the early return.
            for log in self.logs:
                print_log(log)
            header("End of logs")
            self.classTeardown()

        if not result:
            result = 'pass'
        return result
808
# Command used to start gdb; None means "use Gdb's default". Set by
# run_all_tests() from the command line.
gdb_cmd = None
class GdbTest(BaseTest):
    """A test that drives the target through gdb."""

    def __init__(self, target, hart=None):
        BaseTest.__init__(self, target, hart=hart)
        self.gdb = None

    def classSetup(self):
        """Start target and server, then connect gdb and select self.hart."""
        BaseTest.classSetup(self)

        # Only pass a gdb command when one was configured, so that Gdb's own
        # default applies otherwise.
        positional = [self.server.gdb_ports]
        if gdb_cmd:
            positional.append(gdb_cmd)
        self.gdb = Gdb(*positional, timeout=self.target.timeout_sec,
                binary=self.binary)

        self.logs += self.gdb.lognames()
        self.gdb.connect()

        remote_timeout = "set remotetimeout %d" % self.target.timeout_sec
        self.gdb.global_command(remote_timeout)

        for setup_command in self.target.gdb_setup:
            self.gdb.command(setup_command)

        self.gdb.select_hart(self.hart)

        # FIXME: OpenOCD doesn't handle PRIV now
        #self.gdb.p("$priv=3")

    def postMortem(self):
        """After a failure, halt the target and dump code and registers."""
        if self.gdb:
            self.gdb.interrupt()
            self.gdb.command("disassemble")
            self.gdb.command("info registers all", timeout=10)

    def classTeardown(self):
        """Drop the gdb wrapper, then do the base-class teardown."""
        del self.gdb
        BaseTest.classTeardown(self)
849
class GdbSingleHartTest(GdbTest):
    """A gdb test that exercises one hart while the others are parked."""

    def classSetup(self):
        """Do the GdbTest setup, park the unused harts, reselect self.hart."""
        GdbTest.classSetup(self)

        for other in self.target.harts:
            if not other != self.hart:
                continue
            # Park all harts that we're not using in a safe place.
            self.gdb.select_hart(other)
            self.gdb.p("$pc=loop_forever")
        self.gdb.select_hart(self.hart)
860
861 class ExamineTarget(GdbTest):
862 def test(self):
863 for hart in self.target.harts:
864 self.gdb.select_hart(hart)
865
866 hart.misa = self.gdb.p("$misa")
867
868 txt = "RV"
869 misa_xlen = 0
870 if ((hart.misa & 0xFFFFFFFF) >> 30) == 1:
871 misa_xlen = 32
872 elif ((hart.misa & 0xFFFFFFFFFFFFFFFF) >> 62) == 2:
873 misa_xlen = 64
874 elif ((hart.misa & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF) >> 126) == 3:
875 misa_xlen = 128
876 else:
877 raise TestFailed("Couldn't determine XLEN from $misa (0x%x)" %
878 self.hart.misa)
879
880 if misa_xlen != hart.xlen:
881 raise TestFailed("MISA reported XLEN of %d but we were "\
882 "expecting XLEN of %d\n" % (misa_xlen, hart.xlen))
883
884 txt += ("%d" % misa_xlen)
885
886 for i in range(26):
887 if hart.misa & (1<<i):
888 txt += chr(i + ord('A'))
889 print txt,
890
class TestFailed(Exception):
    """Raised by a test to report a failure.

    The text is available as the message attribute and is also passed to
    Exception, so str() of the exception and tracebacks show it.
    """
    def __init__(self, message):
        Exception.__init__(self, message)
        self.message = message
895
class TestNotApplicable(Exception):
    """Raised by a test that cannot run on the current target.

    The text is available as the message attribute and is also passed to
    Exception, so str() of the exception and tracebacks show it.
    """
    def __init__(self, message):
        Exception.__init__(self, message)
        self.message = message
900
def assertEqual(a, b):
    """Raise TestFailed, showing both values, if a != b."""
    if not a != b:
        return
    raise TestFailed("%r != %r" % (a, b))
904
def assertNotEqual(a, b):
    """Raise TestFailed, showing both values, if a == b."""
    if not a == b:
        return
    raise TestFailed("%r == %r" % (a, b))
908
def assertIn(a, b):
    """Raise TestFailed, showing both values, unless a is in b."""
    if a in b:
        return
    raise TestFailed("%r not in %r" % (a, b))
912
def assertNotIn(a, b):
    """Raise TestFailed, showing both values, if a is in b."""
    found = a in b
    if found:
        raise TestFailed("%r in %r" % (a, b))
916
def assertGreater(a, b):
    """Raise TestFailed, showing both values, unless a > b."""
    if a > b:
        return
    raise TestFailed("%r not greater than %r" % (a, b))
920
def assertLess(a, b):
    """Raise TestFailed, showing both values, unless a < b."""
    if a < b:
        return
    raise TestFailed("%r not less than %r" % (a, b))
924
def assertTrue(a):
    """Raise TestFailed, showing the value, if a is falsy."""
    if not a:
        # Wrap the value in a 1-tuple: a bare tuple operand (for instance the
        # falsy empty tuple) would be unpacked as the format arguments and
        # raise TypeError instead of reporting the failure.
        message = "%r is not True" % (a,)
        raise TestFailed(message)
928
def assertRegexpMatches(text, regexp):
    """Raise TestFailed unless regexp matches somewhere in text."""
    if re.search(regexp, text):
        return
    raise TestFailed("can't find %r in %r" % (regexp, text))