import collections
+import os
import os.path
import random
import re
raise Exception("Compile failed!")
class Spike(object):
- logname = "spike-%d.log" % os.getpid()
-
def __init__(self, target, halted=False, timeout=None, with_jtag_gdb=True):
"""Launch spike. Return tuple of its process and the port it's running
on."""
"programs/checksum.c", "programs/tiny-malloc.c",
"programs/infinite_loop.S", "-DDEFINE_MALLOC", "-DDEFINE_FREE")
cmd.append(self.infinite_loop)
- logfile = open(self.logname, "w")
- logfile.write("+ %s\n" % " ".join(cmd))
- logfile.flush()
+ self.logfile = tempfile.NamedTemporaryFile(prefix="spike-",
+ suffix=".log")
+ self.logname = self.logfile.name
+ self.logfile.write("+ %s\n" % " ".join(cmd))
+ self.logfile.flush()
self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
- stdout=logfile, stderr=logfile)
+ stdout=self.logfile, stderr=self.logfile)
if with_jtag_gdb:
self.port = None
logfile.write("+ %s\n" % " ".join(cmd))
logfile.flush()
- self.ports = []
- self.port = None
+ self.gdb_ports = []
self.process = self.start(cmd, logfile)
def start(self, cmd, logfile):
# attempt too early.
start = time.time()
messaged = False
+ fd = open(Openocd.logname, "r")
while True:
- log = open(Openocd.logname).read()
+ line = fd.readline()
+ if not line:
+ if not process.poll() is None:
+ raise Exception("OpenOCD exited early.")
+ time.sleep(0.1)
+ continue
+
m = re.search(r"Listening on port (\d+) for gdb connections",
- log)
+ line)
if m:
- if not self.ports:
- self.port = int(m.group(1))
- self.ports.append(int(m.group(1)))
+ self.gdb_ports.append(int(m.group(1)))
- if "telnet server disabled" in log:
- break
+ if "telnet server disabled" in line:
+ return process
- if not process.poll() is None:
- raise Exception(
- "OpenOCD exited before completing riscv_examine()")
if not messaged and time.time() - start > 1:
messaged = True
print "Waiting for OpenOCD to start..."
if (time.time() - start) > self.timeout:
- raise Exception("ERROR: Timed out waiting for OpenOCD to "
+ raise Exception("Timed out waiting for OpenOCD to "
"listen for gdb")
- return process
+
except Exception:
- header("OpenOCD log")
- sys.stdout.write(log)
+ print_log(Openocd.logname)
raise
def __del__(self):
Exception.__init__(self)
self.address = address
-Thread = collections.namedtuple('Thread', ('id', 'target_id', 'name',
- 'frame'))
+Thread = collections.namedtuple('Thread', ('id', 'description', 'target_id',
+ 'name', 'frame'))
class Gdb(object):
- logfile = tempfile.NamedTemporaryFile(prefix="gdb", suffix=".log")
- logname = logfile.name
- print "GDB Temporary Log File: %s" % logname
-
- def __init__(self,
- cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
- self.child = pexpect.spawn(cmd)
- self.child.logfile = open(self.logname, "w")
- self.child.logfile.write("+ %s\n" % cmd)
- self.wait()
- self.command("set confirm off")
- self.command("set width 0")
- self.command("set height 0")
- # Force consistency.
- self.command("set print entry-values no")
+ """A single gdb class which can interact with one or more gdb instances."""
+
+ # pylint: disable=too-many-public-methods
+
+ def __init__(self, ports,
+ cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb"),
+ binary=None):
+ assert ports
+
+ self.stack = []
+
+ self.logfiles = []
+ self.children = []
+ for port in ports:
+ logfile = tempfile.NamedTemporaryFile(prefix="gdb@%d-" % port,
+ suffix=".log")
+ self.logfiles.append(logfile)
+ child = pexpect.spawn(cmd)
+ child.logfile = logfile
+ child.logfile.write("+ %s\n" % cmd)
+ self.children.append(child)
+ self.active_child = self.children[0]
+
+ self.harts = {}
+ for port, child in zip(ports, self.children):
+ self.select_child(child)
+ self.wait()
+ self.command("set confirm off")
+ self.command("set width 0")
+ self.command("set height 0")
+ # Force consistency.
+ self.command("set print entry-values no")
+ self.command("target extended-remote localhost:%d" % port)
+ if binary:
+ self.command("file %s" % binary)
+ threads = self.threads()
+ for t in threads:
+ hartid = None
+ if t.name:
+ m = re.search(r"Hart (\d+)", t.name)
+ if m:
+ hartid = int(m.group(1))
+ if hartid is None:
+ if self.harts:
+ hartid = max(self.harts) + 1
+ else:
+ hartid = 0
+ self.harts[hartid] = (child, t)
+
+ def __del__(self):
+ for child in self.children:
+ del child
+
+ def lognames(self):
+ return [logfile.name for logfile in self.logfiles]
+
+ def select_child(self, child):
+ self.active_child = child
def select_hart(self, hart):
- output = self.command("thread %d" % (hart.index + 1))
+ child, thread = self.harts[hart.id]
+ self.select_child(child)
+ output = self.command("thread %s" % thread.id)
assert "Unknown" not in output
+ def push_state(self):
+ self.stack.append({
+ 'active_child': self.active_child
+ })
+
+ def pop_state(self):
+ state = self.stack.pop()
+ self.active_child = state['active_child']
+
def wait(self):
"""Wait for prompt."""
- self.child.expect(r"\(gdb\)")
+ self.active_child.expect(r"\(gdb\)")
def command(self, command, timeout=6000):
"""timeout is in seconds"""
- self.child.sendline(command)
- self.child.expect("\n", timeout=timeout)
- self.child.expect(r"\(gdb\)", timeout=timeout)
- return self.child.before.strip()
+ self.active_child.sendline(command)
+ self.active_child.expect("\n", timeout=timeout)
+ self.active_child.expect(r"\(gdb\)", timeout=timeout)
+ return self.active_child.before.strip()
+
+ def global_command(self, command):
+ """Execute this command on every gdb that we control."""
+ with PrivateState(self):
+ for child in self.children:
+ self.select_child(child)
+ self.command(command)
def c(self, wait=True, timeout=-1, async=False):
+ """
+ Dumb c command.
+ In RTOS mode, gdb will resume all harts.
+ In multi-gdb mode, this command will just go to the current gdb, so
+ will only resume one hart.
+ """
if async:
async = "&"
else:
assert "Continuing" in output
return output
else:
- self.child.sendline("c%s" % async)
- self.child.expect("Continuing")
+ self.active_child.sendline("c%s" % async)
+ self.active_child.expect("Continuing")
+
+ def c_all(self):
+ """Resume every hart."""
+ with PrivateState(self):
+ for child in self.children:
+ child.sendline("c")
+ child.expect("Continuing")
+
+ # Now wait for them all to halt
+ for child in self.children:
+ child.expect(r"\(gdb\)")
def interrupt(self):
- self.child.send("\003")
- self.child.expect(r"\(gdb\)", timeout=6000)
- return self.child.before.strip()
+ self.active_child.send("\003")
+ self.active_child.expect(r"\(gdb\)", timeout=6000)
+ return self.active_child.before.strip()
def x(self, address, size='w'):
output = self.command("x/%s %s" % (size, address))
threads = []
for line in output.splitlines():
m = re.match(
- r"[\s\*]*(\d+)\s*Thread (\d+)\s*\(Name: ([^\)]+)\s*(.*)",
- line)
+ r"[\s\*]*(\d+)\s*"
+ r"(Remote target|Thread (\d+)\s*\(Name: ([^\)]+))"
+ r"\s*(.*)", line)
if m:
threads.append(Thread(*m.groups()))
- if not threads:
- threads.append(Thread('1', '1', 'Default', '???'))
+ assert threads
+ #>>>if not threads:
+ #>>> threads.append(Thread('1', '1', 'Default', '???'))
return threads
def thread(self, thread):
return self.command("thread %s" % thread.id)
+ def where(self):
+ return self.command("where 1")
+
+class PrivateState(object):
+ def __init__(self, gdb):
+ self.gdb = gdb
+
+ def __enter__(self):
+ self.gdb.push_state()
+
+ def __exit__(self, _type, _value, _traceback):
+ self.gdb.pop_state()
+
def run_all_tests(module, target, parsed):
if not os.path.exists(parsed.logs):
os.makedirs(parsed.logs)
log_name = os.path.join(parsed.logs, "%s-%s-%s.log" %
(time.strftime("%Y%m%d-%H%M%S"), type(target).__name__, name))
log_fd = open(log_name, 'w')
- print "Running %s > %s ..." % (name, log_name),
+ print "[%s] Starting > %s" % (name, log_name)
instance = definition(target, hart)
sys.stdout.flush()
log_fd.write("Test: %s\n" % name)
real_stdout = sys.stdout
sys.stdout = log_fd
try:
- result = instance.run()
+ result = instance.run(real_stdout)
log_fd.write("Result: %s\n" % result)
finally:
sys.stdout = real_stdout
log_fd.write("Time elapsed: %.2fs\n" % (time.time() - start))
- print "%s in %.2fs" % (result, time.time() - start)
+ print "[%s] %s in %.2fs" % (name, result, time.time() - start)
if result not in good_results and parsed.print_failures:
sys.stdout.write(open(log_name).read())
sys.stdout.flush()
def postMortem(self):
pass
- def run(self):
+ def run(self, real_stdout):
"""
If compile_args is set, compile a program and set self.binary.
try:
self.classSetup()
+ real_stdout.write("[%s] Temporary logs: %s\n" % (
+ type(self).__name__, ", ".join(self.logs)))
self.setup()
result = self.test() # pylint: disable=no-member
except TestNotApplicable:
print e.message
header("Traceback")
traceback.print_exc(file=sys.stdout)
- self.postMortem()
+ try:
+ self.postMortem()
+ except Exception as e: # pylint: disable=broad-except
+ header("postMortem Exception")
+ print e
+ traceback.print_exc(file=sys.stdout)
return result
finally:
BaseTest.classSetup(self)
if gdb_cmd:
- self.gdb = Gdb(gdb_cmd)
+ self.gdb = Gdb(self.server.gdb_ports, gdb_cmd, binary=self.binary)
else:
- self.gdb = Gdb()
+ self.gdb = Gdb(self.server.gdb_ports, binary=self.binary)
- self.logs.append(self.gdb.logname)
+ self.logs += self.gdb.lognames()
- if self.binary:
- self.gdb.command("file %s" % self.binary)
if self.target:
- self.gdb.command("set arch riscv:rv%d" % self.hart.xlen)
- self.gdb.command("set remotetimeout %d" % self.target.timeout_sec)
- if self.server.port:
- self.gdb.command(
- "target extended-remote localhost:%d" % self.server.port)
- self.gdb.select_hart(self.hart)
+ self.gdb.global_command("set arch riscv:rv%d" % self.hart.xlen)
+ self.gdb.global_command("set remotetimeout %d" %
+ self.target.timeout_sec)
for cmd in self.target.gdb_setup:
self.gdb.command(cmd)
+ self.gdb.select_hart(self.hart)
+
# FIXME: OpenOCD doesn't handle PRIV now
#self.gdb.p("$priv=3")