bump env
[riscv-tests.git] / debug / testlib.py
1 import collections
2 import os
3 import os.path
4 import random
5 import re
6 import shlex
7 import subprocess
8 import sys
9 import tempfile
10 import time
11 import traceback
12 import pipes
13
14 import pexpect
15
16 print_log_names = False
17 real_stdout = sys.stdout
18
19 # Note that gdb comes with its own testsuite. I was unable to figure out how to
20 # run that testsuite against the spike simulator.
21
22 def find_file(path):
23 for directory in (os.getcwd(), os.path.dirname(__file__)):
24 fullpath = os.path.join(directory, path)
25 relpath = os.path.relpath(fullpath)
26 if len(relpath) >= len(fullpath):
27 relpath = fullpath
28 if os.path.exists(relpath):
29 return relpath
30 return None
31
32 def compile(args, xlen=32): # pylint: disable=redefined-builtin
33 cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
34 cmd = [cc, "-g"]
35 if xlen == 32:
36 cmd.append("-march=rv32imac")
37 cmd.append("-mabi=ilp32")
38 else:
39 cmd.append("-march=rv64imac")
40 cmd.append("-mabi=lp64")
41 for arg in args:
42 found = find_file(arg)
43 if found:
44 cmd.append(found)
45 else:
46 cmd.append(arg)
47 header("Compile")
48 print "+", " ".join(cmd)
49 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
50 stderr=subprocess.PIPE)
51 stdout, stderr = process.communicate()
52 if process.returncode:
53 print stdout,
54 print stderr,
55 header("")
56 raise Exception("Compile failed!")
57
58 class Spike(object):
59 def __init__(self, target, halted=False, timeout=None, with_jtag_gdb=True,
60 isa=None, progbufsize=None):
61 """Launch spike. Return tuple of its process and the port it's running
62 on."""
63 self.process = None
64 self.isa = isa
65 self.progbufsize = progbufsize
66
67 if target.harts:
68 harts = target.harts
69 else:
70 harts = [target]
71
72 cmd = self.command(target, harts, halted, timeout, with_jtag_gdb)
73 self.infinite_loop = target.compile(harts[0],
74 "programs/checksum.c", "programs/tiny-malloc.c",
75 "programs/infinite_loop.S", "-DDEFINE_MALLOC", "-DDEFINE_FREE")
76 cmd.append(self.infinite_loop)
77 self.logfile = tempfile.NamedTemporaryFile(prefix="spike-",
78 suffix=".log")
79 self.logname = self.logfile.name
80 if print_log_names:
81 real_stdout.write("Temporary spike log: %s\n" % self.logname)
82 self.logfile.write("+ %s\n" % " ".join(cmd))
83 self.logfile.flush()
84 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
85 stdout=self.logfile, stderr=self.logfile)
86
87 if with_jtag_gdb:
88 self.port = None
89 for _ in range(30):
90 m = re.search(r"Listening for remote bitbang connection on "
91 r"port (\d+).", open(self.logname).read())
92 if m:
93 self.port = int(m.group(1))
94 os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
95 break
96 time.sleep(0.11)
97 if not self.port:
98 print_log(self.logname)
99 raise Exception("Didn't get spike message about bitbang "
100 "connection")
101
102 def command(self, target, harts, halted, timeout, with_jtag_gdb):
103 # pylint: disable=no-self-use
104 if target.sim_cmd:
105 cmd = shlex.split(target.sim_cmd)
106 else:
107 spike = os.path.expandvars("$RISCV/bin/spike")
108 cmd = [spike]
109
110 cmd += ["-p%d" % len(harts)]
111
112 assert len(set(t.xlen for t in harts)) == 1, \
113 "All spike harts must have the same XLEN"
114
115 if self.isa:
116 isa = self.isa
117 else:
118 isa = "RV%dG" % harts[0].xlen
119
120 cmd += ["--isa", isa]
121 cmd += ["--debug-auth"]
122
123 if not self.progbufsize is None:
124 cmd += ["--progsize", str(self.progbufsize)]
125 cmd += ["--debug-sba", "32"]
126
127 assert len(set(t.ram for t in harts)) == 1, \
128 "All spike harts must have the same RAM layout"
129 assert len(set(t.ram_size for t in harts)) == 1, \
130 "All spike harts must have the same RAM layout"
131 cmd += ["-m0x%x:0x%x" % (harts[0].ram, harts[0].ram_size)]
132
133 if timeout:
134 cmd = ["timeout", str(timeout)] + cmd
135
136 if halted:
137 cmd.append('-H')
138 if with_jtag_gdb:
139 cmd += ['--rbb-port', '0']
140 os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
141
142 return cmd
143
144 def __del__(self):
145 if self.process:
146 try:
147 self.process.kill()
148 self.process.wait()
149 except OSError:
150 pass
151
152 def wait(self, *args, **kwargs):
153 return self.process.wait(*args, **kwargs)
154
155 class VcsSim(object):
156 logfile = tempfile.NamedTemporaryFile(prefix='simv', suffix='.log')
157 logname = logfile.name
158
159 def __init__(self, sim_cmd=None, debug=False, timeout=300):
160 if sim_cmd:
161 cmd = shlex.split(sim_cmd)
162 else:
163 cmd = ["simv"]
164 cmd += ["+jtag_vpi_enable"]
165 if debug:
166 cmd[0] = cmd[0] + "-debug"
167 cmd += ["+vcdplusfile=output/gdbserver.vpd"]
168
169 logfile = open(self.logname, "w")
170 if print_log_names:
171 real_stdout.write("Temporary VCS log: %s\n" % self.logname)
172 logfile.write("+ %s\n" % " ".join(cmd))
173 logfile.flush()
174
175 listenfile = open(self.logname, "r")
176 listenfile.seek(0, 2)
177 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
178 stdout=logfile, stderr=logfile)
179 done = False
180 start = time.time()
181 while not done:
182 # Fail if VCS exits early
183 exit_code = self.process.poll()
184 if exit_code is not None:
185 raise RuntimeError('VCS simulator exited early with status %d'
186 % exit_code)
187
188 line = listenfile.readline()
189 if not line:
190 time.sleep(1)
191 match = re.match(r"^Listening on port (\d+)$", line)
192 if match:
193 done = True
194 self.port = int(match.group(1))
195 os.environ['JTAG_VPI_PORT'] = str(self.port)
196
197 if (time.time() - start) > timeout:
198 raise Exception("Timed out waiting for VCS to listen for JTAG "
199 "vpi")
200
201 def __del__(self):
202 try:
203 self.process.kill()
204 self.process.wait()
205 except OSError:
206 pass
207
208 class Openocd(object):
209 logfile = tempfile.NamedTemporaryFile(prefix='openocd', suffix='.log')
210 logname = logfile.name
211
212 def __init__(self, server_cmd=None, config=None, debug=False, timeout=60):
213 self.timeout = timeout
214
215 if server_cmd:
216 cmd = shlex.split(server_cmd)
217 else:
218 openocd = os.path.expandvars("$RISCV/bin/openocd")
219 cmd = [openocd]
220 if debug:
221 cmd.append("-d")
222
223 # This command needs to come before any config scripts on the command
224 # line, since they are executed in order.
225 cmd += [
226 # Tell OpenOCD to bind gdb to an unused, ephemeral port.
227 "--command",
228 "gdb_port 0",
229 # Disable tcl and telnet servers, since they are unused and because
230 # the port numbers will conflict if multiple OpenOCD processes are
231 # running on the same server.
232 "--command",
233 "tcl_port disabled",
234 "--command",
235 "telnet_port disabled",
236 ]
237
238 if config:
239 f = find_file(config)
240 if f is None:
241 print "Unable to read file " + config
242 exit(1)
243
244 cmd += ["-f", f]
245 if debug:
246 cmd.append("-d")
247
248 logfile = open(Openocd.logname, "w")
249 if print_log_names:
250 real_stdout.write("Temporary OpenOCD log: %s\n" % Openocd.logname)
251 env_entries = ("REMOTE_BITBANG_HOST", "REMOTE_BITBANG_PORT")
252 env_entries = [key for key in env_entries if key in os.environ]
253 logfile.write("+ %s%s\n" % (
254 "".join("%s=%s " % (key, os.environ[key]) for key in env_entries),
255 " ".join(map(pipes.quote, cmd))))
256 logfile.flush()
257
258 self.gdb_ports = []
259 self.process = self.start(cmd, logfile)
260
261 def start(self, cmd, logfile):
262 process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
263 stdout=logfile, stderr=logfile)
264
265 try:
266 # Wait for OpenOCD to have made it through riscv_examine(). When
267 # using OpenOCD to communicate with a simulator this may take a
268 # long time, and gdb will time out when trying to connect if we
269 # attempt too early.
270 start = time.time()
271 messaged = False
272 fd = open(Openocd.logname, "r")
273 while True:
274 line = fd.readline()
275 if not line:
276 if not process.poll() is None:
277 raise Exception("OpenOCD exited early.")
278 time.sleep(0.1)
279 continue
280
281 m = re.search(r"Listening on port (\d+) for gdb connections",
282 line)
283 if m:
284 self.gdb_ports.append(int(m.group(1)))
285
286 if "telnet server disabled" in line:
287 return process
288
289 if not messaged and time.time() - start > 1:
290 messaged = True
291 print "Waiting for OpenOCD to start..."
292 if (time.time() - start) > self.timeout:
293 raise Exception("Timed out waiting for OpenOCD to "
294 "listen for gdb")
295
296 except Exception:
297 print_log(Openocd.logname)
298 raise
299
300 def __del__(self):
301 try:
302 self.process.terminate()
303 start = time.time()
304 while time.time() < start + 10000:
305 if self.process.poll():
306 break
307 else:
308 self.process.kill()
309 self.process.wait()
310 except (OSError, AttributeError):
311 pass
312
313 class OpenocdCli(object):
314 def __init__(self, port=4444):
315 self.child = pexpect.spawn(
316 "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
317 self.child.expect("> ")
318
319 def command(self, cmd):
320 self.child.sendline(cmd)
321 self.child.expect(cmd)
322 self.child.expect("\n")
323 self.child.expect("> ")
324 return self.child.before.strip("\t\r\n \0")
325
326 def reg(self, reg=''):
327 output = self.command("reg %s" % reg)
328 matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
329 values = {r: int(v, 0) for r, v in matches}
330 if reg:
331 return values[reg]
332 return values
333
334 def load_image(self, image):
335 output = self.command("load_image %s" % image)
336 if 'invalid ELF file, only 32bits files are supported' in output:
337 raise TestNotApplicable(output)
338
339 class CannotAccess(Exception):
340 def __init__(self, address):
341 Exception.__init__(self)
342 self.address = address
343
344 class CouldNotFetch(Exception):
345 def __init__(self, regname, explanation):
346 Exception.__init__(self)
347 self.regname = regname
348 self.explanation = explanation
349
350 Thread = collections.namedtuple('Thread', ('id', 'description', 'target_id',
351 'name', 'frame'))
352
353 class Gdb(object):
354 """A single gdb class which can interact with one or more gdb instances."""
355
356 # pylint: disable=too-many-public-methods
357 # pylint: disable=too-many-instance-attributes
358
359 def __init__(self, ports,
360 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb"),
361 timeout=60, binary=None):
362 assert ports
363
364 self.ports = ports
365 self.cmd = cmd
366 self.timeout = timeout
367 self.binary = binary
368
369 self.stack = []
370 self.harts = {}
371
372 self.logfiles = []
373 self.children = []
374 for port in ports:
375 logfile = tempfile.NamedTemporaryFile(prefix="gdb@%d-" % port,
376 suffix=".log")
377 self.logfiles.append(logfile)
378 if print_log_names:
379 real_stdout.write("Temporary gdb log: %s\n" % logfile.name)
380 child = pexpect.spawn(cmd)
381 child.logfile = logfile
382 child.logfile.write("+ %s\n" % cmd)
383 self.children.append(child)
384 self.active_child = self.children[0]
385
386 def connect(self):
387 for port, child in zip(self.ports, self.children):
388 self.select_child(child)
389 self.wait()
390 self.command("set confirm off")
391 self.command("set width 0")
392 self.command("set height 0")
393 # Force consistency.
394 self.command("set print entry-values no")
395 self.command("set remotetimeout %d" % self.timeout)
396 self.command("target extended-remote localhost:%d" % port)
397 if self.binary:
398 self.command("file %s" % self.binary)
399 threads = self.threads()
400 for t in threads:
401 hartid = None
402 if t.name:
403 m = re.search(r"Hart (\d+)", t.name)
404 if m:
405 hartid = int(m.group(1))
406 if hartid is None:
407 if self.harts:
408 hartid = max(self.harts) + 1
409 else:
410 hartid = 0
411 # solo: True iff this is the only thread on this child
412 self.harts[hartid] = {'child': child,
413 'thread': t,
414 'solo': len(threads) == 1}
415
416 def __del__(self):
417 for child in self.children:
418 del child
419
420 def one_hart_per_gdb(self):
421 return all(h['solo'] for h in self.harts.itervalues())
422
423 def lognames(self):
424 return [logfile.name for logfile in self.logfiles]
425
426 def select_child(self, child):
427 self.active_child = child
428
429 def select_hart(self, hart):
430 h = self.harts[hart.id]
431 self.select_child(h['child'])
432 if not h['solo']:
433 output = self.command("thread %s" % h['thread'].id, ops=5)
434 assert "Unknown" not in output
435
436 def push_state(self):
437 self.stack.append({
438 'active_child': self.active_child
439 })
440
441 def pop_state(self):
442 state = self.stack.pop()
443 self.active_child = state['active_child']
444
445 def wait(self):
446 """Wait for prompt."""
447 self.active_child.expect(r"\(gdb\)")
448
449 def command(self, command, ops=1):
450 """ops is the estimated number of operations gdb will have to perform
451 to perform this command. It is used to compute a timeout based on
452 self.timeout."""
453 timeout = ops * self.timeout
454 self.active_child.sendline(command)
455 self.active_child.expect("\n", timeout=timeout)
456 self.active_child.expect(r"\(gdb\)", timeout=timeout)
457 return self.active_child.before.strip()
458
459 def global_command(self, command):
460 """Execute this command on every gdb that we control."""
461 with PrivateState(self):
462 for child in self.children:
463 self.select_child(child)
464 self.command(command)
465
466 def c(self, wait=True, async=False):
467 """
468 Dumb c command.
469 In RTOS mode, gdb will resume all harts.
470 In multi-gdb mode, this command will just go to the current gdb, so
471 will only resume one hart.
472 """
473 if async:
474 async = "&"
475 else:
476 async = ""
477 ops = 10
478 if wait:
479 output = self.command("c%s" % async, ops=ops)
480 assert "Continuing" in output
481 return output
482 else:
483 self.active_child.sendline("c%s" % async)
484 self.active_child.expect("Continuing", timeout=ops * self.timeout)
485
486 def c_all(self, wait=True):
487 """
488 Resume every hart.
489
490 This function works fine when using multiple gdb sessions, but the
491 caller must be careful when using it nonetheless. gdb's behavior is to
492 not set breakpoints until just before the hart is resumed, and then
493 clears them as soon as the hart halts. That means that you can't set
494 one software breakpoint, and expect multiple harts to hit it. It's
495 possible that the first hart completes set/run/halt/clear before the
496 second hart even gets to resume, so it will never hit the breakpoint.
497 """
498 with PrivateState(self):
499 for child in self.children:
500 child.sendline("c")
501 child.expect("Continuing")
502
503 if wait:
504 for child in self.children:
505 child.expect(r"\(gdb\)")
506
507 def interrupt(self):
508 self.active_child.send("\003")
509 self.active_child.expect(r"\(gdb\)", timeout=6000)
510 return self.active_child.before.strip()
511
512 def interrupt_all(self):
513 for child in self.children:
514 self.select_child(child)
515 self.interrupt()
516
517 def x(self, address, size='w'):
518 output = self.command("x/%s %s" % (size, address))
519 value = int(output.split(':')[1].strip(), 0)
520 return value
521
522 def p_raw(self, obj):
523 output = self.command("p %s" % obj)
524 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
525 if m:
526 raise CannotAccess(int(m.group(1), 0))
527 return output.split('=')[-1].strip()
528
529 def parse_string(self, text):
530 text = text.strip()
531 if text.startswith("{") and text.endswith("}"):
532 inner = text[1:-1]
533 return [self.parse_string(t) for t in inner.split(", ")]
534 elif text.startswith('"') and text.endswith('"'):
535 return text[1:-1]
536 else:
537 return int(text, 0)
538
539 def p(self, obj, fmt="/x"):
540 output = self.command("p%s %s" % (fmt, obj))
541 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
542 if m:
543 raise CannotAccess(int(m.group(1), 0))
544 m = re.search(r"Could not fetch register \"(\w+)\"; (.*)$", output)
545 if m:
546 raise CouldNotFetch(m.group(1), m.group(2))
547 rhs = output.split('=')[-1]
548 return self.parse_string(rhs)
549
550 def p_string(self, obj):
551 output = self.command("p %s" % obj)
552 value = shlex.split(output.split('=')[-1].strip())[1]
553 return value
554
555 def info_registers(self, group):
556 output = self.command("info registers %s" % group)
557 result = {}
558 for line in output.splitlines():
559 parts = line.split()
560 name = parts[0]
561 if "Could not fetch" in line:
562 result[name] = " ".join(parts[1:])
563 else:
564 result[name] = int(parts[1], 0)
565 return result
566
567 def stepi(self):
568 output = self.command("stepi", ops=10)
569 return output
570
571 def load(self):
572 output = self.command("load", ops=1000)
573 assert "failed" not in output
574 assert "Transfer rate" in output
575 output = self.command("compare-sections", ops=1000)
576 assert "MIS" not in output
577
578 def b(self, location):
579 output = self.command("b %s" % location, ops=5)
580 assert "not defined" not in output
581 assert "Breakpoint" in output
582 return output
583
584 def hbreak(self, location):
585 output = self.command("hbreak %s" % location, ops=5)
586 assert "not defined" not in output
587 assert "Hardware assisted breakpoint" in output
588 return output
589
590 def threads(self):
591 output = self.command("info threads", ops=100)
592 threads = []
593 for line in output.splitlines():
594 m = re.match(
595 r"[\s\*]*(\d+)\s*"
596 r"(Remote target|Thread (\d+)\s*\(Name: ([^\)]+))"
597 r"\s*(.*)", line)
598 if m:
599 threads.append(Thread(*m.groups()))
600 assert threads
601 #>>>if not threads:
602 #>>> threads.append(Thread('1', '1', 'Default', '???'))
603 return threads
604
605 def thread(self, thread):
606 return self.command("thread %s" % thread.id)
607
608 def where(self):
609 return self.command("where 1")
610
611 class PrivateState(object):
612 def __init__(self, gdb):
613 self.gdb = gdb
614
615 def __enter__(self):
616 self.gdb.push_state()
617
618 def __exit__(self, _type, _value, _traceback):
619 self.gdb.pop_state()
620
621 def run_all_tests(module, target, parsed):
622 try:
623 os.makedirs(parsed.logs)
624 except OSError:
625 # There's a race where multiple instances of the test program might
626 # decide to create the logs directory at the same time.
627 pass
628
629 overall_start = time.time()
630
631 global gdb_cmd # pylint: disable=global-statement
632 gdb_cmd = parsed.gdb
633
634 todo = []
635 examine_added = False
636 for hart in target.harts:
637 if parsed.misaval:
638 hart.misa = int(parsed.misaval, 16)
639 print "Using $misa from command line: 0x%x" % hart.misa
640 elif hart.misa:
641 print "Using $misa from hart definition: 0x%x" % hart.misa
642 elif not examine_added:
643 todo.append(("ExamineTarget", ExamineTarget, None))
644 examine_added = True
645
646 for name in dir(module):
647 definition = getattr(module, name)
648 if isinstance(definition, type) and hasattr(definition, 'test') and \
649 (not parsed.test or any(test in name for test in parsed.test)):
650 todo.append((name, definition, None))
651
652 results, count = run_tests(parsed, target, todo)
653
654 header("ran %d tests in %.0fs" % (count, time.time() - overall_start),
655 dash=':')
656
657 return print_results(results)
658
659 good_results = set(('pass', 'not_applicable'))
660 def run_tests(parsed, target, todo):
661 results = {}
662 count = 0
663
664 for name, definition, hart in todo:
665 log_name = os.path.join(parsed.logs, "%s-%s-%s.log" %
666 (time.strftime("%Y%m%d-%H%M%S"), type(target).__name__, name))
667 log_fd = open(log_name, 'w')
668 print "[%s] Starting > %s" % (name, log_name)
669 instance = definition(target, hart)
670 sys.stdout.flush()
671 log_fd.write("Test: %s\n" % name)
672 log_fd.write("Target: %s\n" % type(target).__name__)
673 start = time.time()
674 global real_stdout # pylint: disable=global-statement
675 real_stdout = sys.stdout
676 sys.stdout = log_fd
677 try:
678 result = instance.run()
679 log_fd.write("Result: %s\n" % result)
680 log_fd.write("Logfile: %s\n" % log_name)
681 log_fd.write("Reproduce: %s %s %s\n" % (sys.argv[0], parsed.target,
682 name))
683 finally:
684 sys.stdout = real_stdout
685 log_fd.write("Time elapsed: %.2fs\n" % (time.time() - start))
686 log_fd.flush()
687 print "[%s] %s in %.2fs" % (name, result, time.time() - start)
688 if result not in good_results and parsed.print_failures:
689 sys.stdout.write(open(log_name).read())
690 sys.stdout.flush()
691 results.setdefault(result, []).append((name, log_name))
692 count += 1
693 if result not in good_results and parsed.fail_fast:
694 break
695
696 return results, count
697
698 def print_results(results):
699 result = 0
700 for key, value in results.iteritems():
701 print "%d tests returned %s" % (len(value), key)
702 if key not in good_results:
703 result = 1
704 for name, log_name in value:
705 print " %s > %s" % (name, log_name)
706
707 return result
708
709 def add_test_run_options(parser):
710 parser.add_argument("--logs", default="logs",
711 help="Store logs in the specified directory.")
712 parser.add_argument("--fail-fast", "-f", action="store_true",
713 help="Exit as soon as any test fails.")
714 parser.add_argument("--print-failures", action="store_true",
715 help="When a test fails, print the log file to stdout.")
716 parser.add_argument("--print-log-names", "--pln", action="store_true",
717 help="Print names of temporary log files as soon as they are "
718 "created.")
719 parser.add_argument("test", nargs='*',
720 help="Run only tests that are named here.")
721 parser.add_argument("--gdb",
722 help="The command to use to start gdb.")
723 parser.add_argument("--misaval",
724 help="Don't run ExamineTarget, just assume the misa value which is "
725 "specified.")
726
727 def header(title, dash='-', length=78):
728 if title:
729 dashes = dash * (length - 4 - len(title))
730 before = dashes[:len(dashes)/2]
731 after = dashes[len(dashes)/2:]
732 print "%s[ %s ]%s" % (before, title, after)
733 else:
734 print dash * length
735
736 def print_log_handle(name, handle):
737 header(name)
738 for l in handle:
739 sys.stdout.write(l)
740 print
741
742 def print_log(path):
743 print_log_handle(path, open(path, "r"))
744
745 class BaseTest(object):
746 compiled = {}
747
748 def __init__(self, target, hart=None):
749 self.target = target
750 if hart:
751 self.hart = hart
752 else:
753 self.hart = random.choice(target.harts)
754 self.hart = target.harts[-1] #<<<
755 self.server = None
756 self.target_process = None
757 self.binary = None
758 self.start = 0
759 self.logs = []
760
761 def early_applicable(self):
762 """Return a false value if the test has determined it cannot run
763 without ever needing to talk to the target or server."""
764 # pylint: disable=no-self-use
765 return True
766
767 def setup(self):
768 pass
769
770 def compile(self):
771 compile_args = getattr(self, 'compile_args', None)
772 if compile_args:
773 if compile_args not in BaseTest.compiled:
774 BaseTest.compiled[compile_args] = \
775 self.target.compile(self.hart, *compile_args)
776 self.binary = BaseTest.compiled.get(compile_args)
777
778 def classSetup(self):
779 self.compile()
780 self.target_process = self.target.create()
781 if self.target_process:
782 self.logs.append(self.target_process.logname)
783 try:
784 self.server = self.target.server()
785 self.logs.append(self.server.logname)
786 except Exception:
787 for log in self.logs:
788 print_log(log)
789 raise
790
791 def classTeardown(self):
792 del self.server
793 del self.target_process
794
795 def postMortem(self):
796 pass
797
798 def run(self):
799 """
800 If compile_args is set, compile a program and set self.binary.
801
802 Call setup().
803
804 Then call test() and return the result, displaying relevant information
805 if an exception is raised.
806 """
807
808 sys.stdout.flush()
809
810 if not self.early_applicable():
811 return "not_applicable"
812
813 self.start = time.time()
814
815 try:
816 self.classSetup()
817 self.setup()
818 result = self.test() # pylint: disable=no-member
819 except TestNotApplicable:
820 result = "not_applicable"
821 except Exception as e: # pylint: disable=broad-except
822 if isinstance(e, TestFailed):
823 result = "fail"
824 else:
825 result = "exception"
826 if isinstance(e, TestFailed):
827 header("Message")
828 print e.message
829 header("Traceback")
830 traceback.print_exc(file=sys.stdout)
831 try:
832 self.postMortem()
833 except Exception as e: # pylint: disable=broad-except
834 header("postMortem Exception")
835 print e
836 traceback.print_exc(file=sys.stdout)
837 return result
838
839 finally:
840 # Get handles to logs before the files are deleted.
841 logs = []
842 for log in self.logs:
843 logs.append((log, open(log, "r")))
844
845 self.classTeardown()
846 for name, handle in logs:
847 print_log_handle(name, handle)
848 header("End of logs")
849
850 if not result:
851 result = 'pass'
852 return result
853
854 gdb_cmd = None
855 class GdbTest(BaseTest):
856 def __init__(self, target, hart=None):
857 BaseTest.__init__(self, target, hart=hart)
858 self.gdb = None
859
860 def classSetup(self):
861 BaseTest.classSetup(self)
862
863 if gdb_cmd:
864 self.gdb = Gdb(self.server.gdb_ports, gdb_cmd,
865 timeout=self.target.timeout_sec, binary=self.binary)
866 else:
867 self.gdb = Gdb(self.server.gdb_ports,
868 timeout=self.target.timeout_sec, binary=self.binary)
869
870 self.logs += self.gdb.lognames()
871 self.gdb.connect()
872
873 self.gdb.global_command("set remotetimeout %d" %
874 self.target.timeout_sec)
875
876 for cmd in self.target.gdb_setup:
877 self.gdb.command(cmd)
878
879 self.gdb.select_hart(self.hart)
880
881 # FIXME: OpenOCD doesn't handle PRIV now
882 #self.gdb.p("$priv=3")
883
884 def postMortem(self):
885 if not self.gdb:
886 return
887 self.gdb.interrupt()
888 self.gdb.command("disassemble", ops=20)
889 self.gdb.command("info registers all", ops=100)
890 self.gdb.command("flush regs")
891 self.gdb.command("info threads", ops=100)
892
893 def classTeardown(self):
894 del self.gdb
895 BaseTest.classTeardown(self)
896
897 def parkOtherHarts(self):
898 """Park harts besides the currently selected one in loop_forever()."""
899 for hart in self.target.harts:
900 # Park all harts that we're not using in a safe place.
901 if hart != self.hart:
902 self.gdb.select_hart(hart)
903 self.gdb.p("$pc=loop_forever")
904
905 self.gdb.select_hart(self.hart)
906
907 class GdbSingleHartTest(GdbTest):
908 def classSetup(self):
909 GdbTest.classSetup(self)
910 self.parkOtherHarts()
911
912 class ExamineTarget(GdbTest):
913 def test(self):
914 for hart in self.target.harts:
915 self.gdb.select_hart(hart)
916
917 hart.misa = self.gdb.p("$misa")
918
919 txt = "RV"
920 misa_xlen = 0
921 if ((hart.misa & 0xFFFFFFFF) >> 30) == 1:
922 misa_xlen = 32
923 elif ((hart.misa & 0xFFFFFFFFFFFFFFFF) >> 62) == 2:
924 misa_xlen = 64
925 elif ((hart.misa & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF) >> 126) == 3:
926 misa_xlen = 128
927 else:
928 raise TestFailed("Couldn't determine XLEN from $misa (0x%x)" %
929 self.hart.misa)
930
931 if misa_xlen != hart.xlen:
932 raise TestFailed("MISA reported XLEN of %d but we were "\
933 "expecting XLEN of %d\n" % (misa_xlen, hart.xlen))
934
935 txt += ("%d" % misa_xlen)
936
937 for i in range(26):
938 if hart.misa & (1<<i):
939 txt += chr(i + ord('A'))
940 print txt,
941
942 class TestFailed(Exception):
943 def __init__(self, message):
944 Exception.__init__(self)
945 self.message = message
946
947 class TestNotApplicable(Exception):
948 def __init__(self, message):
949 Exception.__init__(self)
950 self.message = message
951
952 def assertEqual(a, b):
953 if a != b:
954 raise TestFailed("%r != %r" % (a, b))
955
956 def assertNotEqual(a, b):
957 if a == b:
958 raise TestFailed("%r == %r" % (a, b))
959
960 def assertIn(a, b):
961 if a not in b:
962 raise TestFailed("%r not in %r" % (a, b))
963
964 def assertNotIn(a, b):
965 if a in b:
966 raise TestFailed("%r in %r" % (a, b))
967
968 def assertGreater(a, b):
969 if not a > b:
970 raise TestFailed("%r not greater than %r" % (a, b))
971
972 def assertLess(a, b):
973 if not a < b:
974 raise TestFailed("%r not less than %r" % (a, b))
975
976 def assertTrue(a):
977 if not a:
978 raise TestFailed("%r is not True" % a)
979
980 def assertRegexpMatches(text, regexp):
981 if not re.search(regexp, text):
982 raise TestFailed("can't find %r in %r" % (regexp, text))