1d46b6c4a59cd93279595bba63a8115b780007d9
[riscv-tests.git] / debug / testlib.py
1 import collections
2 import os
3 import os.path
4 import random
5 import re
6 import shlex
7 import subprocess
8 import sys
9 import tempfile
10 import time
11 import traceback
12 import pipes
13
14 import pexpect
15
# When true, the path of every temporary log file is written to real_stdout
# as soon as the file is created.
print_log_names = False
# The stdout stream as it was when this module was imported. run_tests()
# rebinds this name while sys.stdout is redirected into a per-test log file.
real_stdout = sys.stdout
18
19 # Note that gdb comes with its own testsuite. I was unable to figure out how to
20 # run that testsuite against the spike simulator.
21
def find_file(path):
    """Locate path relative to the working directory or to this module.

    The current working directory is tried first, then the directory that
    contains this file. For each candidate the shorter of the joined path and
    its os.path.relpath() spelling is used. Returns the first spelling that
    exists on disk, or None when neither location has the file.
    """
    search_roots = (os.getcwd(), os.path.dirname(__file__))
    for root in search_roots:
        joined = os.path.join(root, path)
        relative = os.path.relpath(joined)
        # Prefer the relative spelling only when it is strictly shorter.
        candidate = relative if len(relative) < len(joined) else joined
        if os.path.exists(candidate):
            return candidate
    return None
31
def compile(args, xlen=32): # pylint: disable=redefined-builtin
    """Run the RISC-V gcc from $RISCV on args.

    Every element of args that find_file() can locate is replaced by the
    located path; all other elements are passed to the compiler unchanged.
    xlen == 32 selects -march=rv32imac/-mabi=ilp32, any other value selects
    -march=rv64imac/-mabi=lp64.

    Raises Exception("Compile failed!") when the compiler exits with a
    non-zero status, after echoing its captured stdout and stderr.
    """
    cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
    if xlen == 32:
        arch_flags = ["-march=rv32imac", "-mabi=ilp32"]
    else:
        arch_flags = ["-march=rv64imac", "-mabi=lp64"]
    cmd = [cc, "-g"] + arch_flags
    # find_file() returns None for anything it cannot locate (flags, -D...).
    cmd.extend(find_file(arg) or arg for arg in args)

    header("Compile")
    print("+ " + " ".join(cmd))
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()
    if process.returncode:
        # Emit the captured streams verbatim; a trailing-comma print statement
        # would inject soft-space characters between and after them.
        sys.stdout.write(stdout)
        sys.stdout.write(stderr)
        header("")
        raise Exception("Compile failed!")
57
class Spike(object):
    """A running spike simulator process and the data needed to talk to it.

    Attributes set by the constructor:
      process        subprocess.Popen object for spike (None until launched)
      logfile        temporary file receiving spike's stdout and stderr
      logname        path of logfile
      infinite_loop  value returned by target.compile() for the program spike
                     is started with
      port           remote-bitbang port spike reported, or None when
                     with_jtag_gdb is false
    """

    def __init__(self, target, halted=False, timeout=None, with_jtag_gdb=True,
            isa=None, progbufsize=None):
        """Compile the idle program, launch spike and, when with_jtag_gdb is
        true, wait for it to report its remote-bitbang port.

        Raises Exception when spike has not announced the port after 30 polls
        of its log file (the log is printed first).
        """
        self.process = None
        # Defined up front so the attribute exists even without JTAG.
        self.port = None
        self.isa = isa
        self.progbufsize = progbufsize

        if target.harts:
            harts = target.harts
        else:
            harts = [target]

        cmd = self.command(target, harts, halted, timeout, with_jtag_gdb)
        self.infinite_loop = target.compile(harts[0],
                "programs/checksum.c", "programs/tiny-malloc.c",
                "programs/infinite_loop.S", "-DDEFINE_MALLOC", "-DDEFINE_FREE")
        cmd.append(self.infinite_loop)
        self.logfile = tempfile.NamedTemporaryFile(prefix="spike-",
                suffix=".log")
        self.logname = self.logfile.name
        if print_log_names:
            real_stdout.write("Temporary spike log: %s\n" % self.logname)
        self.logfile.write("+ %s\n" % " ".join(cmd))
        self.logfile.flush()
        self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                stdout=self.logfile, stderr=self.logfile)

        if with_jtag_gdb:
            for _ in range(30):
                # Re-read the whole log on every poll, closing the handle each
                # time instead of leaking one per iteration.
                with open(self.logname) as log:
                    contents = log.read()
                m = re.search(r"Listening for remote bitbang connection on "
                        r"port (\d+).", contents)
                if m:
                    self.port = int(m.group(1))
                    os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
                    break
                time.sleep(0.11)
            if not self.port:
                print_log(self.logname)
                raise Exception("Didn't get spike message about bitbang "
                        "connection")

    def command(self, target, harts, halted, timeout, with_jtag_gdb):
        """Build and return the argv list used to start spike.

        The simulator binary comes from target.sim_cmd when set, otherwise
        from $RISCV/bin/spike. All harts must agree on xlen, ram and ram_size.
        When with_jtag_gdb is true this also sets REMOTE_BITBANG_HOST in
        os.environ.
        """
        # pylint: disable=no-self-use
        if target.sim_cmd:
            cmd = shlex.split(target.sim_cmd)
        else:
            spike = os.path.expandvars("$RISCV/bin/spike")
            cmd = [spike]

        cmd += ["-p%d" % len(harts)]

        assert len(set(t.xlen for t in harts)) == 1, \
                "All spike harts must have the same XLEN"

        if self.isa:
            isa = self.isa
        else:
            isa = "RV%dG" % harts[0].xlen

        cmd += ["--isa", isa]
        cmd += ["--debug-auth"]

        # NOTE(review): indentation was lost in this copy of the file; the
        # --debug-sba option is assumed to belong to the progbufsize branch.
        if not self.progbufsize is None:
            cmd += ["--progsize", str(self.progbufsize)]
            cmd += ["--debug-sba", "32"]

        assert len(set(t.ram for t in harts)) == 1, \
                "All spike harts must have the same RAM layout"
        assert len(set(t.ram_size for t in harts)) == 1, \
                "All spike harts must have the same RAM layout"
        cmd += ["-m0x%x:0x%x" % (harts[0].ram, harts[0].ram_size)]

        if timeout:
            # Wrap the whole invocation in timeout(1).
            cmd = ["timeout", str(timeout)] + cmd

        if halted:
            cmd.append('-H')
        if with_jtag_gdb:
            cmd += ['--rbb-port', '0']
            os.environ['REMOTE_BITBANG_HOST'] = 'localhost'

        return cmd

    def __del__(self):
        # getattr: __init__ may have raised before self.process was assigned.
        process = getattr(self, 'process', None)
        if process:
            try:
                process.kill()
                process.wait()
            except OSError:
                pass

    def wait(self, *args, **kwargs):
        """Forward to the spike process's wait()."""
        return self.process.wait(*args, **kwargs)
