14 # Note that gdb comes with its own testsuite. I was unable to figure out how to
15 # run that testsuite against the spike simulator.
18 for directory
in (os
.getcwd(), os
.path
.dirname(__file__
)):
19 fullpath
= os
.path
.join(directory
, path
)
20 if os
.path
.exists(fullpath
):
24 def compile(args
, xlen
=32): # pylint: disable=redefined-builtin
25 cc
= os
.path
.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
28 cmd
.append("-march=rv32imac")
29 cmd
.append("-mabi=ilp32")
31 cmd
.append("-march=rv64imac")
32 cmd
.append("-mabi=lp64")
34 found
= find_file(arg
)
39 process
= subprocess
.Popen(cmd
, stdout
=subprocess
.PIPE
,
40 stderr
=subprocess
.PIPE
)
41 stdout
, stderr
= process
.communicate()
42 if process
.returncode
:
44 header("Compile failed")
45 print "+", " ".join(cmd
)
49 raise Exception("Compile failed!")
52 # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
54 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
56 port
= s
.getsockname()[1]
61 logname
= "spike-%d.log" % os
.getpid()
63 def __init__(self
, target
, halted
=False, timeout
=None, with_jtag_gdb
=True):
64 """Launch spike. Return tuple of its process and the port it's running
67 cmd
= shlex
.split(target
.sim_cmd
)
69 spike
= os
.path
.expandvars("$RISCV/bin/spike")
72 cmd
+= ["--isa", "RV32G"]
74 cmd
+= ["--isa", "RV64G"]
75 cmd
+= ["-m0x%x:0x%x" % (target
.ram
, target
.ram_size
)]
78 cmd
= ["timeout", str(timeout
)] + cmd
83 cmd
+= ['--rbb-port', '0']
84 os
.environ
['REMOTE_BITBANG_HOST'] = 'localhost'
85 self
.infinite_loop
= target
.compile(
86 "programs/checksum.c", "programs/tiny-malloc.c",
87 "programs/infinite_loop.S", "-DDEFINE_MALLOC", "-DDEFINE_FREE")
88 cmd
.append(self
.infinite_loop
)
89 logfile
= open(self
.logname
, "w")
90 logfile
.write("+ %s\n" % " ".join(cmd
))
92 self
.process
= subprocess
.Popen(cmd
, stdin
=subprocess
.PIPE
,
93 stdout
=logfile
, stderr
=logfile
)
98 m
= re
.search(r
"Listening for remote bitbang connection on "
99 r
"port (\d+).", open(self
.logname
).read())
101 self
.port
= int(m
.group(1))
102 os
.environ
['REMOTE_BITBANG_PORT'] = m
.group(1)
105 assert self
.port
, "Didn't get spike message about bitbang " \
115 def wait(self
, *args
, **kwargs
):
116 return self
.process
.wait(*args
, **kwargs
)
118 class VcsSim(object):
119 def __init__(self
, sim_cmd
=None, debug
=False):
121 cmd
= shlex
.split(sim_cmd
)
124 cmd
+= ["+jtag_vpi_enable"]
126 cmd
[0] = cmd
[0] + "-debug"
127 cmd
+= ["+vcdplusfile=output/gdbserver.vpd"]
128 logfile
= open("simv.log", "w")
129 logfile
.write("+ %s\n" % " ".join(cmd
))
131 listenfile
= open("simv.log", "r")
132 listenfile
.seek(0, 2)
133 self
.process
= subprocess
.Popen(cmd
, stdin
=subprocess
.PIPE
,
134 stdout
=logfile
, stderr
=logfile
)
137 # Fail if VCS exits early
138 exit_code
= self
.process
.poll()
139 if exit_code
is not None:
140 raise RuntimeError('VCS simulator exited early with status %d'
143 line
= listenfile
.readline()
146 match
= re
.match(r
"^Listening on port (\d+)$", line
)
149 self
.port
= int(match
.group(1))
150 os
.environ
['JTAG_VPI_PORT'] = str(self
.port
)
159 class Openocd(object):
160 logfile
= tempfile
.NamedTemporaryFile(prefix
='openocd', suffix
='.log')
161 logname
= logfile
.name
163 def __init__(self
, server_cmd
=None, config
=None, debug
=False):
165 cmd
= shlex
.split(server_cmd
)
167 openocd
= os
.path
.expandvars("$RISCV/bin/openocd")
172 # This command needs to come before any config scripts on the command
173 # line, since they are executed in order.
175 # Tell OpenOCD to bind gdb to an unused, ephemeral port.
178 # Disable tcl and telnet servers, since they are unused and because
179 # the port numbers will conflict if multiple OpenOCD processes are
180 # running on the same server.
184 "telnet_port disabled",
188 f
= find_file(config
)
190 print "Unable to read file " + config
197 logfile
= open(Openocd
.logname
, "w")
198 logfile
.write("+ %s\n" % " ".join(cmd
))
200 self
.process
= subprocess
.Popen(cmd
, stdin
=subprocess
.PIPE
,
201 stdout
=logfile
, stderr
=logfile
)
203 # Wait for OpenOCD to have made it through riscv_examine(). When using
204 # OpenOCD to communicate with a simulator this may take a long time,
205 # and gdb will time out when trying to connect if we attempt too early.
209 log
= open(Openocd
.logname
).read()
210 if "Ready for Remote Connections" in log
:
212 if not self
.process
.poll() is None:
213 header("OpenOCD log")
214 sys
.stdout
.write(log
)
216 "OpenOCD exited before completing riscv_examine()")
217 if not messaged
and time
.time() - start
> 1:
219 print "Waiting for OpenOCD to examine RISCV core..."
220 if time
.time() - start
> 60:
221 raise Exception("ERROR: Timed out waiting for OpenOCD to "
222 "examine RISCV core")
225 self
.port
= self
._get
_gdb
_server
_port
()
227 header("OpenOCD log")
228 sys
.stdout
.write(log
)
231 def _get_gdb_server_port(self
):
232 """Get port that OpenOCD's gdb server is listening on."""
234 PORT_REGEX
= re
.compile(r
'(?P<port>\d+) \(LISTEN\)')
235 for _
in range(MAX_ATTEMPTS
):
236 with
open(os
.devnull
, 'w') as devnull
:
238 output
= subprocess
.check_output([
240 '-a', # Take the AND of the following selectors
241 '-p{}'.format(self
.process
.pid
), # Filter on PID
242 '-iTCP', # Filter only TCP sockets
244 except subprocess
.CalledProcessError
:
246 matches
= list(PORT_REGEX
.finditer(output
))
247 matches
= [m
for m
in matches
248 if m
.group('port') not in ('6666', '4444')]
252 "OpenOCD listening on multiple ports. Cannot uniquely "
253 "identify gdb server port.")
256 return int(match
.group('port'))
258 raise Exception("Timed out waiting for gdb server to obtain port.")
264 except (OSError, AttributeError):
267 class OpenocdCli(object):
268 def __init__(self
, port
=4444):
269 self
.child
= pexpect
.spawn(
270 "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port
)
271 self
.child
.expect("> ")
273 def command(self
, cmd
):
274 self
.child
.sendline(cmd
)
275 self
.child
.expect(cmd
)
276 self
.child
.expect("\n")
277 self
.child
.expect("> ")
278 return self
.child
.before
.strip("\t\r\n \0")
280 def reg(self
, reg
=''):
281 output
= self
.command("reg %s" % reg
)
282 matches
= re
.findall(r
"(\w+) \(/\d+\): (0x[0-9A-F]+)", output
)
283 values
= {r
: int(v
, 0) for r
, v
in matches
}
288 def load_image(self
, image
):
289 output
= self
.command("load_image %s" % image
)
290 if 'invalid ELF file, only 32bits files are supported' in output
:
291 raise TestNotApplicable(output
)
293 class CannotAccess(Exception):
294 def __init__(self
, address
):
295 Exception.__init
__(self
)
296 self
.address
= address
298 Thread
= collections
.namedtuple('Thread', ('id', 'target_id', 'name',
302 logfile
= tempfile
.NamedTemporaryFile(prefix
="gdb", suffix
=".log")
303 logname
= logfile
.name
306 cmd
=os
.path
.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
307 self
.child
= pexpect
.spawn(cmd
)
308 self
.child
.logfile
= open(self
.logname
, "w")
309 self
.child
.logfile
.write("+ %s\n" % cmd
)
311 self
.command("set confirm off")
312 self
.command("set width 0")
313 self
.command("set height 0")
315 self
.command("set print entry-values no")
318 """Wait for prompt."""
319 self
.child
.expect(r
"\(gdb\)")
321 def command(self
, command
, timeout
=6000):
322 self
.child
.sendline(command
)
323 self
.child
.expect("\n", timeout
=timeout
)
324 self
.child
.expect(r
"\(gdb\)", timeout
=timeout
)
325 return self
.child
.before
.strip()
327 def c(self
, wait
=True, timeout
=-1, async=False):
333 output
= self
.command("c%s" % async, timeout
=timeout
)
334 assert "Continuing" in output
337 self
.child
.sendline("c%s" % async)
338 self
.child
.expect("Continuing")
341 self
.child
.send("\003")
342 self
.child
.expect(r
"\(gdb\)", timeout
=6000)
343 return self
.child
.before
.strip()
345 def x(self
, address
, size
='w'):
346 output
= self
.command("x/%s %s" % (size
, address
))
347 value
= int(output
.split(':')[1].strip(), 0)
350 def p_raw(self
, obj
):
351 output
= self
.command("p %s" % obj
)
352 m
= re
.search("Cannot access memory at address (0x[0-9a-f]+)", output
)
354 raise CannotAccess(int(m
.group(1), 0))
355 return output
.split('=')[-1].strip()
358 output
= self
.command("p/x %s" % obj
)
359 m
= re
.search("Cannot access memory at address (0x[0-9a-f]+)", output
)
361 raise CannotAccess(int(m
.group(1), 0))
362 value
= int(output
.split('=')[-1].strip(), 0)
365 def p_string(self
, obj
):
366 output
= self
.command("p %s" % obj
)
367 value
= shlex
.split(output
.split('=')[-1].strip())[1]
371 output
= self
.command("stepi")
375 output
= self
.command("load", timeout
=6000)
376 assert "failed" not in output
377 assert "Transfer rate" in output
379 def b(self
, location
):
380 output
= self
.command("b %s" % location
)
381 assert "not defined" not in output
382 assert "Breakpoint" in output
385 def hbreak(self
, location
):
386 output
= self
.command("hbreak %s" % location
)
387 assert "not defined" not in output
388 assert "Hardware assisted breakpoint" in output
392 output
= self
.command("info threads")
394 for line
in output
.splitlines():
396 r
"[\s\*]*(\d+)\s*Thread (\d+)\s*\(Name: ([^\)]+)\s*(.*)",
399 threads
.append(Thread(*m
.groups()))
401 threads
.append(Thread('1', '1', 'Default', '???'))
404 def thread(self
, thread
):
405 return self
.command("thread %s" % thread
.id)
407 def run_all_tests(module
, target
, parsed
):
408 if not os
.path
.exists(parsed
.logs
):
409 os
.makedirs(parsed
.logs
)
411 overall_start
= time
.time()
413 global gdb_cmd
# pylint: disable=global-statement
418 target
.misa
= int(parsed
.misaval
, 16)
419 print "Using $misa from command line: 0x%x" % target
.misa
421 print "Using $misa from target definition: 0x%x" % target
.misa
423 todo
.append(("ExamineTarget", ExamineTarget
))
425 for name
in dir(module
):
426 definition
= getattr(module
, name
)
427 if type(definition
) == type and hasattr(definition
, 'test') and \
428 (not parsed
.test
or any(test
in name
for test
in parsed
.test
)):
429 todo
.append((name
, definition
))
431 results
, count
= run_tests(parsed
, target
, todo
)
433 header("ran %d tests in %.0fs" % (count
, time
.time() - overall_start
),
436 return print_results(results
)
438 good_results
= set(('pass', 'not_applicable'))
439 def run_tests(parsed
, target
, todo
):
443 for name
, definition
in todo
:
444 instance
= definition(target
)
445 log_name
= os
.path
.join(parsed
.logs
, "%s-%s-%s.log" %
446 (time
.strftime("%Y%m%d-%H%M%S"), type(target
).__name
__, name
))
447 log_fd
= open(log_name
, 'w')
448 print "Running", name
, "...",
450 log_fd
.write("Test: %s\n" % name
)
451 log_fd
.write("Target: %s\n" % type(target
).__name
__)
453 real_stdout
= sys
.stdout
456 result
= instance
.run()
457 log_fd
.write("Result: %s\n" % result
)
459 sys
.stdout
= real_stdout
460 log_fd
.write("Time elapsed: %.2fs\n" % (time
.time() - start
))
461 print "%s in %.2fs" % (result
, time
.time() - start
)
463 results
.setdefault(result
, []).append(name
)
465 if result
not in good_results
and parsed
.fail_fast
:
468 return results
, count
470 def print_results(results
):
472 for key
, value
in results
.iteritems():
473 print "%d tests returned %s" % (len(value
), key
)
474 if key
not in good_results
:
481 def add_test_run_options(parser
):
483 parser
.add_argument("--logs", default
="logs",
484 help="Store logs in the specified directory.")
485 parser
.add_argument("--fail-fast", "-f", action
="store_true",
486 help="Exit as soon as any test fails.")
487 parser
.add_argument("test", nargs
='*',
488 help="Run only tests that are named here.")
489 parser
.add_argument("--gdb",
490 help="The command to use to start gdb.")
491 parser
.add_argument("--misaval",
492 help="Don't run ExamineTarget, just assume the misa value which is "
495 def header(title
, dash
='-', length
=78):
497 dashes
= dash
* (length
- 4 - len(title
))
498 before
= dashes
[:len(dashes
)/2]
499 after
= dashes
[len(dashes
)/2:]
500 print "%s[ %s ]%s" % (before
, title
, after
)
506 lines
= open(path
, "r").readlines()
511 class BaseTest(object):
514 def __init__(self
, target
):
517 self
.target_process
= None
522 def early_applicable(self
):
523 """Return a false value if the test has determined it cannot run
524 without ever needing to talk to the target or server."""
525 # pylint: disable=no-self-use
532 compile_args
= getattr(self
, 'compile_args', None)
534 if compile_args
not in BaseTest
.compiled
:
535 # pylint: disable=star-args
536 BaseTest
.compiled
[compile_args
] = \
537 self
.target
.compile(*compile_args
)
538 self
.binary
= BaseTest
.compiled
.get(compile_args
)
540 def classSetup(self
):
542 self
.target_process
= self
.target
.create()
543 if self
.target_process
:
544 self
.logs
.append(self
.target_process
.logname
)
546 self
.server
= self
.target
.server()
547 self
.logs
.append(self
.server
.logname
)
549 for log
in self
.logs
:
553 def classTeardown(self
):
555 del self
.target_process
559 If compile_args is set, compile a program and set self.binary.
563 Then call test() and return the result, displaying relevant information
564 if an exception is raised.
569 if not self
.early_applicable():
570 return "not_applicable"
572 self
.start
= time
.time()
577 result
= self
.test() # pylint: disable=no-member
578 except TestNotApplicable
:
579 result
= "not_applicable"
580 except Exception as e
: # pylint: disable=broad-except
581 if isinstance(e
, TestFailed
):
585 if isinstance(e
, TestFailed
):
589 traceback
.print_exc(file=sys
.stdout
)
593 for log
in self
.logs
:
595 header("End of logs")
603 class GdbTest(BaseTest
):
604 def __init__(self
, target
):
605 BaseTest
.__init
__(self
, target
)
608 def classSetup(self
):
609 BaseTest
.classSetup(self
)
612 self
.gdb
= Gdb(gdb_cmd
)
616 self
.logs
.append(self
.gdb
.logname
)
619 self
.gdb
.command("file %s" % self
.binary
)
621 self
.gdb
.command("set arch riscv:rv%d" % self
.target
.xlen
)
622 self
.gdb
.command("set remotetimeout %d" % self
.target
.timeout_sec
)
625 "target extended-remote localhost:%d" % self
.server
.port
)
626 # Select a random thread.
627 # TODO: Allow a command line option to force a specific thread.
628 thread
= random
.choice(self
.gdb
.threads())
629 self
.gdb
.thread(thread
)
631 for cmd
in self
.target
.gdb_setup
:
632 self
.gdb
.command(cmd
)
634 # FIXME: OpenOCD doesn't handle PRIV now
635 #self.gdb.p("$priv=3")
637 def classTeardown(self
):
639 BaseTest
.classTeardown(self
)
641 class ExamineTarget(GdbTest
):
643 self
.target
.misa
= self
.gdb
.p("$misa")
646 if (self
.target
.misa
>> 30) == 1:
648 elif (self
.target
.misa
>> 62) == 2:
650 elif (self
.target
.misa
>> 126) == 3:
653 raise TestFailed("Couldn't determine XLEN from $misa (0x%x)" %
657 if self
.target
.misa
& (1<<i
):
658 txt
+= chr(i
+ ord('A'))
661 class TestFailed(Exception):
662 def __init__(self
, message
):
663 Exception.__init
__(self
)
664 self
.message
= message
666 class TestNotApplicable(Exception):
667 def __init__(self
, message
):
668 Exception.__init
__(self
)
669 self
.message
= message
671 def assertEqual(a
, b
):
673 raise TestFailed("%r != %r" % (a
, b
))
675 def assertNotEqual(a
, b
):
677 raise TestFailed("%r == %r" % (a
, b
))
681 raise TestFailed("%r not in %r" % (a
, b
))
683 def assertNotIn(a
, b
):
685 raise TestFailed("%r in %r" % (a
, b
))
687 def assertGreater(a
, b
):
689 raise TestFailed("%r not greater than %r" % (a
, b
))
691 def assertLess(a
, b
):
693 raise TestFailed("%r not less than %r" % (a
, b
))
697 raise TestFailed("%r is not True" % a
)
699 def assertRegexpMatches(text
, regexp
):
700 if not re
.search(regexp
, text
):
701 raise TestFailed("can't find %r in %r" % (regexp
, text
))