bump env
[riscv-tests.git] / debug / testlib.py
index 3aaa542a5c76dce60bf92ca485119e2045b3dbe0..59440b361330cbf472be506cfc6bfa83c2ec3797 100644 (file)
@@ -57,11 +57,12 @@ def compile(args, xlen=32): # pylint: disable=redefined-builtin
 
 class Spike(object):
     def __init__(self, target, halted=False, timeout=None, with_jtag_gdb=True,
-            isa=None):
+            isa=None, progbufsize=None):
         """Launch spike. Return tuple of its process and the port it's running
         on."""
         self.process = None
         self.isa = isa
+        self.progbufsize = progbufsize
 
         if target.harts:
             harts = target.harts
@@ -117,6 +118,11 @@ class Spike(object):
             isa = "RV%dG" % harts[0].xlen
 
         cmd += ["--isa", isa]
+        cmd += ["--debug-auth"]
+
+        if not self.progbufsize is None:
+            cmd += ["--progsize", str(self.progbufsize)]
+            cmd += ["--debug-sba", "32"]
 
         assert len(set(t.ram for t in harts)) == 1, \
                 "All spike harts must have the same RAM layout"
@@ -293,7 +299,13 @@ class Openocd(object):
 
     def __del__(self):
         try:
-            self.process.kill()
+            self.process.terminate()
+            start = time.time()
+            while time.time() < start + 10000:
+                if self.process.poll():
+                    break
+            else:
+                self.process.kill()
             self.process.wait()
         except (OSError, AttributeError):
             pass
@@ -418,7 +430,7 @@ class Gdb(object):
         h = self.harts[hart.id]
         self.select_child(h['child'])
         if not h['solo']:
-            output = self.command("thread %s" % h['thread'].id, timeout=10)
+            output = self.command("thread %s" % h['thread'].id, ops=5)
             assert "Unknown" not in output
 
     def push_state(self):
@@ -434,8 +446,11 @@ class Gdb(object):
         """Wait for prompt."""
         self.active_child.expect(r"\(gdb\)")
 
-    def command(self, command, timeout=6000):
-        """timeout is in seconds"""
+    def command(self, command, ops=1):
+        """ops is the estimated number of operations gdb will have to perform
+        to perform this command. It is used to compute a timeout based on
+        self.timeout."""
+        timeout = ops * self.timeout
         self.active_child.sendline(command)
         self.active_child.expect("\n", timeout=timeout)
         self.active_child.expect(r"\(gdb\)", timeout=timeout)
@@ -448,7 +463,7 @@ class Gdb(object):
                 self.select_child(child)
                 self.command(command)
 
-    def c(self, wait=True, timeout=-1, async=False):
+    def c(self, wait=True, async=False):
         """
         Dumb c command.
         In RTOS mode, gdb will resume all harts.
@@ -459,15 +474,16 @@ class Gdb(object):
             async = "&"
         else:
             async = ""
+        ops = 10
         if wait:
-            output = self.command("c%s" % async, timeout=timeout)
+            output = self.command("c%s" % async, ops=ops)
             assert "Continuing" in output
             return output
         else:
             self.active_child.sendline("c%s" % async)
-            self.active_child.expect("Continuing")
+            self.active_child.expect("Continuing", timeout=ops * self.timeout)
 
-    def c_all(self):
+    def c_all(self, wait=True):
         """
         Resume every hart.
 
@@ -484,15 +500,20 @@ class Gdb(object):
                 child.sendline("c")
                 child.expect("Continuing")
 
-            # Now wait for them all to halt
-            for child in self.children:
-                child.expect(r"\(gdb\)")
+            if wait:
+                for child in self.children:
+                    child.expect(r"\(gdb\)")
 
     def interrupt(self):
         self.active_child.send("\003")
         self.active_child.expect(r"\(gdb\)", timeout=6000)
         return self.active_child.before.strip()
 
+    def interrupt_all(self):
+        for child in self.children:
+            self.select_child(child)
+            self.interrupt()
+
     def x(self, address, size='w'):
         output = self.command("x/%s %s" % (size, address))
         value = int(output.split(':')[1].strip(), 0)
@@ -531,29 +552,43 @@ class Gdb(object):
         value = shlex.split(output.split('=')[-1].strip())[1]
         return value
 