154
class VcsSim(object):
    """A running VCS simulation (simv) with its JTAG VPI port.

    The log file is created once, at class-definition time, and is shared by
    every instance.
    """
    logfile = tempfile.NamedTemporaryFile(prefix='simv', suffix='.log')
    logname = logfile.name

    def __init__(self, sim_cmd=None, debug=False, timeout=300):
        """Start the simulator and wait until it reports its listening port.

        sim_cmd  command line to run; defaults to "simv"
        debug    use the "-debug" flavour of the binary and dump a VPD file
        timeout  seconds to wait for the "Listening on port" message

        Sets self.port and os.environ['JTAG_VPI_PORT'].
        Raises RuntimeError if the simulator exits before listening and
        Exception if the timeout expires.
        """
        # Assigned first so __del__ is safe even if start-up fails below.
        self.process = None

        if sim_cmd:
            cmd = shlex.split(sim_cmd)
        else:
            cmd = ["simv"]
        cmd += ["+jtag_vpi_enable"]
        if debug:
            cmd[0] = cmd[0] + "-debug"
            cmd += ["+vcdplusfile=output/gdbserver.vpd"]

        logfile = open(self.logname, "w")
        listenfile = None
        try:
            if print_log_names:
                real_stdout.write("Temporary VCS log: %s\n" % self.logname)
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()

            listenfile = open(self.logname, "r")
            # Only look at output produced after this point.
            listenfile.seek(0, 2)
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                    stdout=logfile, stderr=logfile)
            start = time.time()
            while True:
                # Fail if VCS exits early
                exit_code = self.process.poll()
                if exit_code is not None:
                    raise RuntimeError('VCS simulator exited early with status %d'
                            % exit_code)

                line = listenfile.readline()
                if not line:
                    time.sleep(1)
                match = re.match(r"^Listening on port (\d+)$", line)
                if match:
                    self.port = int(match.group(1))
                    os.environ['JTAG_VPI_PORT'] = str(self.port)
                    # Leave before the timeout test so success on the final
                    # iteration is not reported as a timeout.
                    break

                if (time.time() - start) > timeout:
                    raise Exception("Timed out waiting for VCS to listen for JTAG "
                            "vpi")
        finally:
            # The child keeps its own copy of the log descriptor.
            if listenfile is not None:
                listenfile.close()
            logfile.close()

    def __del__(self):
        process = getattr(self, 'process', None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            pass
207
class Openocd(object):
    """A running OpenOCD server and the gdb ports it opened.

    The log file is created once, at class-definition time, and is shared by
    every instance.
    """
    logfile = tempfile.NamedTemporaryFile(prefix='openocd', suffix='.log')
    logname = logfile.name

    def __init__(self, server_cmd=None, config=None, debug=False, timeout=60):
        """Build the OpenOCD command line, log it, and start the server.

        server_cmd  command line to run; defaults to $RISCV/bin/openocd
        config      configuration file, resolved with find_file(); the
                    interpreter exits with status 1 if it cannot be found
        debug       pass -d to OpenOCD
        timeout     seconds start() waits for the server to come up

        Sets self.gdb_ports (filled in by start()) and self.process.
        """
        self.timeout = timeout

        # NOTE(review): indentation was lost in this copy of the file; the
        # first "-d" is assumed to apply to the default binary only.
        if server_cmd:
            cmd = shlex.split(server_cmd)
        else:
            openocd = os.path.expandvars("$RISCV/bin/openocd")
            cmd = [openocd]
            if debug:
                cmd.append("-d")

        # This command needs to come before any config scripts on the command
        # line, since they are executed in order.
        cmd += [
            # Tell OpenOCD to bind gdb to an unused, ephemeral port.
            "--command",
            "gdb_port 0",
            # Disable tcl and telnet servers, since they are unused and because
            # the port numbers will conflict if multiple OpenOCD processes are
            # running on the same server.
            "--command",
            "tcl_port disabled",
            "--command",
            "telnet_port disabled",
        ]

        if config:
            f = find_file(config)
            if f is None:
                print("Unable to read file " + config)
                sys.exit(1)

            cmd += ["-f", f]
        if debug:
            cmd.append("-d")

        logfile = open(Openocd.logname, "w")
        try:
            if print_log_names:
                real_stdout.write("Temporary OpenOCD log: %s\n" %
                        Openocd.logname)
            env_entries = ("REMOTE_BITBANG_HOST", "REMOTE_BITBANG_PORT")
            env_entries = [key for key in env_entries if key in os.environ]
            logfile.write("+ %s%s\n" % (
                "".join("%s=%s " % (key, os.environ[key])
                    for key in env_entries),
                " ".join(map(pipes.quote, cmd))))
            logfile.flush()

            self.gdb_ports = []
            self.process = self.start(cmd, logfile)
        finally:
            # The child keeps its own copy of the log descriptor.
            logfile.close()

    def start(self, cmd, logfile):
        """Spawn OpenOCD and block until it is ready for gdb.

        Every "Listening on port N for gdb connections" line appends N to
        self.gdb_ports; the "telnet server disabled" line marks readiness.
        Returns the subprocess.Popen object.

        Raises Exception if OpenOCD exits early or is not ready within
        self.timeout seconds; the log is printed before re-raising.
        """
        process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                stdout=logfile, stderr=logfile)

        try:
            # Wait for OpenOCD to have made it through riscv_examine(). When
            # using OpenOCD to communicate with a simulator this may take a
            # long time, and gdb will time out when trying to connect if we
            # attempt too early.
            start = time.time()
            messaged = False
            with open(Openocd.logname, "r") as fd:
                while True:
                    line = fd.readline()
                    if line:
                        m = re.search(
                                r"Listening on port (\d+) for gdb connections",
                                line)
                        if m:
                            self.gdb_ports.append(int(m.group(1)))

                        if "telnet server disabled" in line:
                            return process
                    else:
                        if process.poll() is not None:
                            raise Exception("OpenOCD exited early.")
                        time.sleep(0.1)

                    # Checked on every pass, including when no output arrived,
                    # so a silent server cannot block forever.
                    elapsed = time.time() - start
                    if not messaged and elapsed > 1:
                        messaged = True
                        print("Waiting for OpenOCD to start...")
                    if elapsed > self.timeout:
                        raise Exception("Timed out waiting for OpenOCD to "
                                "listen for gdb")

        except Exception:
            # self.process is never assigned when start() fails, so __del__
            # could not reap the child; do it here.
            try:
                if process.poll() is None:
                    process.kill()
                    process.wait()
            except OSError:
                pass
            print_log(Openocd.logname)
            raise

    def __del__(self):
        try:
            self.process.terminate()
            # NOTE(review): a 10000 second grace period before SIGKILL looks
            # unintentionally long -- confirm the intended unit.
            deadline = time.time() + 10000
            while time.time() < deadline:
                # poll() returns the exit status, which may legitimately be 0.
                if self.process.poll() is not None:
                    break
                time.sleep(0.1)
            else:
                self.process.kill()
            self.process.wait()
        except (OSError, AttributeError):
            pass
