Test OpenOCD step and resume.
[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import re
3 import shlex
4 import subprocess
5 import sys
6 import time
7 import traceback
8
9 import pexpect
10
11 # Note that gdb comes with its own testsuite. I was unable to figure out how to
12 # run that testsuite against the spike simulator.
13
def find_file(path):
    """Look for `path` under the current directory, then next to this module.

    Returns the first joined path that exists, or None if neither does.
    """
    search_dirs = (os.getcwd(), os.path.dirname(__file__))
    candidates = (os.path.join(base, path) for base in search_dirs)
    for candidate in candidates:
        if os.path.exists(candidate):
            return candidate
    return None
20
def compile(args, xlen=32): # pylint: disable=redefined-builtin
    """Run the RISC-V gcc found under $RISCV on `args`.

    Each argument that find_file() can locate is replaced by the located
    path; every other argument is passed through unchanged. "-g" is always
    added. The pieces are joined with spaces and handed to os.system(), so
    they are interpreted by the shell without any quoting.

    Raises AssertionError if the command exits with a non-zero status.
    """
    compiler = os.path.expandvars("$RISCV/bin/riscv%d-unknown-elf-gcc" % xlen)
    resolved = [find_file(arg) or arg for arg in args]
    cmd = " ".join([compiler, "-g"] + resolved)
    status = os.system(cmd)
    assert status == 0, "%r failed" % cmd
33
def unused_port():
    """Return a TCP port number that was unused when this was called.

    A socket is bound to port 0 so the operating system picks a free port;
    the number is read back and the socket is closed again. The port is not
    reserved, so another process may grab it before the caller uses it.
    """
    # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
    import socket
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind(("", 0))
        return s.getsockname()[1]
    finally:
        # Close even when bind()/getsockname() fails so the descriptor is not
        # leaked.
        s.close()
42
class Spike(object):
    """Runs the spike simulator as a child process, logging to `logname`."""
    logname = "spike.log"

    def __init__(self, cmd, binary=None, halted=False, with_gdb=True,
            timeout=None, xlen=64):
        """Launch spike.

        cmd -- spike command line as a single string (split with shlex);
            "spike" is used when it is empty or None.
        binary -- appended to the command line after "pk" when given.
        halted -- add -H to the command line.
        with_gdb -- pick an unused port, store it in self.port and pass it
            with --gdb-port.
        timeout -- when set, the whole command is run under "timeout <value>".
        xlen -- when 32, "--isa RV32" is added to the command line.

        The child is stored in self.process. self.port stays None when
        with_gdb is false.
        """
        # Set first so that __del__ and callers never hit a missing attribute,
        # even if launching fails part way through.
        self.process = None
        self.port = None

        if cmd:
            cmd = shlex.split(cmd)
        else:
            cmd = ["spike"]
        if xlen == 32:
            cmd += ["--isa", "RV32"]

        if timeout:
            cmd = ["timeout", str(timeout)] + cmd

        if halted:
            cmd.append('-H')
        if with_gdb:
            self.port = unused_port()
            cmd += ['--gdb-port', str(self.port)]
        cmd.append("-m32")
        cmd.append('pk')
        if binary:
            cmd.append(binary)
        logfile = open(self.logname, "w")
        try:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                    stdout=logfile, stderr=logfile)
        finally:
            # The child holds its own copy of the descriptor, so ours can be
            # closed as soon as the process has been started.
            logfile.close()

    def __del__(self):
        """Kill the child process, if one was started."""
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            # Best effort: the process is already gone.
            pass

    def wait(self, *args, **kwargs):
        """Wait for the child to exit; arguments go to Popen.wait()."""
        return self.process.wait(*args, **kwargs)
