94ee83e79792e93de50af688068fd276a4974d3b
# Module-wide flag; it starts out disabled.
# NOTE(review): presumably driven by the --print-log-names option -- confirm.
print_log_names = False

# Handle on the stdout stream as it was when this module was loaded. The
# "Temporary ... log" notices are written through this handle.
real_stdout = sys.stdout
19 # Note that gdb comes with its own testsuite. I was unable to figure out how to
20 # run that testsuite against the spike simulator.
23 for directory
in (os
.getcwd(), os
.path
.dirname(__file__
)):
24 fullpath
= os
.path
.join(directory
, path
)
25 relpath
= os
.path
.relpath(fullpath
)
26 if len(relpath
) >= len(fullpath
):
28 if os
.path
.exists(relpath
):
32 def compile(args
, xlen
=32): # pylint: disable=redefined-builtin
33 cc
= os
.path
.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
36 cmd
.append("-march=rv32imac")
37 cmd
.append("-mabi=ilp32")
39 cmd
.append("-march=rv64imac")
40 cmd
.append("-mabi=lp64")
42 found
= find_file(arg
)
48 print "+", " ".join(cmd
)
49 process
= subprocess
.Popen(cmd
, stdout
=subprocess
.PIPE
,
50 stderr
=subprocess
.PIPE
)
51 stdout
, stderr
= process
.communicate()
52 if process
.returncode
:
56 raise Exception("Compile failed!")
59 def __init__(self
, target
, halted
=False, timeout
=None, with_jtag_gdb
=True,
61 """Launch spike. Return tuple of its process and the port it's running
71 cmd
= self
.command(target
, harts
, halted
, timeout
, with_jtag_gdb
)
72 self
.infinite_loop
= target
.compile(harts
[0],
73 "programs/checksum.c", "programs/tiny-malloc.c",
74 "programs/infinite_loop.S", "-DDEFINE_MALLOC", "-DDEFINE_FREE")
75 cmd
.append(self
.infinite_loop
)
76 self
.logfile
= tempfile
.NamedTemporaryFile(prefix
="spike-",
78 self
.logname
= self
.logfile
.name
80 real_stdout
.write("Temporary spike log: %s\n" % self
.logname
)
81 self
.logfile
.write("+ %s\n" % " ".join(cmd
))
83 self
.process
= subprocess
.Popen(cmd
, stdin
=subprocess
.PIPE
,
84 stdout
=self
.logfile
, stderr
=self
.logfile
)
89 m
= re
.search(r
"Listening for remote bitbang connection on "
90 r
"port (\d+).", open(self
.logname
).read())
92 self
.port
= int(m
.group(1))
93 os
.environ
['REMOTE_BITBANG_PORT'] = m
.group(1)
97 print_log(self
.logname
)
98 raise Exception("Didn't get spike message about bitbang "
101 def command(self
, target
, harts
, halted
, timeout
, with_jtag_gdb
):
102 # pylint: disable=no-self-use
104 cmd
= shlex
.split(target
.sim_cmd
)
106 spike
= os
.path
.expandvars("$RISCV/bin/spike")
109 cmd
+= ["-p%d" % len(harts
)]
111 assert len(set(t
.xlen
for t
in harts
)) == 1, \
112 "All spike harts must have the same XLEN"
117 isa
= "RV%dG" % harts
[0].xlen
119 cmd
+= ["--isa", isa
]
121 assert len(set(t
.ram
for t
in harts
)) == 1, \
122 "All spike harts must have the same RAM layout"
123 assert len(set(t
.ram_size
for t
in harts
)) == 1, \
124 "All spike harts must have the same RAM layout"
125 cmd
+= ["-m0x%x:0x%x" % (harts
[0].ram
, harts
[0].ram_size
)]
128 cmd
= ["timeout", str(timeout
)] + cmd
133 cmd
+= ['--rbb-port', '0']
134 os
.environ
['REMOTE_BITBANG_HOST'] = 'localhost'
def wait(self, *args, **kwargs):
    """Wait on the process held in ``self.process``.

    Every positional and keyword argument is forwarded untouched to
    ``self.process.wait``, and whatever that call returns is handed back.
    """
    child = self.process
    return child.wait(*args, **kwargs)
149 class VcsSim(object):
150 logfile
= tempfile
.NamedTemporaryFile(prefix
='simv', suffix
='.log')
151 logname
= logfile
.name
153 def __init__(self
, sim_cmd
=None, debug
=False, timeout
=300):
155 cmd
= shlex
.split(sim_cmd
)
158 cmd
+= ["+jtag_vpi_enable"]
160 cmd
[0] = cmd
[0] + "-debug"
161 cmd
+= ["+vcdplusfile=output/gdbserver.vpd"]
163 logfile
= open(self
.logname
, "w")
165 real_stdout
.write("Temporary VCS log: %s\n" % self
.logname
)
166 logfile
.write("+ %s\n" % " ".join(cmd
))
169 listenfile
= open(self
.logname
, "r")
170 listenfile
.seek(0, 2)
171 self
.process
= subprocess
.Popen(cmd
, stdin
=subprocess
.PIPE
,
172 stdout
=logfile
, stderr
=logfile
)
176 # Fail if VCS exits early
177 exit_code
= self
.process
.poll()
178 if exit_code
is not None:
179 raise RuntimeError('VCS simulator exited early with status %d'
182 line
= listenfile
.readline()
185 match
= re
.match(r
"^Listening on port (\d+)$", line
)
188 self
.port
= int(match
.group(1))
189 os
.environ
['JTAG_VPI_PORT'] = str(self
.port
)
191 if (time
.time() - start
) > timeout
:
192 raise Exception("Timed out waiting for VCS to listen for JTAG "
202 class Openocd(object):
203 logfile
= tempfile
.NamedTemporaryFile(prefix
='openocd', suffix
='.log')
204 logname
= logfile
.name
206 def __init__(self
, server_cmd
=None, config
=None, debug
=False, timeout
=60):
207 self
.timeout
= timeout
210 cmd
= shlex
.split(server_cmd
)
212 openocd
= os
.path
.expandvars("$RISCV/bin/openocd")
217 # This command needs to come before any config scripts on the command
218 # line, since they are executed in order.
220 # Tell OpenOCD to bind gdb to an unused, ephemeral port.
223 # Disable tcl and telnet servers, since they are unused and because
224 # the port numbers will conflict if multiple OpenOCD processes are
225 # running on the same server.
229 "telnet_port disabled",
233 f
= find_file(config
)
235 print "Unable to read file " + config
242 logfile
= open(Openocd
.logname
, "w")
244 real_stdout
.write("Temporary OpenOCD log: %s\n" % Openocd
.logname
)
245 env_entries
= ("REMOTE_BITBANG_HOST", "REMOTE_BITBANG_PORT")
246 env_entries
= [key
for key
in env_entries
if key
in os
.environ
]
247 logfile
.write("+ %s%s\n" % (
248 "".join("%s=%s " % (key
, os
.environ
[key
]) for key
in env_entries
),
249 " ".join(map(pipes
.quote
, cmd
))))
253 self
.process
= self
.start(cmd
, logfile
)
255 def start(self
, cmd
, logfile
):
256 process
= subprocess
.Popen(cmd
, stdin
=subprocess
.PIPE
,
257 stdout
=logfile
, stderr
=logfile
)
260 # Wait for OpenOCD to have made it through riscv_examine(). When
261 # using OpenOCD to communicate with a simulator this may take a
262 # long time, and gdb will time out when trying to connect if we
266 fd
= open(Openocd
.logname
, "r")
270 if not process
.poll() is None:
271 raise Exception("OpenOCD exited early.")
275 m
= re
.search(r
"Listening on port (\d+) for gdb connections",
278 self
.gdb_ports
.append(int(m
.group(1)))
280 if "telnet server disabled" in line
:
283 if not messaged
and time
.time() - start
> 1:
285 print "Waiting for OpenOCD to start..."
286 if (time
.time() - start
) > self
.timeout
:
287 raise Exception("Timed out waiting for OpenOCD to "
291 print_log(Openocd
.logname
)
298 except (OSError, AttributeError):
301 class OpenocdCli(object):
def __init__(self, port=4444):
    """Open a telnet session to localhost:port and wait for its prompt.

    The session output is also piped through ``tee`` into the file
    ``openocd-cli.log`` (relative path, as given to the shell).
    """
    shell_line = "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port
    self.child = pexpect.spawn(shell_line)
    # Block until the first "> " prompt has been seen.
    self.child.expect("> ")
def command(self, cmd):
    """Send one command line to the CLI session and return its output.

    Args:
        cmd: the command text; it is sent followed by a line terminator.

    Returns:
        The text received between the end of the echoed command line and
        the next "> " prompt, with leading/trailing tabs, CR, LF, spaces
        and NUL characters stripped.
    """
    session = self.child
    session.sendline(cmd)
    # The echoed command, the line end and the prompt are all literal
    # text, so match them literally. Matching the echo with expect() would
    # interpret the command as a regular expression, which breaks for
    # commands containing characters such as "(", "[", "*" or "$".
    session.expect_exact(cmd)
    session.expect_exact("\n")
    session.expect_exact("> ")
    return session.before.strip("\t\r\n \0")
314 def reg(self
, reg
=''):
315 output
= self
.command("reg %s" % reg
)
316 matches
= re
.findall(r
"(\w+) \(/\d+\): (0x[0-9A-F]+)", output
)
317 values
= {r
: int(v
, 0) for r
, v
in matches
}
def load_image(self, image):
    """Run ``load_image IMAGE`` through this CLI session.

    Raises:
        TestNotApplicable: carrying the command output, when that output
            contains the "only 32bits files are supported" ELF complaint.
    """
    unsupported = 'invalid ELF file, only 32bits files are supported'
    reply = self.command("load_image %s" % image)
    if unsupported not in reply:
        return
    raise TestNotApplicable(reply)
class CannotAccess(Exception):
    """Raised when gdb reports "Cannot access memory at address ...".

    Attributes:
        address: the value handed to the constructor (the raisers in this
            file pass the integer parsed from gdb's message).
    """

    def __init__(self, address):
        # Give the base class a readable description so that str(exc) and
        # tracebacks name the failing address instead of being empty.
        try:
            shown = "0x%x" % address
        except TypeError:
            # Not an integer: fall back to its repr rather than failing
            # while constructing the exception.
            shown = repr(address)
        Exception.__init__(
            self, "Cannot access memory at address %s" % shown)
        self.address = address
332 Thread
= collections
.namedtuple('Thread', ('id', 'description', 'target_id',
336 """A single gdb class which can interact with one or more gdb instances."""
338 # pylint: disable=too-many-public-methods
339 # pylint: disable=too-many-instance-attributes
341 def __init__(self
, ports
,
342 cmd
=os
.path
.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb"),
343 timeout
=60, binary
=None):
348 self
.timeout
= timeout
357 logfile
= tempfile
.NamedTemporaryFile(prefix
="gdb@%d-" % port
,
359 self
.logfiles
.append(logfile
)
361 real_stdout
.write("Temporary gdb log: %s\n" % logfile
.name
)
362 child
= pexpect
.spawn(cmd
)
363 child
.logfile
= logfile
364 child
.logfile
.write("+ %s\n" % cmd
)
365 self
.children
.append(child
)
366 self
.active_child
= self
.children
[0]
369 for port
, child
in zip(self
.ports
, self
.children
):
370 self
.select_child(child
)
372 self
.command("set confirm off")
373 self
.command("set width 0")
374 self
.command("set height 0")
376 self
.command("set print entry-values no")
377 self
.command("set remotetimeout %d" % self
.timeout
)
378 self
.command("target extended-remote localhost:%d" % port
)
380 self
.command("file %s" % self
.binary
)
381 threads
= self
.threads()
385 m
= re
.search(r
"Hart (\d+)", t
.name
)
387 hartid
= int(m
.group(1))
390 hartid
= max(self
.harts
) + 1
393 self
.harts
[hartid
] = (child
, t
)
396 for child
in self
.children
:
400 return [logfile
.name
for logfile
in self
.logfiles
]
def select_child(self, child):
    """Make *child* the session stored in ``self.active_child``."""
    self.active_child = child
def select_hart(self, hart):
    """Switch to the gdb session and thread recorded for *hart*.

    ``self.harts`` is indexed by ``hart.id`` and yields a
    ``(child, thread)`` pair. The child becomes the active session and a
    ``thread ID`` command (using ``thread.id``) is issued in it.

    Raises:
        AssertionError: if the command output contains "Unknown".
    """
    session, gdb_thread = self.harts[hart.id]
    self.select_child(session)
    reply = self.command("thread %s" % gdb_thread.id)
    assert "Unknown" not in reply
411 def push_state(self
):
413 'active_child': self
.active_child
417 state
= self
.stack
.pop()
418 self
.active_child
= state
['active_child']
421 """Wait for prompt."""
422 self
.active_child
.expect(r
"\(gdb\)")
def command(self, command, timeout=6000):
    """Run one gdb command in the active session and return its output.

    timeout is in seconds and is applied to each of the two waits below
    separately.

    Returns the text captured before the "(gdb)" prompt, stripped of
    surrounding whitespace.
    """
    gdb = self.active_child
    gdb.sendline(command)
    # First consume up to the end of a line, then wait for the prompt.
    gdb.expect("\n", timeout=timeout)
    gdb.expect(r"\(gdb\)", timeout=timeout)
    return gdb.before.strip()
def global_command(self, command):
    """Run *command* once in every gdb session listed in self.children.

    The session switching happens inside a ``PrivateState`` block.
    NOTE(review): PrivateState presumably restores the previously active
    session on exit -- confirm against its definition.
    """
    with PrivateState(self):
        for session in self.children:
            # Make this session the active one, then issue the command.
            self.select_child(session)
            self.command(command)
438 def c(self
, wait
=True, timeout
=-1, async=False):
441 In RTOS mode, gdb will resume all harts.
442 In multi-gdb mode, this command will just go to the current gdb, so
443 will only resume one hart.
450 output
= self
.command("c%s" % async, timeout
=timeout
)
451 assert "Continuing" in output
454 self
.active_child
.sendline("c%s" % async)
455 self
.active_child
.expect("Continuing")
461 This function works fine when using multiple gdb sessions, but the
462 caller must be careful when using it nonetheless. gdb's behavior is to
463 not set breakpoints until just before the hart is resumed, and then
464 clears them as soon as the hart halts. That means that you can't set
465 one software breakpoint, and expect multiple harts to hit it. It's
466 possible that the first hart completes set/run/halt/clear before the
467 second hart even gets to resume, so it will never hit the breakpoint.
469 with
PrivateState(self
):
470 for child
in self
.children
:
472 child
.expect("Continuing")
474 # Now wait for them all to halt
475 for child
in self
.children
:
476 child
.expect(r
"\(gdb\)")
479 self
.active_child
.send("\003")
480 self
.active_child
.expect(r
"\(gdb\)", timeout
=6000)
481 return self
.active_child
.before
.strip()
483 def x(self
, address
, size
='w'):
484 output
= self
.command("x/%s %s" % (size
, address
))
485 value
= int(output
.split(':')[1].strip(), 0)
488 def p_raw(self
, obj
):
489 output
= self
.command("p %s" % obj
)
490 m
= re
.search("Cannot access memory at address (0x[0-9a-f]+)", output
)
492 raise CannotAccess(int(m
.group(1), 0))
493 return output
.split('=')[-1].strip()
495 def parse_string(self
, text
):
497 if text
.startswith("{") and text
.endswith("}"):
499 return [self
.parse_string(t
) for t
in inner
.split(", ")]
500 elif text
.startswith('"') and text
.endswith('"'):
505 def p(self
, obj
, fmt
="/x"):
506 output
= self
.command("p%s %s" % (fmt
, obj
))
507 m
= re
.search("Cannot access memory at address (0x[0-9a-f]+)", output
)
509 raise CannotAccess(int(m
.group(1), 0))
510 rhs
= output
.split('=')[-1]
511 return self
.parse_string(rhs
)
513 def p_string(self
, obj
):
514 output
= self
.command("p %s" % obj
)
515 value
= shlex
.split(output
.split('=')[-1].strip())[1]
519 output
= self
.command("stepi", timeout
=60)
523 output
= self
.command("load", timeout
=6000)
524 assert "failed" not in output
525 assert "Transfer rate" in output
527 def b(self
, location
):
528 output
= self
.command("b %s" % location
)
529 assert "not defined" not in output
530 assert "Breakpoint" in output
533 def hbreak(self
, location
):
534 output
= self
.command("hbreak %s" % location
)
535 assert "not defined" not in output
536 assert "Hardware assisted breakpoint" in output
540 output
= self
.command("info threads")
542 for line
in output
.splitlines():
545 r
"(Remote target|Thread (\d+)\s*\(Name: ([^\)]+))"
548 threads
.append(Thread(*m
.groups()))
551 #>>> threads.append(Thread('1', '1', 'Default', '???'))
def thread(self, thread):
    """Issue ``thread ID`` using ``thread.id`` and return the command output."""
    gdb_command = "thread %s" % thread.id
    return self.command(gdb_command)
558 return self
.command("where 1")
560 class PrivateState(object):
561 def __init__(self
, gdb
):
565 self
.gdb
.push_state()
567 def __exit__(self
, _type
, _value
, _traceback
):
570 def run_all_tests(module
, target
, parsed
):
571 if not os
.path
.exists(parsed
.logs
):
572 os
.makedirs(parsed
.logs
)
574 overall_start
= time
.time()
576 global gdb_cmd
# pylint: disable=global-statement
580 examine_added
= False
581 for hart
in target
.harts
:
583 hart
.misa
= int(parsed
.misaval
, 16)
584 print "Using $misa from command line: 0x%x" % hart
.misa
586 print "Using $misa from hart definition: 0x%x" % hart
.misa
587 elif not examine_added
:
588 todo
.append(("ExamineTarget", ExamineTarget
, None))
591 for name
in dir(module
):
592 definition
= getattr(module
, name
)
593 if isinstance(definition
, type) and hasattr(definition
, 'test') and \
594 (not parsed
.test
or any(test
in name
for test
in parsed
.test
)):
595 todo
.append((name
, definition
, None))
597 results
, count
= run_tests(parsed
, target
, todo
)
599 header("ran %d tests in %.0fs" % (count
, time
.time() - overall_start
),
602 return print_results(results
)
604 good_results
= set(('pass', 'not_applicable'))
605 def run_tests(parsed
, target
, todo
):
609 for name
, definition
, hart
in todo
:
610 log_name
= os
.path
.join(parsed
.logs
, "%s-%s-%s.log" %
611 (time
.strftime("%Y%m%d-%H%M%S"), type(target
).__name
__, name
))
612 log_fd
= open(log_name
, 'w')
613 print "[%s] Starting > %s" % (name
, log_name
)
614 instance
= definition(target
, hart
)
616 log_fd
.write("Test: %s\n" % name
)
617 log_fd
.write("Target: %s\n" % type(target
).__name
__)
619 global real_stdout
# pylint: disable=global-statement
620 real_stdout
= sys
.stdout
623 result
= instance
.run()
624 log_fd
.write("Result: %s\n" % result
)
626 sys
.stdout
= real_stdout
627 log_fd
.write("Time elapsed: %.2fs\n" % (time
.time() - start
))
629 print "[%s] %s in %.2fs" % (name
, result
, time
.time() - start
)
630 if result
not in good_results
and parsed
.print_failures
:
631 sys
.stdout
.write(open(log_name
).read())
633 results
.setdefault(result
, []).append((name
, log_name
))
635 if result
not in good_results
and parsed
.fail_fast
:
638 return results
, count
640 def print_results(results
):
642 for key
, value
in results
.iteritems():
643 print "%d tests returned %s" % (len(value
), key
)
644 if key
not in good_results
:
646 for name
, log_name
in value
:
647 print " %s > %s" % (name
, log_name
)
651 def add_test_run_options(parser
):
652 parser
.add_argument("--logs", default
="logs",
653 help="Store logs in the specified directory.")
654 parser
.add_argument("--fail-fast", "-f", action
="store_true",
655 help="Exit as soon as any test fails.")
656 parser
.add_argument("--print-failures", action
="store_true",
657 help="When a test fails, print the log file to stdout.")
658 parser
.add_argument("--print-log-names", "--pln", action
="store_true",
659 help="Print names of temporary log files as soon as they are "
661 parser
.add_argument("test", nargs
='*',
662 help="Run only tests that are named here.")
663 parser
.add_argument("--gdb",
664 help="The command to use to start gdb.")
665 parser
.add_argument("--misaval",
666 help="Don't run ExamineTarget, just assume the misa value which is "
669 def header(title
, dash
='-', length
=78):
671 dashes
= dash
* (length
- 4 - len(title
))
672 before
= dashes
[:len(dashes
)/2]
673 after
= dashes
[len(dashes
)/2:]
674 print "%s[ %s ]%s" % (before
, title
, after
)
680 for l
in open(path
, "r"):
684 class BaseTest(object):
687 def __init__(self
, target
, hart
=None):
692 self
.hart
= random
.choice(target
.harts
)
693 self
.hart
= target
.harts
[-1] #<<<
695 self
.target_process
= None
700 def early_applicable(self
):
701 """Return a false value if the test has determined it cannot run
702 without ever needing to talk to the target or server."""
703 # pylint: disable=no-self-use
710 compile_args
= getattr(self
, 'compile_args', None)
712 if compile_args
not in BaseTest
.compiled
:
713 BaseTest
.compiled
[compile_args
] = \
714 self
.target
.compile(self
.hart
, *compile_args
)
715 self
.binary
= BaseTest
.compiled
.get(compile_args
)
717 def classSetup(self
):
719 self
.target_process
= self
.target
.create()
720 if self
.target_process
:
721 self
.logs
.append(self
.target_process
.logname
)
723 self
.server
= self
.target
.server()
724 self
.logs
.append(self
.server
.logname
)
726 for log
in self
.logs
:
730 def classTeardown(self
):
732 del self
.target_process
734 def postMortem(self
):
739 If compile_args is set, compile a program and set self.binary.
743 Then call test() and return the result, displaying relevant information
744 if an exception is raised.
749 if not self
.early_applicable():
750 return "not_applicable"
752 self
.start
= time
.time()
757 result
= self
.test() # pylint: disable=no-member
758 except TestNotApplicable
:
759 result
= "not_applicable"
760 except Exception as e
: # pylint: disable=broad-except
761 if isinstance(e
, TestFailed
):
765 if isinstance(e
, TestFailed
):
769 traceback
.print_exc(file=sys
.stdout
)
772 except Exception as e
: # pylint: disable=broad-except
773 header("postMortem Exception")
775 traceback
.print_exc(file=sys
.stdout
)
779 for log
in self
.logs
:
781 header("End of logs")
789 class GdbTest(BaseTest
):
790 def __init__(self
, target
, hart
=None):
791 BaseTest
.__init
__(self
, target
, hart
=hart
)
794 def classSetup(self
):
795 BaseTest
.classSetup(self
)
798 self
.gdb
= Gdb(self
.server
.gdb_ports
, gdb_cmd
,
799 timeout
=self
.target
.timeout_sec
, binary
=self
.binary
)
801 self
.gdb
= Gdb(self
.server
.gdb_ports
,
802 timeout
=self
.target
.timeout_sec
, binary
=self
.binary
)
804 self
.logs
+= self
.gdb
.lognames()
807 self
.gdb
.global_command("set remotetimeout %d" %
808 self
.target
.timeout_sec
)
810 for cmd
in self
.target
.gdb_setup
:
811 self
.gdb
.command(cmd
)
813 self
.gdb
.select_hart(self
.hart
)
815 # FIXME: OpenOCD doesn't handle PRIV now
816 #self.gdb.p("$priv=3")
818 def postMortem(self
):
822 self
.gdb
.command("info registers all", timeout
=10)
824 def classTeardown(self
):
826 BaseTest
.classTeardown(self
)
class GdbSingleHartTest(GdbTest):
    """GdbTest variant that parks every hart other than the one under test."""

    def classSetup(self):
        """Run GdbTest's setup, then point all unused harts at loop_forever.

        The hart under test (``self.hart``) is selected again before this
        method returns.
        """
        GdbTest.classSetup(self)

        # Park all harts that we're not using in a safe place.
        idle_harts = [h for h in self.target.harts if h != self.hart]
        for idle in idle_harts:
            self.gdb.select_hart(idle)
            self.gdb.p("$pc=loop_forever")
        self.gdb.select_hart(self.hart)
839 class ExamineTarget(GdbTest
):
841 for hart
in self
.target
.harts
:
842 self
.gdb
.select_hart(hart
)
844 hart
.misa
= self
.gdb
.p("$misa")
848 if ((hart
.misa
& 0xFFFFFFFF) >> 30) == 1:
850 elif ((hart
.misa
& 0xFFFFFFFFFFFFFFFF) >> 62) == 2:
852 elif ((hart
.misa
& 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF) >> 126) == 3:
855 raise TestFailed("Couldn't determine XLEN from $misa (0x%x)" %
858 if misa_xlen
!= hart
.xlen
:
859 raise TestFailed("MISA reported XLEN of %d but we were "\
860 "expecting XLEN of %d\n" % (misa_xlen
, hart
.xlen
))
862 txt
+= ("%d" % misa_xlen
)
865 if hart
.misa
& (1<<i
):
866 txt
+= chr(i
+ ord('A'))
class TestFailed(Exception):
    """Raised to signal that a test check failed.

    Attributes:
        message: the text passed to the constructor.
    """

    def __init__(self, message):
        # Hand the text to Exception as well, so that str(exc), exc.args
        # and printed tracebacks carry it rather than being empty.
        Exception.__init__(self, message)
        self.message = message
class TestNotApplicable(Exception):
    """Raised when a test cannot be run against the current target.

    Attributes:
        message: the text passed to the constructor.
    """

    def __init__(self, message):
        # Hand the text to Exception as well, so that str(exc), exc.args
        # and printed tracebacks carry it rather than being empty.
        Exception.__init__(self, message)
        self.message = message
879 def assertEqual(a
, b
):
881 raise TestFailed("%r != %r" % (a
, b
))
883 def assertNotEqual(a
, b
):
885 raise TestFailed("%r == %r" % (a
, b
))
889 raise TestFailed("%r not in %r" % (a
, b
))
891 def assertNotIn(a
, b
):
893 raise TestFailed("%r in %r" % (a
, b
))
895 def assertGreater(a
, b
):
897 raise TestFailed("%r not greater than %r" % (a
, b
))
899 def assertLess(a
, b
):
901 raise TestFailed("%r not less than %r" % (a
, b
))
905 raise TestFailed("%r is not True" % a
)
def assertRegexpMatches(text, regexp):
    """Fail unless *regexp* is found somewhere in *text* (``re.search``).

    Raises:
        TestFailed: when the search finds nothing.
    """
    if re.search(regexp, text):
        return
    raise TestFailed("can't find %r in %r" % (regexp, text))