312
class OpenocdCli(object):
    """Drive OpenOCD's telnet command interface through pexpect."""

    def __init__(self, port=4444):
        """Open a telnet session to localhost:port (also teed to
        openocd-cli.log) and wait for the first prompt."""
        self.child = pexpect.spawn(
                "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
        self.child.expect("> ")

    def command(self, cmd):
        """Send cmd and return its output with surrounding whitespace and NUL
        characters stripped."""
        self.child.sendline(cmd)
        # Match the echoed command literally: cmd is arbitrary text (paths,
        # '+', '$', ...) and must not be interpreted as a regular expression.
        self.child.expect_exact(cmd)
        self.child.expect("\n")
        self.child.expect("> ")
        return self.child.before.strip("\t\r\n \0")

    def reg(self, reg=''):
        """Return the value of register reg, or a dict mapping every reported
        register name to its value when reg is empty."""
        output = self.command("reg %s" % reg)
        matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
        values = {r: int(v, 0) for r, v in matches}
        if reg:
            return values[reg]
        return values

    def load_image(self, image):
        """Run load_image; raise TestNotApplicable if OpenOCD reports that it
        only supports 32-bit ELF files."""
        output = self.command("load_image %s" % image)
        if 'invalid ELF file, only 32bits files are supported' in output:
            raise TestNotApplicable(output)
338
class CannotAccess(Exception):
    """Raised when gdb reports "Cannot access memory at address ...".

    The offending address is available as the address attribute.
    """

    def __init__(self, address):
        super(CannotAccess, self).__init__()
        self.address = address
343
class CouldNotFetch(Exception):
    """Raised when gdb reports that it could not fetch a register.

    regname is the register named in gdb's message and explanation is the
    remainder of that message.
    """

    def __init__(self, regname, explanation):
        super(CouldNotFetch, self).__init__()
        self.regname, self.explanation = regname, explanation
349
# One row of gdb's "info threads" output, as captured by Gdb.threads().
Thread = collections.namedtuple(
        'Thread', 'id description target_id name frame')
352
class Gdb(object):
    """A single gdb class which can interact with one or more gdb instances."""

    # pylint: disable=too-many-public-methods
    # pylint: disable=too-many-instance-attributes

    def __init__(self, ports,
            cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb"),
            timeout=60, binary=None):
        """Spawn one gdb process per entry in ports.

        ports    non-empty sequence of TCP ports, one per gdb server
        cmd      gdb command line (the default is expanded at import time)
        timeout  base timeout in seconds, scaled per command by `ops`
        binary   optional ELF file loaded with "file" during connect()
        """
        # Set before anything can fail so __del__ always finds them.
        self.logfiles = []
        self.children = []

        assert ports

        self.ports = ports
        self.cmd = cmd
        self.timeout = timeout
        self.binary = binary

        self.stack = []
        self.harts = {}

        for port in ports:
            logfile = tempfile.NamedTemporaryFile(prefix="gdb@%d-" % port,
                    suffix=".log")
            self.logfiles.append(logfile)
            if print_log_names:
                real_stdout.write("Temporary gdb log: %s\n" % logfile.name)
            child = pexpect.spawn(cmd)
            child.logfile = logfile
            child.logfile.write("+ %s\n" % cmd)
            self.children.append(child)
        self.active_child = self.children[0]

    def connect(self):
        """Configure every gdb, attach it to its port, and record in
        self.harts which child and thread serve each hart."""
        for port, child in zip(self.ports, self.children):
            self.select_child(child)
            self.wait()
            self.command("set confirm off")
            self.command("set width 0")
            self.command("set height 0")
            # Force consistency.
            self.command("set print entry-values no")
            self.command("set remotetimeout %d" % self.timeout)
            self.command("target extended-remote localhost:%d" % port)
            if self.binary:
                self.command("file %s" % self.binary)
            threads = self.threads()
            for t in threads:
                hartid = None
                if t.name:
                    m = re.search(r"Hart (\d+)", t.name)
                    if m:
                        hartid = int(m.group(1))
                if hartid is None:
                    # No hart number in the thread name: take the next free id.
                    if self.harts:
                        hartid = max(self.harts) + 1
                    else:
                        hartid = 0
                # solo: True iff this is the only thread on this child
                self.harts[hartid] = {'child': child,
                        'thread': t,
                        'solo': len(threads) == 1}

    def __del__(self):
        # Best effort: shut the gdb processes down explicitly. (Deleting the
        # loop variable, as was done before, had no effect at all.)
        for child in getattr(self, 'children', ()):
            try:
                child.close()
            except Exception: # pylint: disable=broad-except
                pass

    def one_hart_per_gdb(self):
        """Return True if every known hart is alone on its gdb instance."""
        return all(h['solo'] for h in self.harts.values())

    def lognames(self):
        """Return the paths of the per-gdb log files."""
        return [logfile.name for logfile in self.logfiles]

    def select_child(self, child):
        """Direct subsequent commands to child."""
        self.active_child = child

    def select_hart(self, hart):
        """Make the gdb (and, if shared, the thread) for hart.id current."""
        h = self.harts[hart.id]
        self.select_child(h['child'])
        if not h['solo']:
            output = self.command("thread %s" % h['thread'].id, ops=5)
            assert "Unknown" not in output

    def push_state(self):
        """Save the currently selected child; see pop_state()."""
        self.stack.append({
            'active_child': self.active_child
        })

    def pop_state(self):
        """Restore the child saved by the matching push_state()."""
        state = self.stack.pop()
        self.active_child = state['active_child']

    def wait(self):
        """Wait for prompt."""
        self.active_child.expect(r"\(gdb\)")

    def command(self, command, ops=1):
        """ops is the estimated number of operations gdb will have to perform
        to perform this command. It is used to compute a timeout based on
        self.timeout."""
        timeout = ops * self.timeout
        self.active_child.sendline(command)
        self.active_child.expect("\n", timeout=timeout)
        self.active_child.expect(r"\(gdb\)", timeout=timeout)
        return self.active_child.before.strip()

    def global_command(self, command):
        """Execute this command on every gdb that we control."""
        with PrivateState(self):
            for child in self.children:
                self.select_child(child)
                self.command(command)

    def c(self, wait=True, async_=False, **kwargs):
        """
        Dumb c command.
        In RTOS mode, gdb will resume all harts.
        In multi-gdb mode, this command will just go to the current gdb, so
        will only resume one hart.

        The second parameter was historically spelled `async`, which is a
        reserved word from Python 3.7 on. It can still be passed positionally
        or under its old keyword name (through **kwargs).
        """
        if 'async' in kwargs:
            async_ = kwargs.pop('async')
        if kwargs:
            raise TypeError("c() got an unexpected keyword argument %r" %
                    sorted(kwargs)[0])

        suffix = "&" if async_ else ""
        ops = 10
        if wait:
            output = self.command("c%s" % suffix, ops=ops)
            assert "Continuing" in output
            return output
        else:
            self.active_child.sendline("c%s" % suffix)
            self.active_child.expect("Continuing", timeout=ops * self.timeout)

    def c_all(self, wait=True):
        """
        Resume every hart.

        This function works fine when using multiple gdb sessions, but the
        caller must be careful when using it nonetheless. gdb's behavior is to
        not set breakpoints until just before the hart is resumed, and then
        clears them as soon as the hart halts. That means that you can't set
        one software breakpoint, and expect multiple harts to hit it. It's
        possible that the first hart completes set/run/halt/clear before the
        second hart even gets to resume, so it will never hit the breakpoint.
        """
        with PrivateState(self):
            for child in self.children:
                child.sendline("c")
                child.expect("Continuing")

            if wait:
                for child in self.children:
                    child.expect(r"\(gdb\)")

    def interrupt(self):
        """Send Ctrl-C to the active gdb and return what it printed before the
        next prompt."""
        self.active_child.send("\003")
        self.active_child.expect(r"\(gdb\)", timeout=6000)
        return self.active_child.before.strip()

    def interrupt_all(self):
        """Interrupt every gdb in turn (leaves the last one selected)."""
        for child in self.children:
            self.select_child(child)
            self.interrupt()

    def x(self, address, size='w'):
        """Examine memory at address and return the value as an int."""
        output = self.command("x/%s %s" % (size, address))
        value = int(output.split(':')[1].strip(), 0)
        return value

    def p_raw(self, obj):
        """Print obj and return the unparsed text after the last '='.

        Raises CannotAccess if gdb could not read the memory involved.
        """
        output = self.command("p %s" % obj)
        m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
        if m:
            raise CannotAccess(int(m.group(1), 0))
        return output.split('=')[-1].strip()

    def parse_string(self, text):
        """Convert gdb's rendering of a value into Python: "{a, b}" becomes a
        list, a double-quoted string loses its quotes, anything else is parsed
        as an integer literal."""
        text = text.strip()
        if text.startswith("{") and text.endswith("}"):
            inner = text[1:-1]
            return [self.parse_string(t) for t in inner.split(", ")]
        elif text.startswith('"') and text.endswith('"'):
            return text[1:-1]
        else:
            return int(text, 0)

    def p(self, obj, fmt="/x"):
        """Print obj with the given format and return the parsed value.

        Raises CannotAccess or CouldNotFetch when gdb reports the matching
        error.
        """
        output = self.command("p%s %s" % (fmt, obj))
        m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
        if m:
            raise CannotAccess(int(m.group(1), 0))
        m = re.search(r"Could not fetch register \"(\w+)\"; (.*)$", output)
        if m:
            raise CouldNotFetch(m.group(1), m.group(2))
        rhs = output.split('=')[-1]
        return self.parse_string(rhs)

    def p_string(self, obj):
        """Print obj and return the second shell-style token after the last
        '=' in gdb's reply."""
        output = self.command("p %s" % obj)
        value = shlex.split(output.split('=')[-1].strip())[1]
        return value

    def stepi(self):
        """Execute one instruction and return gdb's output."""
        output = self.command("stepi", ops=10)
        return output

    def load(self):
        """Load the current file onto the target and verify it with
        compare-sections."""
        output = self.command("load", ops=1000)
        assert "failed" not in output
        assert "Transfer rate" in output
        output = self.command("compare-sections", ops=1000)
        assert "MIS" not in output

    def b(self, location):
        """Set a breakpoint at location and return gdb's output."""
        output = self.command("b %s" % location, ops=5)
        assert "not defined" not in output
        assert "Breakpoint" in output
        return output

    def hbreak(self, location):
        """Set a hardware breakpoint at location and return gdb's output."""
        output = self.command("hbreak %s" % location, ops=5)
        assert "not defined" not in output
        assert "Hardware assisted breakpoint" in output
        return output

    def threads(self):
        """Return a non-empty list of Thread tuples parsed from
        "info threads"."""
        output = self.command("info threads", ops=100)
        threads = []
        for line in output.splitlines():
            m = re.match(
                    r"[\s\*]*(\d+)\s*"
                    r"(Remote target|Thread (\d+)\s*\(Name: ([^\)]+))"
                    r"\s*(.*)", line)
            if m:
                threads.append(Thread(*m.groups()))
        assert threads
        return threads

    def thread(self, thread):
        """Switch the active gdb to thread.id and return gdb's output."""
        return self.command("thread %s" % thread.id)

    def where(self):
        """Return the innermost frame as reported by "where 1"."""
        return self.command("where 1")
598
class PrivateState(object):
    """Context manager that brackets a block with gdb.push_state() on entry
    and gdb.pop_state() on exit, whether or not the block raised."""

    def __init__(self, gdb):
        self.gdb = gdb

    def __enter__(self):
        gdb = self.gdb
        gdb.push_state()

    def __exit__(self, _type, _value, _traceback):
        gdb = self.gdb
        gdb.pop_state()
608
def run_all_tests(module, target, parsed):
    """Collect the tests defined in module, run them, and report.

    Every class in module that has a `test` attribute is scheduled, restricted
    to names containing one of parsed.test when that list is non-empty.
    ExamineTarget is scheduled first (once) if some hart has no misa value and
    none was given on the command line. Returns print_results()'s status.
    """
    global gdb_cmd # pylint: disable=global-statement

    try:
        os.makedirs(parsed.logs)
    except OSError:
        # There's a race where multiple instances of the test program might
        # decide to create the logs directory at the same time.
        pass

    overall_start = time.time()
    gdb_cmd = parsed.gdb

    todo = []
    examine_added = False
    for hart in target.harts:
        if parsed.misaval:
            hart.misa = int(parsed.misaval, 16)
            print("Using $misa from command line: 0x%x" % hart.misa)
        elif hart.misa:
            print("Using $misa from hart definition: 0x%x" % hart.misa)
        elif not examine_added:
            todo.append(("ExamineTarget", ExamineTarget, None))
            examine_added = True

    for name in dir(module):
        definition = getattr(module, name)
        if not isinstance(definition, type):
            continue
        if not hasattr(definition, 'test'):
            continue
        # An empty selection means "run everything".
        if parsed.test and not any(test in name for test in parsed.test):
            continue
        todo.append((name, definition, None))

    results, count = run_tests(parsed, target, todo)

    elapsed = time.time() - overall_start
    header("ran %d tests in %.0fs" % (count, elapsed), dash=':')

    return print_results(results)
646
# Outcomes that do not count as a failure.
good_results = set(('pass', 'not_applicable'))
def run_tests(parsed, target, todo):
    """Run each (name, definition, hart) entry of todo.

    Each test gets its own log file under parsed.logs; sys.stdout is pointed
    at that file while the test runs and restored afterwards. A failing test's
    log is echoed when parsed.print_failures is set, and the loop stops at the
    first failure when parsed.fail_fast is set.

    Returns (results, count): results maps each outcome string to a list of
    (name, log_name) pairs and count is the number of tests that ran.
    """
    global real_stdout # pylint: disable=global-statement
    results = {}
    count = 0

    for name, definition, hart in todo:
        log_name = os.path.join(parsed.logs, "%s-%s-%s.log" %
                (time.strftime("%Y%m%d-%H%M%S"), type(target).__name__, name))
        log_fd = open(log_name, 'w')
        try:
            print("[%s] Starting > %s" % (name, log_name))
            instance = definition(target, hart)
            sys.stdout.flush()
            log_fd.write("Test: %s\n" % name)
            log_fd.write("Target: %s\n" % type(target).__name__)
            start = time.time()
            real_stdout = sys.stdout
            sys.stdout = log_fd
            try:
                result = instance.run()
                log_fd.write("Result: %s\n" % result)
                log_fd.write("Logfile: %s\n" % log_name)
                log_fd.write("Reproduce: %s %s %s\n" % (sys.argv[0],
                    parsed.target, name))
            finally:
                sys.stdout = real_stdout
                log_fd.write("Time elapsed: %.2fs\n" % (time.time() - start))
                log_fd.flush()
        finally:
            # One descriptor per test would otherwise stay open for the whole
            # run.
            log_fd.close()
        print("[%s] %s in %.2fs" % (name, result, time.time() - start))
        if result not in good_results and parsed.print_failures:
            with open(log_name) as failed_log:
                sys.stdout.write(failed_log.read())
        sys.stdout.flush()
        results.setdefault(result, []).append((name, log_name))
        count += 1
        if result not in good_results and parsed.fail_fast:
            break

    return results, count
685
def print_results(results):
    """Print a summary line per outcome, listing the tests and logs of every
    outcome that is not in good_results.

    Returns 1 if any such outcome is present, otherwise 0.
    """
    exit_status = 0
    for outcome, tests in results.items():
        print("%d tests returned %s" % (len(tests), outcome))
        if outcome not in good_results:
            exit_status = 1
            for name, log_name in tests:
                print(" %s > %s" % (name, log_name))

    return exit_status
696
def add_test_run_options(parser):
    """Register the command-line options shared by the test runners on
    parser (an argparse-style object with add_argument())."""
    options = (
        (("--logs",), dict(default="logs",
            help="Store logs in the specified directory.")),
        (("--fail-fast", "-f"), dict(action="store_true",
            help="Exit as soon as any test fails.")),
        (("--print-failures",), dict(action="store_true",
            help="When a test fails, print the log file to stdout.")),
        (("--print-log-names", "--pln"), dict(action="store_true",
            help="Print names of temporary log files as soon as they are "
            "created.")),
        (("test",), dict(nargs='*',
            help="Run only tests that are named here.")),
        (("--gdb",), dict(
            help="The command to use to start gdb.")),
        (("--misaval",), dict(
            help="Don't run ExamineTarget, just assume the misa value which is "
            "specified.")),
    )
    # Registered in table order so --help lists them as before.
    for flags, settings in options:
        parser.add_argument(*flags, **settings)
714
def header(title, dash='-', length=78):
    """Print a ruler `length` characters wide made of `dash`.

    With a non-empty title the title is centred as "[ title ]"; when the
    padding is odd the extra character goes on the right. With an empty title
    a plain ruler is printed.
    """
    if not title:
        print(dash * length)
        return
    dashes = dash * (length - 4 - len(title))
    # Floor division: the slice bounds must be ints even under true division.
    half = len(dashes) // 2
    print("%s[ %s ]%s" % (dashes[:half], title, dashes[half:]))
723
def print_log_handle(name, handle):
    """Print a header carrying name, then every line read from handle,
    followed by one newline."""
    header(name)
    emit = sys.stdout.write
    for line in handle:
        emit(line)
    emit("\n")
729
def print_log(path):
    """Print the file at path under a header showing that path."""
    # Close the file as soon as it has been printed instead of leaving that
    # to the garbage collector.
    with open(path, "r") as handle:
        print_log_handle(path, handle)
732
class BaseTest(object):
    """Common life cycle for a test: compile, start target and server, run
    test(), collect logs, tear down.

    Subclasses provide test() and may set compile_args and override
    early_applicable(), setup() and postMortem().
    """
    # Cache of compiled binaries, keyed by the compile_args tuple and shared
    # by all tests.
    compiled = {}

    def __init__(self, target, hart=None):
        self.target = target
        if hart:
            self.hart = hart
        else:
            self.hart = random.choice(target.harts)
            # NOTE(review): this override (marked "<<<" in the original, and
            # assumed to sit in the else branch -- indentation was lost) makes
            # the random choice above moot. Confirm whether it is meant to
            # stay.
            self.hart = target.harts[-1] #<<<
        self.server = None
        self.target_process = None
        self.binary = None
        self.start = 0
        self.logs = []

    def early_applicable(self):
        """Return a false value if the test has determined it cannot run
        without ever needing to talk to the target or server."""
        # pylint: disable=no-self-use
        return True

    def setup(self):
        """Hook run after classSetup() and before test(); no-op here."""
        pass

    def compile(self):
        """Set self.binary to the program built from self.compile_args,
        compiling it with self.target.compile() on first use. Leaves
        self.binary as None when the test defines no compile_args."""
        compile_args = getattr(self, 'compile_args', None)
        if compile_args:
            if compile_args not in BaseTest.compiled:
                BaseTest.compiled[compile_args] = \
                        self.target.compile(self.hart, *compile_args)
        self.binary = BaseTest.compiled.get(compile_args)

    def classSetup(self):
        """Compile, create the target process and start the server, recording
        their log names. If the server fails to start, the logs gathered so
        far are printed and the exception is re-raised."""
        self.compile()
        self.target_process = self.target.create()
        if self.target_process:
            self.logs.append(self.target_process.logname)
        try:
            self.server = self.target.server()
            self.logs.append(self.server.logname)
        except Exception:
            for log in self.logs:
                print_log(log)
            raise

    def classTeardown(self):
        """Drop the references to the server and the target process."""
        del self.server
        del self.target_process

    def postMortem(self):
        """Hook run after a failed test to gather diagnostics; no-op here."""
        pass

    def run(self):
        """
        If compile_args is set, compile a program and set self.binary.

        Call setup().

        Then call test() and return the result, displaying relevant information
        if an exception is raised.

        The result is "not_applicable", "fail" (TestFailed), "exception" (any
        other exception), whatever truthy value test() returned, or "pass".
        """

        sys.stdout.flush()

        if not self.early_applicable():
            return "not_applicable"

        self.start = time.time()

        try:
            self.classSetup()
            self.setup()
            result = self.test() # pylint: disable=no-member
        except TestNotApplicable:
            result = "not_applicable"
        except Exception as e: # pylint: disable=broad-except
            if isinstance(e, TestFailed):
                result = "fail"
            else:
                result = "exception"
            if isinstance(e, TestFailed):
                header("Message")
                print(e.message)
            header("Traceback")
            traceback.print_exc(file=sys.stdout)
            try:
                self.postMortem()
            except Exception as e: # pylint: disable=broad-except
                header("postMortem Exception")
                print(e)
                traceback.print_exc(file=sys.stdout)
            return result

        finally:
            # Get handles to logs before the files are deleted.
            logs = []
            try:
                for log in self.logs:
                    logs.append((log, open(log, "r")))

                self.classTeardown()
                for name, handle in logs:
                    print_log_handle(name, handle)
            finally:
                # The handles are only needed for printing; release them.
                for _, handle in logs:
                    handle.close()
            header("End of logs")

        if not result:
            result = 'pass'
        return result
841
# gdb command line to use; run_all_tests() sets it from the --gdb option.
gdb_cmd = None
class GdbTest(BaseTest):
    """A BaseTest that also starts gdb and connects it to the server."""

    def __init__(self, target, hart=None):
        BaseTest.__init__(self, target, hart=hart)
        self.gdb = None

    def classSetup(self):
        """Run the base setup, then start gdb on the server's ports, connect,
        apply the target's gdb_setup commands and select this test's hart."""
        BaseTest.classSetup(self)

        ports = self.server.gdb_ports
        gdb_options = {
            'timeout': self.target.timeout_sec,
            'binary': self.binary,
        }
        # Only override Gdb's default command when one was supplied.
        if gdb_cmd:
            gdb_options['cmd'] = gdb_cmd
        self.gdb = Gdb(ports, **gdb_options)

        self.logs += self.gdb.lognames()
        self.gdb.connect()

        self.gdb.global_command("set remotetimeout %d" %
                self.target.timeout_sec)

        for cmd in self.target.gdb_setup:
            self.gdb.command(cmd)

        self.gdb.select_hart(self.hart)

        # FIXME: OpenOCD doesn't handle PRIV now
        #self.gdb.p("$priv=3")

    def postMortem(self):
        """Halt the target and dump disassembly, registers and threads into
        the gdb log. Does nothing if gdb was never started."""
        gdb = self.gdb
        if not gdb:
            return
        gdb.interrupt()
        gdb.command("disassemble", ops=20)
        gdb.command("info registers all", ops=100)
        gdb.command("flush regs")
        gdb.command("info threads", ops=100)

    def classTeardown(self):
        """Drop gdb first, then the server and target."""
        del self.gdb
        BaseTest.classTeardown(self)

    def parkOtherHarts(self):
        """Park harts besides the currently selected one in loop_forever()."""
        for hart in self.target.harts:
            # Park all harts that we're not using in a safe place.
            if hart != self.hart:
                self.gdb.select_hart(hart)
                self.gdb.p("$pc=loop_forever")

        self.gdb.select_hart(self.hart)
894
class GdbSingleHartTest(GdbTest):
    """A GdbTest that parks every hart except its own once setup is done."""

    def classSetup(self):
        super(GdbSingleHartTest, self).classSetup()
        self.parkOtherHarts()
899
class ExamineTarget(GdbTest):
    """Read $misa from every hart, store it on the hart object, and check
    that the XLEN it encodes matches the hart's configured xlen."""

    def test(self):
        """Raise TestFailed if XLEN cannot be decoded from a hart's $misa or
        differs from hart.xlen; otherwise write each hart's ISA string (for
        example "RV32ACIM") to stdout."""
        for hart in self.target.harts:
            self.gdb.select_hart(hart)

            hart.misa = self.gdb.p("$misa")

            txt = "RV"
            misa_xlen = 0
            # The two most significant bits of misa, at whichever width the
            # register has, hold 1, 2 or 3 for 32, 64 or 128 bits.
            if ((hart.misa & 0xFFFFFFFF) >> 30) == 1:
                misa_xlen = 32
            elif ((hart.misa & 0xFFFFFFFFFFFFFFFF) >> 62) == 2:
                misa_xlen = 64
            elif ((hart.misa & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF) >> 126) == 3:
                misa_xlen = 128
            else:
                # Report the hart being examined, not the hart the test was
                # assigned (whose misa may not have been read yet).
                raise TestFailed("Couldn't determine XLEN from $misa (0x%x)" %
                        hart.misa)

            if misa_xlen != hart.xlen:
                raise TestFailed("MISA reported XLEN of %d but we were "\
                        "expecting XLEN of %d\n" % (misa_xlen, hart.xlen))

            txt += ("%d" % misa_xlen)

            # Bit i of misa set means extension letter chr(ord('A') + i).
            for i in range(26):
                if hart.misa & (1<<i):
                    txt += chr(i + ord('A'))
            # All harts share one line, separated by spaces.
            sys.stdout.write(txt + " ")
929
class TestFailed(Exception):
    """Raised by a test to report a failure; the explanation is kept in the
    message attribute (the exception's args stay empty)."""

    def __init__(self, message):
        super(TestFailed, self).__init__()
        self.message = message
934
class TestNotApplicable(Exception):
    """Raised by a test that cannot run on the current target; the reason is
    kept in the message attribute (the exception's args stay empty)."""

    def __init__(self, message):
        super(TestNotApplicable, self).__init__()
        self.message = message
939
def assertEqual(a, b):
    """Raise TestFailed if a != b."""
    differs = a != b
    if differs:
        raise TestFailed("%r != %r" % (a, b))
943
def assertNotEqual(a, b):
    """Raise TestFailed if a == b."""
    same = a == b
    if same:
        raise TestFailed("%r == %r" % (a, b))
947
def assertIn(a, b):
    """Raise TestFailed unless a is contained in b."""
    missing = a not in b
    if missing:
        raise TestFailed("%r not in %r" % (a, b))
951
def assertNotIn(a, b):
    """Raise TestFailed if a is contained in b."""
    present = a in b
    if present:
        raise TestFailed("%r in %r" % (a, b))
955
def assertGreater(a, b):
    """Raise TestFailed unless a > b."""
    greater = a > b
    if not greater:
        raise TestFailed("%r not greater than %r" % (a, b))
959
def assertLess(a, b):
    """Raise TestFailed unless a < b."""
    less = a < b
    if not less:
        raise TestFailed("%r not less than %r" % (a, b))
963
def assertTrue(a):
    """Raise TestFailed if a is falsy."""
    if not a:
        # Wrap a in a 1-tuple: with a bare tuple operand (e.g. the empty
        # tuple) the % operator would treat a itself as the argument list and
        # raise TypeError instead of reporting the failure.
        raise TestFailed("%r is not True" % (a,))
967
def assertRegexpMatches(text, regexp):
    """Raise TestFailed unless regexp matches somewhere in text."""
    found = re.search(regexp, text)
    if not found:
        raise TestFailed("can't find %r in %r" % (regexp, text))