Disable tcl and telnet servers when running OpenOCD because the port numbers might...
[riscv-tests.git] / debug / testlib.py
index 76989806b8ff57278c341c0776a3bb754dd4384e..ac1d7713f9f5f63d319efd3dbbfb42f98d500c73 100644 (file)
@@ -27,9 +27,17 @@ def compile(args, xlen=32): # pylint: disable=redefined-builtin
             cmd.append(found)
         else:
             cmd.append(arg)
-    cmd = " ".join(cmd)
-    result = os.system(cmd)
-    assert result == 0, "%r failed" % cmd
+    process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE)
+    stdout, stderr = process.communicate()
+    if process.returncode:
+        print
+        header("Compile failed")
+        print "+", " ".join(cmd)
+        print stdout,
+        print stderr,
+        header("")
+        raise Exception("Compile failed!")
 
 def unused_port():
     # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
@@ -108,7 +116,7 @@ class VcsSim(object):
             if match:
                 done = True
                 self.port = int(match.group(1))
-                print "Using port %d for JTAG VPI" % self.port
+                os.environ['JTAG_VPI_PORT'] = str(self.port)
 
     def __del__(self):
         try:
@@ -130,21 +138,26 @@ class Openocd(object):
         if debug:
             cmd.append("-d")
 
-        # Assign port
-        self.port = unused_port()
-        print "Using port %d for gdb server" % self.port
         # This command needs to come before any config scripts on the command
         # line, since they are executed in order.
-        cmd[1:1] = ["--command", "gdb_port %d" % self.port]
-
-        env = os.environ.copy()
-        env['JTAG_VPI_PORT'] = str(otherProcess.port)
+        cmd[1:1] = [
+            # Tell OpenOCD to bind gdb to an unused, ephemeral port.
+            "--command",
+            "gdb_port 0",
+            # Disable tcl and telnet servers, since they are unused and because
+            # the port numbers will conflict if multiple OpenOCD processes are
+            # running on the same server.
+            "--command",
+            "tcl_port disabled",
+            "--command",
+            "telnet_port disabled",
+        ]
 
         logfile = open(Openocd.logname, "w")
         logfile.write("+ %s\n" % " ".join(cmd))
         logfile.flush()
         self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
-                stdout=logfile, stderr=logfile, env=env)
+                stdout=logfile, stderr=logfile)
 
         # Wait for OpenOCD to have made it through riscv_examine(). When using
         # OpenOCD to communicate with a simulator this may take a long time,
@@ -162,6 +175,37 @@ class Openocd(object):
                 messaged = True
                 print "Waiting for OpenOCD to examine RISCV core..."
 
+        self.port = self._get_gdb_server_port()
+
+    def _get_gdb_server_port(self):
+        """Get port that OpenOCD's gdb server is listening on."""
+        MAX_ATTEMPTS = 50
+        PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')
+        for _ in range(MAX_ATTEMPTS):
+            with open(os.devnull, 'w') as devnull:
+                try:
+                    output = subprocess.check_output([
+                        'lsof',
+                        '-a',  # Take the AND of the following selectors
+                        '-p{}'.format(self.process.pid),  # Filter on PID
+                        '-iTCP',  # Filter only TCP sockets
+                    ], stderr=devnull)
+                except subprocess.CalledProcessError:
+                    output = ""
+            matches = list(PORT_REGEX.finditer(output))
+            matches = [m for m in matches
+                    if m.group('port') not in ('6666', '4444')]
+            if len(matches) > 1:
+                print output
+                raise Exception(
+                    "OpenOCD listening on multiple ports. Cannot uniquely "
+                    "identify gdb server port.")
+            elif matches:
+                [match] = matches
+                return int(match.group('port'))
+            time.sleep(0.1)
+        raise Exception("Timed out waiting for gdb server to obtain port.")
+
     def __del__(self):
         try:
             self.process.kill()