84
class VcsSim(object):
    """Runs a VCS simulator binary (simv) as a child process.

    Output goes to simv.log. The port the simulator announces in that log
    is stored in self.port and exported as JTAG_VPI_PORT.
    """

    def __init__(self, simv=None, debug=False):
        """Launch the simulator and wait until it reports its port.

        simv -- command line as a single string (split with shlex); "simv"
            is used when it is empty or None.
        debug -- append "-debug" to the executable name and add the
            +vcdplusfile argument.

        Raises Exception if the simulator exits before announcing a port.
        """
        # Set first so that __del__ is safe even if launching fails.
        self.process = None

        if simv:
            cmd = shlex.split(simv)
        else:
            cmd = ["simv"]
        cmd += ["+jtag_vpi_enable"]
        if debug:
            cmd[0] = cmd[0] + "-debug"
            cmd += ["+vcdplusfile=output/gdbserver.vpd"]
        with open("simv.log", "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            with open("simv.log", "r") as listenfile:
                # Start reading at the current end of the log so only output
                # from the new process is examined.
                listenfile.seek(0, 2)
                self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                        stdout=logfile, stderr=logfile)
                self.port = self._wait_for_port(listenfile)
        os.environ['JTAG_VPI_PORT'] = str(self.port)

    def _wait_for_port(self, listenfile):
        """Read `listenfile` until "Listening on port N" appears; return N."""
        while True:
            line = listenfile.readline()
            if line:
                match = re.match(r"^Listening on port (\d+)$", line)
                if match:
                    return int(match.group(1))
                continue
            if self.process.poll() is not None:
                # The process is gone. Look once more at whatever it wrote
                # after the last readline(), then give up instead of
                # polling forever.
                match = re.search(r"^Listening on port (\d+)$",
                        listenfile.read(), re.MULTILINE)
                if match:
                    return int(match.group(1))
                raise Exception("simv exited before listening on a port")
            time.sleep(1)

    def __del__(self):
        """Kill the child process, if one was started."""
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            # Best effort: the process is already gone.
            pass
119
class Openocd(object):
    """Runs OpenOCD as a child process, logging to `logname`."""
    logname = "openocd.log"

    def __init__(self, cmd=None, config=None, debug=False):
        """Launch OpenOCD and wait until it reports the core as examined.

        cmd -- command line as a single string (split with shlex); "openocd"
            is used when it is empty or None.
        config -- config file name, located with find_file() and passed
            with -f.
        debug -- add -d to the command line.

        The gdb port that was chosen is stored in self.port and the child in
        self.process. Raises Exception if the config file cannot be found or
        if OpenOCD exits before "Examined RISCV core" shows up in its log.
        """
        # Set first so that __del__ is safe even if launching fails.
        self.process = None

        if cmd:
            cmd = shlex.split(cmd)
        else:
            cmd = ["openocd"]
        if config:
            config_path = find_file(config)
            if config_path is None:
                # Fail here with a clear message rather than with a
                # TypeError when the command line is joined below.
                raise Exception(
                        "OpenOCD config file %r not found" % (config,))
            cmd += ["-f", config_path]
        if debug:
            cmd.append("-d")

        # Assign port
        self.port = unused_port()
        # This command needs to come before any config scripts on the command
        # line, since they are executed in order.
        cmd[1:1] = ["--command", "gdb_port %d" % self.port]

        logfile = open(Openocd.logname, "w")
        try:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                    stdout=logfile, stderr=logfile)
        finally:
            # The child holds its own copy of the descriptor.
            logfile.close()

        # Wait for OpenOCD to have made it through riscv_examine(). When using
        # OpenOCD to communicate with a simulator this may take a long time,
        # and gdb will time out when trying to connect if we attempt too early.
        start = time.time()
        messaged = False
        while True:
            with open(Openocd.logname) as log_reader:
                log = log_reader.read()
            if "Examined RISCV core" in log:
                break
            if self.process.poll() is not None:
                raise Exception(
                        "OpenOCD exited before completing riscv_examine()")
            if not messaged and time.time() - start > 1:
                messaged = True
                print("Waiting for OpenOCD to examine RISCV core...")
            # Do not spin at full speed while re-reading the log.
            time.sleep(0.1)

    def __del__(self):
        """Kill the child process, if one was started."""
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            # Best effort: the process is already gone.
            pass
