Decrease sleep time to 0.1s.
[riscv-tests.git] / debug / testlib.py
index 76989806b8ff57278c341c0776a3bb754dd4384e..b8e9ad4d209c47e41865e4aab933ab6d99d90504 100644 (file)
@@ -108,7 +108,7 @@ class VcsSim(object):
             if match:
                 done = True
                 self.port = int(match.group(1))
-                print "Using port %d for JTAG VPI" % self.port
+                os.environ['JTAG_VPI_PORT'] = str(self.port)
 
     def __del__(self):
         try:
@@ -130,21 +130,16 @@ class Openocd(object):
         if debug:
             cmd.append("-d")
 
-        # Assign port
-        self.port = unused_port()
-        print "Using port %d for gdb server" % self.port
         # This command needs to come before any config scripts on the command
         # line, since they are executed in order.
-        cmd[1:1] = ["--command", "gdb_port %d" % self.port]
-
-        env = os.environ.copy()
-        env['JTAG_VPI_PORT'] = str(otherProcess.port)
+        # Tell OpenOCD to bind to an unused port.
+        cmd[1:1] = ["--command", "gdb_port %d" % 0]
 
         logfile = open(Openocd.logname, "w")
         logfile.write("+ %s\n" % " ".join(cmd))
         logfile.flush()
         self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
-                stdout=logfile, stderr=logfile, env=env)
+                stdout=logfile, stderr=logfile)
 
         # Wait for OpenOCD to have made it through riscv_examine(). When using
         # OpenOCD to communicate with a simulator this may take a long time,
@@ -162,6 +157,31 @@ class Openocd(object):
                 messaged = True
                 print "Waiting for OpenOCD to examine RISCV core..."
 
+        self.port = self._get_gdb_server_port()
+
+    def _get_gdb_server_port(self):
+        """Get port that OpenOCD's gdb server is listening on."""
+        MAX_ATTEMPTS = 50
+        PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')
+        for _ in range(MAX_ATTEMPTS):
+            with open(os.devnull, 'w') as devnull:
+                output = subprocess.check_output([
+                    'lsof',
+                    '-a',  # Take the AND of the following selectors
+                    '-p{}'.format(self.process.pid),  # Filter on PID
+                    '-iTCP',  # Filter only TCP sockets
+                ], stderr=devnull)
+            matches = list(PORT_REGEX.finditer(output))
+            if len(matches) > 1:
+                raise Exception(
+                    "OpenOCD listening on multiple ports. Cannot uniquely "
+                    "identify gdb server port.")
+            elif matches:
+                [match] = matches
+                return int(match.group('port'))
+            time.sleep(0.1)
+        raise Exception("Timed out waiting for gdb server to obtain port.")
+
     def __del__(self):
         try:
             self.process.kill()
@@ -171,14 +191,29 @@ class Openocd(object):
 
 class OpenocdCli(object):
     def __init__(self, port=4444):
-        self.child = pexpect.spawn("sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
+        self.child = pexpect.spawn(
+                "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
         self.child.expect("> ")
 
     def command(self, cmd):
         self.child.sendline(cmd)
+        self.child.expect(cmd)
         self.child.expect("\n")
         self.child.expect("> ")
-        return self.child.before.strip()
+        return self.child.before.strip("\t\r\n \0")
+
+    def reg(self, reg=''):
+        output = self.command("reg %s" % reg)
+        matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
+        values = {r: int(v, 0) for r, v in matches}
+        if reg:
+            return values[reg]
+        return values
+
+    def load_image(self, image):
+        output = self.command("load_image %s" % image)
+        if 'invalid ELF file, only 32bits files are supported' in output:
+            raise TestNotApplicable(output)
 
 class CannotAccess(Exception):
     def __init__(self, address):
@@ -227,6 +262,13 @@ class Gdb(object):
         value = int(output.split(':')[1].strip(), 0)
         return value
 
+    def p_raw(self, obj):
+        output = self.command("p %s" % obj)
+        m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
+        if m:
+            raise CannotAccess(int(m.group(1), 0))
+        return output.split('=')[-1].strip()
+
     def p(self, obj):
         output = self.command("p/x %s" % obj)
         m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
@@ -375,6 +417,8 @@ class BaseTest(object):
         try:
             self.setup()
             result = self.test()    # pylint: disable=no-member
+        except TestNotApplicable:
+            result = "not_applicable"
         except Exception as e: # pylint: disable=broad-except
             if isinstance(e, TestFailed):
                 result = "fail"
@@ -406,6 +450,11 @@ class TestFailed(Exception):
         Exception.__init__(self)
         self.message = message
 
+class TestNotApplicable(Exception):
+    def __init__(self, message):
+        Exception.__init__(self)
+        self.message = message
+
 def assertEqual(a, b):
     if a != b:
         raise TestFailed("%r != %r" % (a, b))
@@ -426,6 +475,10 @@ def assertGreater(a, b):
     if not a > b:
         raise TestFailed("%r not greater than %r" % (a, b))
 
+def assertLess(a, b):
+    if not a < b:
+        raise TestFailed("%r not less than %r" % (a, b))
+
 def assertTrue(a):
     if not a:
         raise TestFailed("%r is not True" % a)