@@ -171,14 +215,29 @@ class Openocd(object):
 
 class OpenocdCli(object):
     def __init__(self, port=4444):
-        self.child = pexpect.spawn("sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
+        self.child = pexpect.spawn(
+                "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
         self.child.expect("> ")
 
     def command(self, cmd):
         self.child.sendline(cmd)
+        self.child.expect(cmd)
         self.child.expect("\n")
         self.child.expect("> ")
-        return self.child.before.strip()
+        return self.child.before.strip("\t\r\n \0")
+
+    def reg(self, reg=''):
+        output = self.command("reg %s" % reg)
+        matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
+        values = {r: int(v, 0) for r, v in matches}
+        if reg:
+            return values[reg]
+        return values
+
+    def load_image(self, image):
+        output = self.command("load_image %s" % image)
+        if 'invalid ELF file, only 32bits files are supported' in output:
+            raise TestNotApplicable(output)
 
 class CannotAccess(Exception):
     def __init__(self, address):
@@ -227,6 +286,13 @@ class Gdb(object):
         value = int(output.split(':')[1].strip(), 0)
         return value
 
+    def p_raw(self, obj):
+        output = self.command("p %s" % obj)
+        m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
+        if m:
+            raise CannotAccess(int(m.group(1), 0))
+        return output.split('=')[-1].strip()
+
     def p(self, obj):
         output = self.command("p/x %s" % obj)
         m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
@@ -298,10 +364,13 @@ def add_test_run_options(parser):
             help="Run only tests that are named here.")
 
 def header(title, dash='-'):
-    dashes = dash * (36 - len(title))
-    before = dashes[:len(dashes)/2]
-    after = dashes[len(dashes)/2:]
-    print "%s[ %s ]%s" % (before, title, after)
+    if title:
+        dashes = dash * (36 - len(title))
+        before = dashes[:len(dashes)/2]
+        after = dashes[len(dashes)/2:]
+        print "%s[ %s ]%s" % (before, title, after)
+    else:
+        print dash * 40
 
 class BaseTest(object):
     compiled = {}
@@ -327,18 +396,9 @@ class BaseTest(object):
         compile_args = getattr(self, 'compile_args', None)
         if compile_args:
             if compile_args not in BaseTest.compiled:
-                try:
-                    # pylint: disable=star-args
-                    BaseTest.compiled[compile_args] = \
-                            self.target.compile(*compile_args)
-                except Exception: # pylint: disable=broad-except
-                    print "exception while compiling in %.2fs" % (
-                            time.time() - self.start)
-                    print "=" * 40
-                    header("Traceback")
-                    traceback.print_exc(file=sys.stdout)
-                    print "/" * 40
-                    return "exception"
+                # pylint: disable=star-args
+                BaseTest.compiled[compile_args] = \
+                        self.target.compile(*compile_args)
         self.binary = BaseTest.compiled.get(compile_args)
 
     def classSetup(self):
@@ -375,6 +435,8 @@ class BaseTest(object):
         try:
             self.setup()
             result = self.test()    # pylint: disable=no-member
+        except TestNotApplicable:
+            result = "not_applicable"
         except Exception as e: # pylint: disable=broad-except
             if isinstance(e, TestFailed):
                 result = "fail"
@@ -406,6 +468,11 @@ class TestFailed(Exception):
         Exception.__init__(self)
         self.message = message
 
+class TestNotApplicable(Exception):
+    def __init__(self, message):
+        Exception.__init__(self)
+        self.message = message
+
 def assertEqual(a, b):
     if a != b:
         raise TestFailed("%r != %r" % (a, b))
@@ -426,6 +493,10 @@ def assertGreater(a, b):
     if not a > b:
         raise TestFailed("%r not greater than %r" % (a, b))
 
+def assertLess(a, b):
+    if not a < b:
+        raise TestFailed("%r not less than %r" % (a, b))
+
 def assertTrue(a):
     if not a:
         raise TestFailed("%r is not True" % a)