Add test for multicore failure
[riscv-tests.git] / debug / testlib.py
1 import collections
2 import os
3 import os.path
4 import random
5 import re
6 import shlex
7 import subprocess
8 import sys
9 import tempfile
10 import time
11 import traceback
12 import pipes
13
14 import pexpect
15
16 print_log_names = False
17 real_stdout = sys.stdout
18
19 # Note that gdb comes with its own testsuite. I was unable to figure out how to
20 # run that testsuite against the spike simulator.
21
22 def find_file(path):
23 for directory in (os.getcwd(), os.path.dirname(__file__)):
24 fullpath = os.path.join(directory, path)
25 relpath = os.path.relpath(fullpath)
26 if len(relpath) >= len(fullpath):
27 relpath = fullpath
28 if os.path.exists(relpath):
29 return relpath
30 return None
31
32 def compile(args, xlen=32): # pylint: disable=redefined-builtin
33 cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
34 cmd = [cc, "-g"]
35 if xlen == 32:
36 cmd.append("-march=rv32imac")
37 cmd.append("-mabi=ilp32")
38 else:
39 cmd.append("-march=rv64imac")
40 cmd.append("-mabi=lp64")
41 for arg in args:
42 found = find_file(arg)
43 if found:
44 cmd.append(found)
45 else:
46 cmd.append(arg)
47 header("Compile")
48 print "+", " ".join(cmd)
49 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
50 stderr=subprocess.PIPE)
51 stdout, stderr = process.communicate()
52 if process.returncode:
53 print stdout,
54 print stderr,
55 header("")
56 raise Exception("Compile failed!")
57
58 class Spike(object):
59 def __init__(self, target, halted=False, timeout=None, with_jtag_gdb=True,
60 isa=None):
61 """Launch spike. Return tuple of its process and the port it's running
62 on."""
63 self.process = None
64 self.isa = isa
65
66 if target.harts:
67 harts = target.harts
68 else:
69 harts = [target]
70
71 cmd = self.command(target, harts, halted, timeout, with_jtag_gdb)
72 self.infinite_loop = target.compile(harts[0],
73 "programs/checksum.c", "programs/tiny-malloc.c",
74 "programs/infinite_loop.S", "-DDEFINE_MALLOC", "-DDEFINE_FREE")
75 cmd.append(self.infinite_loop)
76 self.logfile = tempfile.NamedTemporaryFile(prefix="spike-",
77 suffix=".log")
78 self.logname = self.logfile.name
79 if print_log_names:
80 real_stdout.write("Temporary spike log: %s\n" % self.logname)
81 self.logfile.write("+ %s\n" % " ".join(cmd))
82 self.logfile.flush()
83 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
84 stdout=self.logfile, stderr=self.logfile)
85
86 if with_jtag_gdb:
87 self.port = None
88 for _ in range(30):
89 m = re.search(r"Listening for remote bitbang connection on "
90 r"port (\d+).", open(self.logname).read())
91 if m:
92 self.port = int(m.group(1))
93 os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
94 break
95 time.sleep(0.11)
96 if not self.port:
97 print_log(self.logname)
98 raise Exception("Didn't get spike message about bitbang "
99 "connection")
100
101 def command(self, target, harts, halted, timeout, with_jtag_gdb):
102 # pylint: disable=no-self-use
103 if target.sim_cmd:
104 cmd = shlex.split(target.sim_cmd)
105 else:
106 spike = os.path.expandvars("$RISCV/bin/spike")
107 cmd = [spike]
108
109 cmd += ["-p%d" % len(harts)]
110
111 assert len(set(t.xlen for t in harts)) == 1, \
112 "All spike harts must have the same XLEN"
113
114 if self.isa:
115 isa = self.isa
116 else:
117 isa = "RV%dG" % harts[0].xlen
118
119 cmd += ["--isa", isa]
120
121 assert len(set(t.ram for t in harts)) == 1, \
122 "All spike harts must have the same RAM layout"
123 assert len(set(t.ram_size for t in harts)) == 1, \
124 "All spike harts must have the same RAM layout"
125 cmd += ["-m0x%x:0x%x" % (harts[0].ram, harts[0].ram_size)]
126
127 if timeout:
128 cmd = ["timeout", str(timeout)] + cmd
129
130 if halted:
131 cmd.append('-H')
132 if with_jtag_gdb:
133 cmd += ['--rbb-port', '0']
134 os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
135
136 return cmd
137
138 def __del__(self):
139 if self.process:
140 try:
141 self.process.kill()
142 self.process.wait()
143 except OSError:
144 pass
145
146 def wait(self, *args, **kwargs):
147 return self.process.wait(*args, **kwargs)
148
149 class VcsSim(object):
150 logfile = tempfile.NamedTemporaryFile(prefix='simv', suffix='.log')
151 logname = logfile.name
152
153 def __init__(self, sim_cmd=None, debug=False, timeout=300):
154 if sim_cmd:
155 cmd = shlex.split(sim_cmd)
156 else:
157 cmd = ["simv"]
158 cmd += ["+jtag_vpi_enable"]
159 if debug:
160 cmd[0] = cmd[0] + "-debug"
161 cmd += ["+vcdplusfile=output/gdbserver.vpd"]
162
163 logfile = open(self.logname, "w")
164 if print_log_names:
165 real_stdout.write("Temporary VCS log: %s\n" % self.logname)
166 logfile.write("+ %s\n" % " ".join(cmd))
167 logfile.flush()
168
169 listenfile = open(self.logname, "r")
170 listenfile.seek(0, 2)
171 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
172 stdout=logfile, stderr=logfile)
173 done = False
174 start = time.time()
175 while not done:
176 # Fail if VCS exits early
177 exit_code = self.process.poll()
178 if exit_code is not None:
179 raise RuntimeError('VCS simulator exited early with status %d'
180 % exit_code)
181
182 line = listenfile.readline()
183 if not line:
184 time.sleep(1)
185 match = re.match(r"^Listening on port (\d+)$", line)
186 if match:
187 done = True
188 self.port = int(match.group(1))
189 os.environ['JTAG_VPI_PORT'] = str(self.port)
190
191 if (time.time() - start) > timeout:
192 raise Exception("Timed out waiting for VCS to listen for JTAG "
193 "vpi")
194
195 def __del__(self):
196 try:
197 self.process.kill()
198 self.process.wait()
199 except OSError:
200 pass
201
202 class Openocd(object):
203 logfile = tempfile.NamedTemporaryFile(prefix='openocd', suffix='.log')
204 logname = logfile.name
205
206 def __init__(self, server_cmd=None, config=None, debug=False, timeout=60):
207 self.timeout = timeout
208
209 if server_cmd:
210 cmd = shlex.split(server_cmd)
211 else:
212 openocd = os.path.expandvars("$RISCV/bin/openocd")
213 cmd = [openocd]
214 if debug:
215 cmd.append("-d")
216
217 # This command needs to come before any config scripts on the command
218 # line, since they are executed in order.
219 cmd += [
220 # Tell OpenOCD to bind gdb to an unused, ephemeral port.
221 "--command",
222 "gdb_port 0",
223 # Disable tcl and telnet servers, since they are unused and because
224 # the port numbers will conflict if multiple OpenOCD processes are
225 # running on the same server.
226 "--command",
227 "tcl_port disabled",
228 "--command",
229 "telnet_port disabled",
230 ]
231
232 if config:
233 f = find_file(config)
234 if f is None:
235 print "Unable to read file " + config
236 exit(1)
237
238 cmd += ["-f", f]
239 if debug:
240 cmd.append("-d")
241
242 logfile = open(Openocd.logname, "w")
243 if print_log_names:
244 real_stdout.write("Temporary OpenOCD log: %s\n" % Openocd.logname)
245 env_entries = ("REMOTE_BITBANG_HOST", "REMOTE_BITBANG_PORT")
246 env_entries = [key for key in env_entries if key in os.environ]
247 logfile.write("+ %s%s\n" % (
248 "".join("%s=%s " % (key, os.environ[key]) for key in env_entries),
249 " ".join(map(pipes.quote, cmd))))
250 logfile.flush()
251
252 self.gdb_ports = []
253 self.process = self.start(cmd, logfile)
254
255 def start(self, cmd, logfile):
256 process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
257 stdout=logfile, stderr=logfile)
258
259 try:
260 # Wait for OpenOCD to have made it through riscv_examine(). When
261 # using OpenOCD to communicate with a simulator this may take a
262 # long time, and gdb will time out when trying to connect if we
263 # attempt too early.
264 start = time.time()
265 messaged = False
266 fd = open(Openocd.logname, "r")
267 while True:
268 line = fd.readline()
269 if not line:
270 if not process.poll() is None:
271 raise Exception("OpenOCD exited early.")
272 time.sleep(0.1)
273 continue
274
275 m = re.search(r"Listening on port (\d+) for gdb connections",
276 line)
277 if m:
278 self.gdb_ports.append(int(m.group(1)))
279
280 if "telnet server disabled" in line:
281 return process
282
283 if not messaged and time.time() - start > 1:
284 messaged = True
285 print "Waiting for OpenOCD to start..."
286 if (time.time() - start) > self.timeout:
287 raise Exception("Timed out waiting for OpenOCD to "
288 "listen for gdb")
289
290 except Exception:
291 print_log(Openocd.logname)
292 raise
293
294 def __del__(self):
295 try:
296 self.process.kill()
297 self.process.wait()
298 except (OSError, AttributeError):
299 pass
300
301 class OpenocdCli(object):
302 def __init__(self, port=4444):
303 self.child = pexpect.spawn(
304 "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
305 self.child.expect("> ")
306
307 def command(self, cmd):
308 self.child.sendline(cmd)
309 self.child.expect(cmd)
310 self.child.expect("\n")
311 self.child.expect("> ")
312 return self.child.before.strip("\t\r\n \0")
313
314 def reg(self, reg=''):
315 output = self.command("reg %s" % reg)
316 matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
317 values = {r: int(v, 0) for r, v in matches}
318 if reg:
319 return values[reg]
320 return values
321
322 def load_image(self, image):
323 output = self.command("load_image %s" % image)
324 if 'invalid ELF file, only 32bits files are supported' in output:
325 raise TestNotApplicable(output)
326
327 class CannotAccess(Exception):
328 def __init__(self, address):
329 Exception.__init__(self)
330 self.address = address
331
332 Thread = collections.namedtuple('Thread', ('id', 'description', 'target_id',
333 'name', 'frame'))
334
335 class Gdb(object):
336 """A single gdb class which can interact with one or more gdb instances."""
337
338 # pylint: disable=too-many-public-methods
339 # pylint: disable=too-many-instance-attributes
340
341 def __init__(self, ports,
342 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb"),
343 timeout=60, binary=None):
344 assert ports
345
346 self.ports = ports
347 self.cmd = cmd
348 self.timeout = timeout
349 self.binary = binary
350
351 self.stack = []
352 self.harts = {}
353
354 self.logfiles = []
355 self.children = []
356 for port in ports:
357 logfile = tempfile.NamedTemporaryFile(prefix="gdb@%d-" % port,
358 suffix=".log")
359 self.logfiles.append(logfile)
360 if print_log_names:
361 real_stdout.write("Temporary gdb log: %s\n" % logfile.name)
362 child = pexpect.spawn(cmd)
363 child.logfile = logfile
364 child.logfile.write("+ %s\n" % cmd)
365 self.children.append(child)
366 self.active_child = self.children[0]
367
368 def connect(self):
369 for port, child in zip(self.ports, self.children):
370 self.select_child(child)
371 self.wait()
372 self.command("set confirm off")
373 self.command("set width 0")
374 self.command("set height 0")
375 # Force consistency.
376 self.command("set print entry-values no")
377 self.command("set remotetimeout %d" % self.timeout)
378 self.command("target extended-remote localhost:%d" % port)
379 if self.binary:
380 self.command("file %s" % self.binary)
381 threads = self.threads()
382 for t in threads:
383 hartid = None
384 if t.name:
385 m = re.search(r"Hart (\d+)", t.name)
386 if m:
387 hartid = int(m.group(1))
388 if hartid is None:
389 if self.harts:
390 hartid = max(self.harts) + 1
391 else:
392 hartid = 0
393 # solo: True iff this is the only thread on this child
394 self.harts[hartid] = {'child': child,
395 'thread': t,
396 'solo': len(threads) == 1}
397
398 def __del__(self):
399 for child in self.children:
400 del child
401
402 def one_hart_per_gdb(self):
403 return all(h['solo'] for h in self.harts.itervalues())
404
405 def lognames(self):
406 return [logfile.name for logfile in self.logfiles]
407
408 def select_child(self, child):
409 self.active_child = child
410
411 def select_hart(self, hart):
412 h = self.harts[hart.id]
413 self.select_child(h['child'])
414 if not h['solo']:
415 output = self.command("thread %s" % h['thread'].id, timeout=10)
416 assert "Unknown" not in output
417
418 def push_state(self):
419 self.stack.append({
420 'active_child': self.active_child
421 })
422
423 def pop_state(self):
424 state = self.stack.pop()
425 self.active_child = state['active_child']
426
427 def wait(self):
428 """Wait for prompt."""
429 self.active_child.expect(r"\(gdb\)")
430
431 def command(self, command, timeout=6000):
432 """timeout is in seconds"""
433 self.active_child.sendline(command)
434 self.active_child.expect("\n", timeout=timeout)
435 self.active_child.expect(r"\(gdb\)", timeout=timeout)
436 return self.active_child.before.strip()
437
438 def global_command(self, command):
439 """Execute this command on every gdb that we control."""
440 with PrivateState(self):
441 for child in self.children:
442 self.select_child(child)
443 self.command(command)
444
445 def c(self, wait=True, timeout=-1, async=False):
446 """
447 Dumb c command.
448 In RTOS mode, gdb will resume all harts.
449 In multi-gdb mode, this command will just go to the current gdb, so
450 will only resume one hart.
451 """
452 if async:
453 async = "&"
454 else:
455 async = ""
456 if wait:
457 output = self.command("c%s" % async, timeout=timeout)
458 assert "Continuing" in output
459 return output
460 else:
461 self.active_child.sendline("c%s" % async)
462 self.active_child.expect("Continuing")
463
464 def c_all(self):
465 """
466 Resume every hart.
467
468 This function works fine when using multiple gdb sessions, but the
469 caller must be careful when using it nonetheless. gdb's behavior is to
470 not set breakpoints until just before the hart is resumed, and then
471 clears them as soon as the hart halts. That means that you can't set
472 one software breakpoint, and expect multiple harts to hit it. It's
473 possible that the first hart completes set/run/halt/clear before the
474 second hart even gets to resume, so it will never hit the breakpoint.
475 """
476 with PrivateState(self):
477 for child in self.children:
478 child.sendline("c")
479 child.expect("Continuing")
480
481 # Now wait for them all to halt
482 for child in self.children:
483 child.expect(r"\(gdb\)")
484
485 def interrupt(self):
486 self.active_child.send("\003")
487 self.active_child.expect(r"\(gdb\)", timeout=6000)
488 return self.active_child.before.strip()
489
490 def x(self, address, size='w'):
491 output = self.command("x/%s %s" % (size, address))
492 value = int(output.split(':')[1].strip(), 0)
493 return value
494
495 def p_raw(self, obj):
496 output = self.command("p %s" % obj)
497 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
498 if m:
499 raise CannotAccess(int(m.group(1), 0))
500 return output.split('=')[-1].strip()
501
502 def parse_string(self, text):
503 text = text.strip()
504 if text.startswith("{") and text.endswith("}"):
505 inner = text[1:-1]
506 return [self.parse_string(t) for t in inner.split(", ")]
507 elif text.startswith('"') and text.endswith('"'):
508 return text[1:-1]
509 else:
510 return int(text, 0)
511
512 def p(self, obj, fmt="/x"):
513 output = self.command("p%s %s" % (fmt, obj))
514 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
515 if m:
516 raise CannotAccess(int(m.group(1), 0))
517 rhs = output.split('=')[-1]
518 return self.parse_string(rhs)
519
520 def p_string(self, obj):
521 output = self.command("p %s" % obj)
522 value = shlex.split(output.split('=')[-1].strip())[1]
523 return value
524
525 def stepi(self):
526 output = self.command("stepi", timeout=60)
527 return output
528
529 def load(self):
530 output = self.command("load", timeout=6000)
531 assert "failed" not in output
532 assert "Transfer rate" in output
533
534 def b(self, location):
535 output = self.command("b %s" % location)
536 assert "not defined" not in output
537 assert "Breakpoint" in output
538 return output
539
540 def hbreak(self, location):
541 output = self.command("hbreak %s" % location)
542 assert "not defined" not in output
543 assert "Hardware assisted breakpoint" in output
544 return output
545
546 def threads(self):
547 output = self.command("info threads")
548 threads = []
549 for line in output.splitlines():
550 m = re.match(
551 r"[\s\*]*(\d+)\s*"
552 r"(Remote target|Thread (\d+)\s*\(Name: ([^\)]+))"
553 r"\s*(.*)", line)
554 if m:
555 threads.append(Thread(*m.groups()))
556 assert threads
557 #>>>if not threads:
558 #>>> threads.append(Thread('1', '1', 'Default', '???'))
559 return threads
560
561 def thread(self, thread):
562 return self.command("thread %s" % thread.id)
563
564 def where(self):
565 return self.command("where 1")
566
567 class PrivateState(object):
568 def __init__(self, gdb):
569 self.gdb = gdb
570
571 def __enter__(self):
572 self.gdb.push_state()
573
574 def __exit__(self, _type, _value, _traceback):
575 self.gdb.pop_state()
576
577 def run_all_tests(module, target, parsed):
578 if not os.path.exists(parsed.logs):
579 os.makedirs(parsed.logs)
580
581 overall_start = time.time()
582
583 global gdb_cmd # pylint: disable=global-statement
584 gdb_cmd = parsed.gdb
585
586 todo = []
587 examine_added = False
588 for hart in target.harts:
589 if parsed.misaval:
590 hart.misa = int(parsed.misaval, 16)
591 print "Using $misa from command line: 0x%x" % hart.misa
592 elif hart.misa:
593 print "Using $misa from hart definition: 0x%x" % hart.misa
594 elif not examine_added:
595 todo.append(("ExamineTarget", ExamineTarget, None))
596 examine_added = True
597
598 for name in dir(module):
599 definition = getattr(module, name)
600 if isinstance(definition, type) and hasattr(definition, 'test') and \
601 (not parsed.test or any(test in name for test in parsed.test)):
602 todo.append((name, definition, None))
603
604 results, count = run_tests(parsed, target, todo)
605
606 header("ran %d tests in %.0fs" % (count, time.time() - overall_start),
607 dash=':')
608
609 return print_results(results)
610
611 good_results = set(('pass', 'not_applicable'))
612 def run_tests(parsed, target, todo):
613 results = {}
614 count = 0
615
616 for name, definition, hart in todo:
617 log_name = os.path.join(parsed.logs, "%s-%s-%s.log" %
618 (time.strftime("%Y%m%d-%H%M%S"), type(target).__name__, name))
619 log_fd = open(log_name, 'w')
620 print "[%s] Starting > %s" % (name, log_name)
621 instance = definition(target, hart)
622 sys.stdout.flush()
623 log_fd.write("Test: %s\n" % name)
624 log_fd.write("Target: %s\n" % type(target).__name__)
625 start = time.time()
626 global real_stdout # pylint: disable=global-statement
627 real_stdout = sys.stdout
628 sys.stdout = log_fd
629 try:
630 result = instance.run()
631 log_fd.write("Result: %s\n" % result)
632 finally:
633 sys.stdout = real_stdout
634 log_fd.write("Time elapsed: %.2fs\n" % (time.time() - start))
635 log_fd.flush()
636 print "[%s] %s in %.2fs" % (name, result, time.time() - start)
637 if result not in good_results and parsed.print_failures:
638 sys.stdout.write(open(log_name).read())
639 sys.stdout.flush()
640 results.setdefault(result, []).append((name, log_name))
641 count += 1
642 if result not in good_results and parsed.fail_fast:
643 break
644
645 return results, count
646
647 def print_results(results):
648 result = 0
649 for key, value in results.iteritems():
650 print "%d tests returned %s" % (len(value), key)
651 if key not in good_results:
652 result = 1
653 for name, log_name in value:
654 print " %s > %s" % (name, log_name)
655
656 return result
657
658 def add_test_run_options(parser):
659 parser.add_argument("--logs", default="logs",
660 help="Store logs in the specified directory.")
661 parser.add_argument("--fail-fast", "-f", action="store_true",
662 help="Exit as soon as any test fails.")
663 parser.add_argument("--print-failures", action="store_true",
664 help="When a test fails, print the log file to stdout.")
665 parser.add_argument("--print-log-names", "--pln", action="store_true",
666 help="Print names of temporary log files as soon as they are "
667 "created.")
668 parser.add_argument("test", nargs='*',
669 help="Run only tests that are named here.")
670 parser.add_argument("--gdb",
671 help="The command to use to start gdb.")
672 parser.add_argument("--misaval",
673 help="Don't run ExamineTarget, just assume the misa value which is "
674 "specified.")
675
676 def header(title, dash='-', length=78):
677 if title:
678 dashes = dash * (length - 4 - len(title))
679 before = dashes[:len(dashes)/2]
680 after = dashes[len(dashes)/2:]
681 print "%s[ %s ]%s" % (before, title, after)
682 else:
683 print dash * length
684
685 def print_log(path):
686 header(path)
687 for l in open(path, "r"):
688 sys.stdout.write(l)
689 print
690
691 class BaseTest(object):
692 compiled = {}
693
694 def __init__(self, target, hart=None):
695 self.target = target
696 if hart:
697 self.hart = hart
698 else:
699 self.hart = random.choice(target.harts)
700 self.hart = target.harts[-1] #<<<
701 self.server = None
702 self.target_process = None
703 self.binary = None
704 self.start = 0
705 self.logs = []
706
707 def early_applicable(self):
708 """Return a false value if the test has determined it cannot run
709 without ever needing to talk to the target or server."""
710 # pylint: disable=no-self-use
711 return True
712
713 def setup(self):
714 pass
715
716 def compile(self):
717 compile_args = getattr(self, 'compile_args', None)
718 if compile_args:
719 if compile_args not in BaseTest.compiled:
720 BaseTest.compiled[compile_args] = \
721 self.target.compile(self.hart, *compile_args)
722 self.binary = BaseTest.compiled.get(compile_args)
723
724 def classSetup(self):
725 self.compile()
726 self.target_process = self.target.create()
727 if self.target_process:
728 self.logs.append(self.target_process.logname)
729 try:
730 self.server = self.target.server()
731 self.logs.append(self.server.logname)
732 except Exception:
733 for log in self.logs:
734 print_log(log)
735 raise
736
737 def classTeardown(self):
738 del self.server
739 del self.target_process
740
741 def postMortem(self):
742 pass
743
744 def run(self):
745 """
746 If compile_args is set, compile a program and set self.binary.
747
748 Call setup().
749
750 Then call test() and return the result, displaying relevant information
751 if an exception is raised.
752 """
753
754 sys.stdout.flush()
755
756 if not self.early_applicable():
757 return "not_applicable"
758
759 self.start = time.time()
760
761 try:
762 self.classSetup()
763 self.setup()
764 result = self.test() # pylint: disable=no-member
765 except TestNotApplicable:
766 result = "not_applicable"
767 except Exception as e: # pylint: disable=broad-except
768 if isinstance(e, TestFailed):
769 result = "fail"
770 else:
771 result = "exception"
772 if isinstance(e, TestFailed):
773 header("Message")
774 print e.message
775 header("Traceback")
776 traceback.print_exc(file=sys.stdout)
777 try:
778 self.postMortem()
779 except Exception as e: # pylint: disable=broad-except
780 header("postMortem Exception")
781 print e
782 traceback.print_exc(file=sys.stdout)
783 return result
784
785 finally:
786 for log in self.logs:
787 print_log(log)
788 header("End of logs")
789 self.classTeardown()
790
791 if not result:
792 result = 'pass'
793 return result
794
795 gdb_cmd = None
796 class GdbTest(BaseTest):
797 def __init__(self, target, hart=None):
798 BaseTest.__init__(self, target, hart=hart)
799 self.gdb = None
800
801 def classSetup(self):
802 BaseTest.classSetup(self)
803
804 if gdb_cmd:
805 self.gdb = Gdb(self.server.gdb_ports, gdb_cmd,
806 timeout=self.target.timeout_sec, binary=self.binary)
807 else:
808 self.gdb = Gdb(self.server.gdb_ports,
809 timeout=self.target.timeout_sec, binary=self.binary)
810
811 self.logs += self.gdb.lognames()
812 self.gdb.connect()
813
814 self.gdb.global_command("set remotetimeout %d" %
815 self.target.timeout_sec)
816
817 for cmd in self.target.gdb_setup:
818 self.gdb.command(cmd)
819
820 self.gdb.select_hart(self.hart)
821
822 # FIXME: OpenOCD doesn't handle PRIV now
823 #self.gdb.p("$priv=3")
824
825 def postMortem(self):
826 if not self.gdb:
827 return
828 self.gdb.interrupt()
829 self.gdb.command("info registers all", timeout=10)
830
831 def classTeardown(self):
832 del self.gdb
833 BaseTest.classTeardown(self)
834
835 class GdbSingleHartTest(GdbTest):
836 def classSetup(self):
837 GdbTest.classSetup(self)
838
839 for hart in self.target.harts:
840 # Park all harts that we're not using in a safe place.
841 if hart != self.hart:
842 self.gdb.select_hart(hart)
843 self.gdb.p("$pc=loop_forever")
844 self.gdb.select_hart(self.hart)
845
846 class ExamineTarget(GdbTest):
847 def test(self):
848 for hart in self.target.harts:
849 self.gdb.select_hart(hart)
850
851 hart.misa = self.gdb.p("$misa")
852
853 txt = "RV"
854 misa_xlen = 0
855 if ((hart.misa & 0xFFFFFFFF) >> 30) == 1:
856 misa_xlen = 32
857 elif ((hart.misa & 0xFFFFFFFFFFFFFFFF) >> 62) == 2:
858 misa_xlen = 64
859 elif ((hart.misa & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF) >> 126) == 3:
860 misa_xlen = 128
861 else:
862 raise TestFailed("Couldn't determine XLEN from $misa (0x%x)" %
863 self.hart.misa)
864
865 if misa_xlen != hart.xlen:
866 raise TestFailed("MISA reported XLEN of %d but we were "\
867 "expecting XLEN of %d\n" % (misa_xlen, hart.xlen))
868
869 txt += ("%d" % misa_xlen)
870
871 for i in range(26):
872 if hart.misa & (1<<i):
873 txt += chr(i + ord('A'))
874 print txt,
875
876 class TestFailed(Exception):
877 def __init__(self, message):
878 Exception.__init__(self)
879 self.message = message
880
881 class TestNotApplicable(Exception):
882 def __init__(self, message):
883 Exception.__init__(self)
884 self.message = message
885
886 def assertEqual(a, b):
887 if a != b:
888 raise TestFailed("%r != %r" % (a, b))
889
890 def assertNotEqual(a, b):
891 if a == b:
892 raise TestFailed("%r == %r" % (a, b))
893
894 def assertIn(a, b):
895 if a not in b:
896 raise TestFailed("%r not in %r" % (a, b))
897
898 def assertNotIn(a, b):
899 if a in b:
900 raise TestFailed("%r in %r" % (a, b))
901
902 def assertGreater(a, b):
903 if not a > b:
904 raise TestFailed("%r not greater than %r" % (a, b))
905
906 def assertLess(a, b):
907 if not a < b:
908 raise TestFailed("%r not less than %r" % (a, b))
909
910 def assertTrue(a):
911 if not a:
912 raise TestFailed("%r is not True" % a)
913
914 def assertRegexpMatches(text, regexp):
915 if not re.search(regexp, text):
916 raise TestFailed("can't find %r in %r" % (regexp, text))