cmd.append(found)
else:
cmd.append(arg)
- cmd = " ".join(cmd)
- result = os.system(cmd)
- assert result == 0, "%r failed" % cmd
+ process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ stdout, stderr = process.communicate()
+ if process.returncode:
+ print
+ header("Compile failed")
+ print "+", " ".join(cmd)
+ print stdout,
+ print stderr,
+ header("")
+ raise Exception("Compile failed!")
def unused_port():
# http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
line = listenfile.readline()
if not line:
time.sleep(1)
- if "Listening on port 5555" in line:
+ match = re.match(r"^Listening on port (\d+)$", line)
+ if match:
done = True
+ self.port = int(match.group(1))
+ os.environ['JTAG_VPI_PORT'] = str(self.port)
def __del__(self):
try:
cmd += ["-f", find_file(config)]
if debug:
cmd.append("-d")
+
+ # This command needs to come before any config scripts on the command
+ # line, since they are executed in order.
+ # Tell OpenOCD to bind to an unused port.
+ cmd[1:1] = ["--command", "gdb_port %d" % 0]
+
logfile = open(Openocd.logname, "w")
logfile.write("+ %s\n" % " ".join(cmd))
logfile.flush()
self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
stdout=logfile, stderr=logfile)
- # TODO: Pick a random port
- self.port = 3333
# Wait for OpenOCD to have made it through riscv_examine(). When using
# OpenOCD to communicate with a simulator this may take a long time,
messaged = True
print "Waiting for OpenOCD to examine RISCV core..."
+ self.port = self._get_gdb_server_port()
+
+ def _get_gdb_server_port(self):
+ """Get port that OpenOCD's gdb server is listening on."""
+ MAX_ATTEMPTS = 50
+ PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')
+ for _ in range(MAX_ATTEMPTS):
+ with open(os.devnull, 'w') as devnull:
+ try:
+ output = subprocess.check_output([
+ 'lsof',
+ '-a', # Take the AND of the following selectors
+ '-p{}'.format(self.process.pid), # Filter on PID
+ '-iTCP', # Filter only TCP sockets
+ ], stderr=devnull)
+ except subprocess.CalledProcessError:
+ output = ""
+ matches = list(PORT_REGEX.finditer(output))
+ matches = [m for m in matches
+ if m.group('port') not in ('6666', '4444')]
+ if len(matches) > 1:
+ print output
+ raise Exception(
+ "OpenOCD listening on multiple ports. Cannot uniquely "
+ "identify gdb server port.")
+ elif matches:
+ [match] = matches
+ return int(match.group('port'))
+ time.sleep(0.1)
+ raise Exception("Timed out waiting for gdb server to obtain port.")
+
def __del__(self):
try:
self.process.kill()
class OpenocdCli(object):
def __init__(self, port=4444):
- self.child = pexpect.spawn("sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
+ self.child = pexpect.spawn(
+ "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
self.child.expect("> ")
def command(self, cmd):
self.child.sendline(cmd)
+ self.child.expect(cmd)
self.child.expect("\n")
self.child.expect("> ")
- return self.child.before.strip()
+ return self.child.before.strip("\t\r\n \0")
+
+ def reg(self, reg=''):
+ output = self.command("reg %s" % reg)
+ matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
+ values = {r: int(v, 0) for r, v in matches}
+ if reg:
+ return values[reg]
+ return values
+
+ def load_image(self, image):
+ output = self.command("load_image %s" % image)
+ if 'invalid ELF file, only 32bits files are supported' in output:
+ raise TestNotApplicable(output)
class CannotAccess(Exception):
def __init__(self, address):
value = int(output.split(':')[1].strip(), 0)
return value
+ def p_raw(self, obj):
+ output = self.command("p %s" % obj)
+ m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
+ if m:
+ raise CannotAccess(int(m.group(1), 0))
+ return output.split('=')[-1].strip()
+
def p(self, obj):
output = self.command("p/x %s" % obj)
m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
help="Run only tests that are named here.")
def header(title, dash='-'):
- dashes = dash * (36 - len(title))
- before = dashes[:len(dashes)/2]
- after = dashes[len(dashes)/2:]
- print "%s[ %s ]%s" % (before, title, after)
+ if title:
+ dashes = dash * (36 - len(title))
+ before = dashes[:len(dashes)/2]
+ after = dashes[len(dashes)/2:]
+ print "%s[ %s ]%s" % (before, title, after)
+ else:
+ print dash * 40
class BaseTest(object):
compiled = {}
compile_args = getattr(self, 'compile_args', None)
if compile_args:
if compile_args not in BaseTest.compiled:
- try:
- # pylint: disable=star-args
- BaseTest.compiled[compile_args] = \
- self.target.compile(*compile_args)
- except Exception: # pylint: disable=broad-except
- print "exception while compiling in %.2fs" % (
- time.time() - self.start)
- print "=" * 40
- header("Traceback")
- traceback.print_exc(file=sys.stdout)
- print "/" * 40
- return "exception"
+ # pylint: disable=star-args
+ BaseTest.compiled[compile_args] = \
+ self.target.compile(*compile_args)
self.binary = BaseTest.compiled.get(compile_args)
def classSetup(self):
try:
self.setup()
result = self.test() # pylint: disable=no-member
+ except TestNotApplicable:
+ result = "not_applicable"
except Exception as e: # pylint: disable=broad-except
if isinstance(e, TestFailed):
result = "fail"
Exception.__init__(self)
self.message = message
+class TestNotApplicable(Exception):
+ def __init__(self, message):
+ Exception.__init__(self)
+ self.message = message
+
def assertEqual(a, b):
if a != b:
raise TestFailed("%r != %r" % (a, b))
if not a > b:
raise TestFailed("%r not greater than %r" % (a, b))
+def assertLess(a, b):
+ if not a < b:
+ raise TestFailed("%r not less than %r" % (a, b))
+
def assertTrue(a):
if not a:
raise TestFailed("%r is not True" % a)