167
class OpenocdCli(object):
    """Talks to OpenOCD's command line through a telnet session.

    The session output is also copied to openocd-cli.log by tee.
    """

    def __init__(self, port=4444):
        """Open a telnet session to localhost:`port` and wait for a prompt."""
        self.child = pexpect.spawn(
                "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
        self.child.expect("> ")

    def command(self, cmd):
        """Send `cmd` and return the text printed before the next prompt."""
        self.child.sendline(cmd)
        # Match the echoed command literally. expect() would treat it as a
        # regular expression, which goes wrong for commands containing
        # characters such as "(", "[", "*" or ".".
        self.child.expect_exact(cmd)
        self.child.expect("\n")
        self.child.expect("> ")
        return self.child.before.strip("\t\r\n \0")

    def reg(self, reg=''):
        """Run "reg" and parse lines of the form "name (/bits): 0xVALUE".

        Returns the value of `reg` when one is named (KeyError if it is not
        in the output), otherwise a dict mapping every parsed register name
        to its value.
        """
        output = self.command("reg %s" % reg)
        matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
        values = {r: int(v, 0) for r, v in matches}
        if reg:
            return values[reg]
        return values

    def load_image(self, image):
        """Run "load_image"; raise TestNotApplicable for unsupported ELFs."""
        output = self.command("load_image %s" % image)
        if 'invalid ELF file, only 32bits files are supported' in output:
            raise TestNotApplicable(output)
193
class CannotAccess(Exception):
    """Raised when gdb reports that it cannot access a memory address.

    The offending address is available as the `address` attribute.
    """

    def __init__(self, address):
        # Put the address into the message so that tracebacks and str() are
        # not empty.
        try:
            shown = "0x%x" % address
        except TypeError:
            shown = repr(address)
        Exception.__init__(self,
                "cannot access memory at address %s" % shown)
        self.address = address
198
class Gdb(object):
    """Drives an interactive gdb session through pexpect.

    Everything exchanged with gdb is logged to gdb.log.
    """

    def __init__(self,
            cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
        """Start gdb with `cmd`, wait for a prompt and apply fixed settings."""
        self.child = pexpect.spawn(cmd)
        log = open("gdb.log", "w")
        self.child.logfile = log
        log.write("+ %s\n" % cmd)
        self.wait()
        startup_commands = (
            "set confirm off",
            "set width 0",
            "set height 0",
            # Force consistency.
            "set print entry-values no",
        )
        for startup_command in startup_commands:
            self.command(startup_command)

    def wait(self):
        """Wait for prompt."""
        self.child.expect(r"\(gdb\)")

    def command(self, command, timeout=-1):
        """Send one command line; return what gdb printed before the prompt.

        The text up to the first newline is consumed before the prompt is
        awaited. `timeout` is passed to both pexpect expect() calls.
        """
        session = self.child
        session.sendline(command)
        session.expect("\n", timeout=timeout)
        session.expect(r"\(gdb\)", timeout=timeout)
        return session.before.strip()

    def c(self, wait=True):
        """Continue execution.

        With wait true, block until the next prompt and return the output.
        Otherwise return None as soon as gdb prints "Continuing".
        """
        if not wait:
            self.child.sendline("c")
            self.child.expect("Continuing")
            return None
        reply = self.command("c")
        assert "Continuing" in reply
        return reply

    def interrupt(self):
        """Send Ctrl-C and return the output seen before the next prompt."""
        self.child.send("\003")
        self.child.expect(r"\(gdb\)", timeout=60)
        return self.child.before.strip()

    def x(self, address, size='w'):
        """Run x/<size> on `address`; return the value after ':' as an int."""
        reply = self.command("x/%s %s" % (size, address))
        return int(reply.split(':')[1].strip(), 0)

    def p(self, obj):
        """Evaluate `obj` with p/x and return the result as an int.

        Raises CannotAccess when gdb reports an inaccessible address.
        """
        reply = self.command("p/x %s" % obj)
        failure = re.search(
                "Cannot access memory at address (0x[0-9a-f]+)", reply)
        if failure:
            raise CannotAccess(int(failure.group(1), 0))
        return int(reply.split('=')[-1].strip(), 0)

    def p_string(self, obj):
        """Evaluate `obj` with p; return the second token of the result."""
        reply = self.command("p %s" % obj)
        tokens = shlex.split(reply.split('=')[-1].strip())
        return tokens[1]

    def stepi(self):
        """Run stepi and return gdb's output."""
        return self.command("stepi")

    def load(self):
        """Run load (60 second timeout) and assert that it succeeded."""
        reply = self.command("load", timeout=60)
        assert "failed" not in reply
        assert "Transfer rate" in reply

    def b(self, location):
        """Set a breakpoint at `location`; return gdb's output."""
        reply = self.command("b %s" % location)
        assert "not defined" not in reply
        assert "Breakpoint" in reply
        return reply

    def hbreak(self, location):
        """Set a hardware breakpoint at `location`; return gdb's output."""
        reply = self.command("hbreak %s" % location)
        assert "not defined" not in reply
        assert "Hardware assisted breakpoint" in reply
        return reply
274
def run_all_tests(module, target, tests, fail_fast):
    """Run the test classes found in `module` and print a summary.

    A module attribute is treated as a test when it is a class with a
    `test` attribute and, if `tests` is non-empty, at least one entry of
    `tests` is a substring of its name. Each one is instantiated with
    `target` and its run() result recorded. With `fail_fast`, the first
    result other than 'pass' or 'not_applicable' stops the loop.

    Returns 1 if any recorded result is not 'pass'/'not_applicable',
    otherwise 0.
    """
    good_results = set(('pass', 'not_applicable'))

    started = time.time()

    outcomes = {}
    ran = 0
    for name in dir(module):
        definition = getattr(module, name)
        if type(definition) is not type:
            continue
        if not hasattr(definition, 'test'):
            continue
        if tests and not any(test in name for test in tests):
            continue
        outcome = definition(target).run()
        outcomes.setdefault(outcome, []).append(name)
        ran += 1
        if outcome not in good_results and fail_fast:
            break

    header("ran %d tests in %.0fs" % (ran, time.time() - started), dash=':')

    exit_code = 0
    for outcome, names in outcomes.items():
        print("%d tests returned %s" % (len(names), outcome))
        if outcome not in good_results:
            exit_code = 1
            # List the tests behind every bad outcome.
            for test_name in names:
                print("%s %s" % (" ", test_name))

    return exit_code
304
def add_test_run_options(parser):
    """Add the --fail-fast/-f flag and the positional test names to parser."""
    fail_fast_help = "Exit as soon as any test fails."
    test_help = "Run only tests that are named here."
    parser.add_argument(
        "--fail-fast", "-f",
        action="store_true",
        help=fail_fast_help)
    parser.add_argument(
        "test",
        nargs='*',
        help=test_help)
310
def header(title, dash='-'):
    """Print `title` in square brackets with `dash` padding on both sides.

    The padding is 36 - len(title) characters in total, split between the
    two sides with the extra character (if any) on the right. Titles of 36
    characters or more are printed without padding.
    """
    dashes = dash * (36 - len(title))
    # Floor division keeps the slice index an integer on Python 3 as well;
    # on Python 2 the result is the same as with "/".
    middle = len(dashes) // 2
    print("%s[ %s ]%s" % (dashes[:middle], title, dashes[middle:]))
316
class BaseTest(object):
    """Base class for tests; subclasses provide test().

    A subclass may define `compile_args`; the target's compile() is called
    with them and the result is stored in self.binary.
    """
    # Compiled binaries, shared by all tests and keyed by compile_args.
    compiled = {}
    # Class-level fallback only. Every instance gets its own list in
    # __init__ so that log names do not pile up from one test to the next.
    logs = []

    def __init__(self, target):
        self.target = target
        self.server = None
        self.target_process = None
        self.binary = None
        self.start = 0
        self.logs = []

    def early_applicable(self):
        """Return a false value if the test has determined it cannot run
        without ever needing to talk to the target or server."""
        # pylint: disable=no-self-use
        return True

    def setup(self):
        """Hook called before test(); does nothing by default."""
        pass

    def compile(self):
        """Compile according to compile_args, if set, and set self.binary.

        Results are cached in BaseTest.compiled. If compiling raises, the
        traceback is printed and "exception" is returned; otherwise None.
        """
        compile_args = getattr(self, 'compile_args', None)
        if not compile_args:
            return None
        if compile_args not in BaseTest.compiled:
            try:
                # pylint: disable=star-args
                BaseTest.compiled[compile_args] = \
                        self.target.compile(*compile_args)
            except Exception: # pylint: disable=broad-except
                print("exception while compiling in %.2fs" % (
                    time.time() - self.start))
                print("=" * 40)
                header("Traceback")
                traceback.print_exc(file=sys.stdout)
                print("/" * 40)
                return "exception"
        self.binary = BaseTest.compiled.get(compile_args)
        return None

    def classSetup(self):
        """Compile, then start the target and the server.

        Returns the result of compile() when that is a failure (in which
        case nothing is started), otherwise None.
        """
        compile_result = self.compile()
        if compile_result:
            return compile_result
        self.target_process = self.target.target()
        self.server = self.target.server()
        logname = self.server.logname
        if logname not in self.logs:
            self.logs.append(logname)
        return None

    def classTeardown(self):
        """Drop the references to the server and the target process."""
        # Back to the state set up by __init__; safe to call repeatedly.
        self.server = None
        self.target_process = None

    def run(self):
        """
        If compile_args is set, compile a program and set self.binary.

        Call setup().

        Then call test() and return the result, displaying relevant information
        if an exception is raised.

        Returns "not_applicable", "pass", "fail" or "exception", or whatever
        true value test() returned.
        """

        # No newline, so that the result ends up on the same line.
        sys.stdout.write("Running %s ... " % type(self).__name__)
        sys.stdout.flush()

        if not self.early_applicable():
            print("not_applicable")
            return "not_applicable"

        self.start = time.time()

        setup_result = self.classSetup()
        if setup_result:
            # Compiling failed and has already been reported; there is no
            # binary to run the test with.
            return setup_result

        try:
            self.setup()
            result = self.test()    # pylint: disable=no-member
        except TestNotApplicable:
            result = "not_applicable"
        except Exception as e: # pylint: disable=broad-except
            if isinstance(e, TestFailed):
                result = "fail"
            else:
                result = "exception"
            print("%s in %.2fs" % (result, time.time() - self.start))
            print("=" * 40)
            if isinstance(e, TestFailed):
                header("Message")
                print(e.message)
            header("Traceback")
            traceback.print_exc(file=sys.stdout)
            for log in self.logs:
                header(log)
                with open(log, "r") as log_file:
                    print(log_file.read())
            print("/" * 40)
            return result

        finally:
            self.classTeardown()

        if not result:
            result = 'pass'
        print("%s in %.2fs" % (result, time.time() - self.start))
        return result
414 print "%s in %.2fs" % (result, time.time() - self.start)
415 return result
416
417 class TestFailed(Exception):
418 def __init__(self, message):
419 Exception.__init__(self)
420 self.message = message
421
422 class TestNotApplicable(Exception):
423 def __init__(self, message):
424 Exception.__init__(self)
425 self.message = message
426
427 def assertEqual(a, b):
428 if a != b:
429 raise TestFailed("%r != %r" % (a, b))
430
431 def assertNotEqual(a, b):
432 if a == b:
433 raise TestFailed("%r == %r" % (a, b))
434
435 def assertIn(a, b):
436 if a not in b:
437 raise TestFailed("%r not in %r" % (a, b))
438
439 def assertNotIn(a, b):
440 if a in b:
441 raise TestFailed("%r in %r" % (a, b))
442
443 def assertGreater(a, b):
444 if not a > b:
445 raise TestFailed("%r not greater than %r" % (a, b))
446
447 def assertTrue(a):
448 if not a:
449 raise TestFailed("%r is not True" % a)
450
451 def assertRegexpMatches(text, regexp):
452 if not re.search(regexp, text):
453 raise TestFailed("can't find %r in %r" % (regexp, text))