+    def info_registers(self, group):
+        output = self.command("info registers %s" % group)
+        result = {}
+        for line in output.splitlines():
+            parts = line.split()
+            name = parts[0]
+            if "Could not fetch" in line:
+                result[name] = " ".join(parts[1:])
+            else:
+                result[name] = int(parts[1], 0)
+        return result
+
     def stepi(self):
-        output = self.command("stepi", timeout=60)
+        output = self.command("stepi", ops=10)
         return output
 
     def load(self):
-        output = self.command("load", timeout=6000)
+        output = self.command("load", ops=1000)
         assert "failed" not in  output
         assert "Transfer rate" in output
+        output = self.command("compare-sections", ops=1000)
+        assert "MIS" not in output
 
     def b(self, location):
-        output = self.command("b %s" % location)
+        output = self.command("b %s" % location, ops=5)
         assert "not defined" not in output
         assert "Breakpoint" in output
         return output
 
     def hbreak(self, location):
-        output = self.command("hbreak %s" % location)
+        output = self.command("hbreak %s" % location, ops=5)
         assert "not defined" not in output
         assert "Hardware assisted breakpoint" in output
         return output
 
     def threads(self):
-        output = self.command("info threads")
+        output = self.command("info threads", ops=100)
         threads = []
         for line in output.splitlines():
             m = re.match(
@@ -584,8 +619,12 @@ class PrivateState(object):
         self.gdb.pop_state()
 
 def run_all_tests(module, target, parsed):
-    if not os.path.exists(parsed.logs):
+    try:
         os.makedirs(parsed.logs)
+    except OSError:
+        # There's a race where multiple instances of the test program might
+        # decide to create the logs directory at the same time.
+        pass
 
     overall_start = time.time()
 
@@ -638,6 +677,9 @@ def run_tests(parsed, target, todo):
         try:
             result = instance.run()
             log_fd.write("Result: %s\n" % result)
+            log_fd.write("Logfile: %s\n" % log_name)
+            log_fd.write("Reproduce: %s %s %s\n" % (sys.argv[0], parsed.target,
+                name))
         finally:
             sys.stdout = real_stdout
             log_fd.write("Time elapsed: %.2fs\n" % (time.time() - start))
@@ -691,12 +733,15 @@ def header(title, dash='-', length=78):
     else:
         print dash * length
 
-def print_log(path):
-    header(path)
-    for l in open(path, "r"):
+def print_log_handle(name, handle):
+    header(name)
+    for l in handle:
         sys.stdout.write(l)
     print
 
+def print_log(path):
+    print_log_handle(path, open(path, "r"))
+
 class BaseTest(object):
     compiled = {}
 
@@ -792,10 +837,15 @@ class BaseTest(object):
             return result
 
         finally:
+            # Get handles to logs before the files are deleted.
+            logs = []
             for log in self.logs:
-                print_log(log)
-            header("End of logs")
+                logs.append((log, open(log, "r")))
+
             self.classTeardown()
+            for name, handle in logs:
+                print_log_handle(name, handle)
+            header("End of logs")
 
         if not result:
             result = 'pass'
@@ -835,24 +885,30 @@ class GdbTest(BaseTest):
         if not self.gdb:
             return
         self.gdb.interrupt()
-        self.gdb.command("disassemble")
-        self.gdb.command("info registers all", timeout=10)
+        self.gdb.command("disassemble", ops=20)
+        self.gdb.command("info registers all", ops=100)
+        self.gdb.command("flush regs")
+        self.gdb.command("info threads", ops=100)
 
     def classTeardown(self):
         del self.gdb
         BaseTest.classTeardown(self)
 
-class GdbSingleHartTest(GdbTest):
-    def classSetup(self):
-        GdbTest.classSetup(self)
-
+    def parkOtherHarts(self):
+        """Park harts besides the currently selected one in loop_forever()."""
         for hart in self.target.harts:
             # Park all harts that we're not using in a safe place.
             if hart != self.hart:
                 self.gdb.select_hart(hart)
                 self.gdb.p("$pc=loop_forever")
+
         self.gdb.select_hart(self.hart)
 
+class GdbSingleHartTest(GdbTest):
+    def classSetup(self):
+        GdbTest.classSetup(self)
+        self.parkOtherHarts()
+
 class ExamineTarget(GdbTest):
     def test(self):
         for hart in self.target.harts: