308bf2157619ceb09f33363a1acbf2ecbe9feb7d
[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import re
3 import shlex
4 import subprocess
5 import time
6
7 import pexpect
8
9 # Note that gdb comes with its own testsuite. I was unable to figure out how to
10 # run that testsuite against the spike simulator.
11
def find_file(path):
    """Locate `path` relative to the working directory or to this module.

    The current working directory is tried first, then the directory that
    contains this file. Returns the first joined path that exists, or None
    if neither candidate exists.
    """
    search_roots = (os.getcwd(), os.path.dirname(__file__))
    candidates = (os.path.join(root, path) for root in search_roots)
    # The generators are lazy, so the second location is only probed when the
    # first one misses.
    return next((found for found in candidates if os.path.exists(found)), None)
18
def compile(args, xlen=32): # pylint: disable=redefined-builtin
    """Compile a test program with the RISC-V cross gcc, with debug info (-g).

    args -- iterable of command-line arguments. Any argument that find_file()
        can locate is replaced by the located path; everything else (flags,
        output names, ...) is passed through unchanged.
    xlen -- selects the compiler: $RISCV/bin/riscv<xlen>-unknown-elf-gcc.

    The command is joined with spaces and run through the shell via
    os.system(), so arguments are not quoted.

    Raises AssertionError if the command exits with a non-zero status.
    """
    cc = os.path.expandvars("$RISCV/bin/riscv%d-unknown-elf-gcc" % xlen)
    cmd = [cc, "-g"]
    cmd.extend(find_file(arg) or arg for arg in args)
    cmd = " ".join(cmd)
    result = os.system(cmd)
    # Raise explicitly instead of using an assert statement: asserts are
    # stripped under `python -O`, which would let a failed compile go unnoticed.
    if result != 0:
        raise AssertionError("%r failed" % cmd)
31
def unused_port():
    """Return a TCP port number that was free at the time of the call.

    A socket is bound to port 0 so the OS picks a free port; the socket is
    then closed and the chosen port number returned. Nothing reserves the
    port after the socket is closed, so another process could claim it before
    the caller uses it.
    """
    # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
    import socket
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind(("", 0))
        return s.getsockname()[1]
    finally:
        # Always release the socket, even if bind()/getsockname() raised.
        s.close()
40
class Spike(object):
    """Handle on a spike simulator subprocess.

    Attributes:
        process -- the subprocess.Popen object for the launched command.
        port -- the port passed to spike via --gdb-port, or None when the
            instance was created with with_gdb=False.
    """
    # File that receives the launch command line plus spike's stdout/stderr.
    logname = "spike.log"

    def __init__(self, cmd, binary=None, halted=False, with_gdb=True,
                 timeout=None, xlen=64):
        """Launch spike running pk, storing the process in self.process.

        cmd -- command line used to start spike (split with shlex); a falsy
            value means plain "spike".
        binary -- optional program appended after "pk" on the command line.
        halted -- when true, pass -H to spike.
        with_gdb -- when true, pick an unused port, store it in self.port
            and pass it via --gdb-port.
        timeout -- when truthy, wrap the whole command in `timeout <timeout>`.
        xlen -- when 32, pass `--isa RV32`.
        """
        # Always defined so that reading .port never raises AttributeError.
        self.port = None

        if cmd:
            cmd = shlex.split(cmd)
        else:
            cmd = ["spike"]
        if xlen == 32:
            cmd += ["--isa", "RV32"]

        if timeout:
            cmd = ["timeout", str(timeout)] + cmd

        if halted:
            cmd.append('-H')
        if with_gdb:
            self.port = unused_port()
            cmd += ['--gdb-port', str(self.port)]
        cmd.append("-m32")
        cmd.append('pk')
        if binary:
            cmd.append(binary)

        logfile = open(self.logname, "w")
        try:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                                            stdout=logfile, stderr=logfile)
        finally:
            # The child holds its own copy of the descriptor; ours is no
            # longer needed once Popen has returned (or failed).
            logfile.close()

    def __del__(self):
        """Kill the simulator and reap it; best effort."""
        # __init__ may have failed before the process was created.
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            pass

    def wait(self, *args, **kwargs):
        """Forward to self.process.wait() and return its result."""
        return self.process.wait(*args, **kwargs)
82
class VcsSim(object):
    """Handle on a `simv` simulator subprocess with JTAG VPI enabled.

    Attributes:
        process -- the subprocess.Popen object for the launched command.
        port -- the port number parsed from the simulator's
            "Listening on port N" log line.
    """

    def __init__(self, simv=None, debug=False):
        """Launch the simulator and block until it reports its listen port.

        simv -- command line used to start the simulator (split with shlex);
            a falsy value means plain "simv".
        debug -- when true, "-debug" is appended to the executable name and
            "+vcdplusfile=output/gdbserver.vpd" is added to the arguments.

        Raises Exception if the simulator exits before printing the
        "Listening on port N" line.
        """
        if simv:
            cmd = shlex.split(simv)
        else:
            cmd = ["simv"]
        cmd += ["+jtag_vpi_enable"]
        if debug:
            cmd[0] = cmd[0] + "-debug"
            cmd += ["+vcdplusfile=output/gdbserver.vpd"]

        with open("simv.log", "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            with open("simv.log", "r") as listenfile:
                # Start reading after the header line written above.
                listenfile.seek(0, 2)
                self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                                                stdout=logfile, stderr=logfile)
                while True:
                    # Sample the exit status *before* reading, so that output
                    # written just before the process died is still seen.
                    exited = self.process.poll() is not None
                    line = listenfile.readline()
                    if not line:
                        if exited:
                            raise Exception(
                                "simv exited before reporting its listen port")
                        time.sleep(1)
                        continue
                    match = re.match(r"^Listening on port (\d+)$", line)
                    if match:
                        self.port = int(match.group(1))
                        print("Using port %d for JTAG VPI" % self.port)
                        break

    def __del__(self):
        """Kill the simulator and reap it; best effort."""
        # __init__ may have failed before the process was created.
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            pass
117
class Openocd(object):
    """Handle on an OpenOCD subprocess serving gdb on a freshly chosen port.

    Attributes:
        process -- the subprocess.Popen object for the launched command.
        port -- the port passed to OpenOCD through the `gdb_port` command.
        otherProcess -- the object passed in as otherProcess, kept referenced
            for the lifetime of this instance.
    """
    # File that receives the launch command line plus OpenOCD's stdout/stderr;
    # it is also polled to detect when OpenOCD is ready.
    logname = "openocd.log"

    def __init__(self, cmd=None, config=None, debug=False, otherProcess=None):
        """Launch OpenOCD and block until it logs "Examined RISCV core".

        cmd -- command line used to start OpenOCD (split with shlex); a falsy
            value means plain "openocd".
        config -- optional config script passed with -f. It is resolved with
            find_file(); if that finds nothing the name is passed unchanged.
        debug -- when true, pass -d.
        otherProcess -- optional object to keep alive. If it has a non-None
            `port` attribute, that value is exported to OpenOCD in the
            JTAG_VPI_PORT environment variable.

        Raises Exception if OpenOCD exits before the expected log line shows
        up.
        """

        # keep handles to other processes -- don't let them be
        # garbage collected yet.

        self.otherProcess = otherProcess
        if cmd:
            cmd = shlex.split(cmd)
        else:
            cmd = ["openocd"]
        if config:
            # Fall back to the raw name (as compile() does for its arguments)
            # rather than putting None into the argument list.
            cmd += ["-f", find_file(config) or config]
        if debug:
            cmd.append("-d")

        # Assign port
        self.port = unused_port()
        print("Using port %d for gdb server" % self.port)
        # This command needs to come before any config scripts on the command
        # line, since they are executed in order.
        cmd[1:1] = ["--command", "gdb_port %d" % self.port]

        env = os.environ.copy()
        # otherProcess defaults to None, so only export the port when there
        # actually is one.
        vpi_port = getattr(otherProcess, "port", None)
        if vpi_port is not None:
            env['JTAG_VPI_PORT'] = str(vpi_port)

        logfile = open(Openocd.logname, "w")
        try:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                                            stdout=logfile, stderr=logfile,
                                            env=env)
        finally:
            # The child holds its own copy of the descriptor.
            logfile.close()

        # Wait for OpenOCD to have made it through riscv_examine(). When using
        # OpenOCD to communicate with a simulator this may take a long time,
        # and gdb will time out when trying to connect if we attempt too early.
        start = time.time()
        messaged = False
        while True:
            with open(Openocd.logname) as log_reader:
                log = log_reader.read()
            if "Examined RISCV core" in log:
                break
            if self.process.poll() is not None:
                raise Exception(
                    "OpenOCD exited before completing riscv_examine()")
            if not messaged and time.time() - start > 1:
                messaged = True
                print("Waiting for OpenOCD to examine RISCV core...")
            # Avoid spinning a CPU core while re-reading the log.
            time.sleep(0.1)

    def __del__(self):
        """Kill OpenOCD and reap it; best effort."""
        # __init__ may have failed before the process was created.
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            pass
173
class CannotAccess(Exception):
    """Raised when gdb reports that a memory address cannot be accessed.

    Attributes:
        address -- the address that could not be accessed, as passed to the
            constructor.
    """

    def __init__(self, address):
        # Give the exception a readable message so tracebacks show which
        # address failed; fall back to repr() for non-integer values.
        try:
            message = "Cannot access memory at address 0x%x" % address
        except TypeError:
            message = "Cannot access memory at address %r" % (address,)
        Exception.__init__(self, message)
        self.address = address
178
class Gdb(object):
    """Drive an interactive gdb session through pexpect.

    Attributes:
        child -- the pexpect.spawn object for the gdb process; everything it
            exchanges is logged to gdb.log.
    """

    def __init__(self,
                 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
        """Spawn gdb, wait for its first prompt and apply fixed settings.

        cmd -- command used to start gdb. Note that the default is computed
            from $RISCV once, when this module is imported.
        """
        self.child = pexpect.spawn(cmd)
        self.child.logfile = open("gdb.log", "w")
        self.child.logfile.write("+ %s\n" % cmd)
        self.wait()
        self.command("set confirm off")
        self.command("set width 0")
        self.command("set height 0")
        # Force consistency.
        self.command("set print entry-values no")

    @staticmethod
    def _raise_if_inaccessible(output):
        """Raise CannotAccess if gdb's output reports an inaccessible address."""
        m = re.search(r"Cannot access memory at address (0x[0-9a-f]+)", output)
        if m:
            raise CannotAccess(int(m.group(1), 0))

    def wait(self):
        """Wait for prompt."""
        self.child.expect(r"\(gdb\)")

    def command(self, command, timeout=-1):
        """Send one command line and return gdb's output for it.

        The text between the first newline after sending (presumably the end
        of the echoed command) and the next "(gdb)" prompt is returned with
        surrounding whitespace stripped. `timeout` is passed to both pexpect
        expect() calls.
        """
        self.child.sendline(command)
        self.child.expect("\n", timeout=timeout)
        self.child.expect(r"\(gdb\)", timeout=timeout)
        return self.child.before.strip()

    def c(self, wait=True):
        """Continue execution.

        With wait true, block until the next prompt and return the output;
        raises AssertionError if it does not contain "Continuing". With wait
        false, return None as soon as gdb has printed "Continuing".
        """
        if wait:
            output = self.command("c")
            if "Continuing" not in output:
                raise AssertionError("unexpected output from 'c': %r" % output)
            return output
        self.child.sendline("c")
        self.child.expect("Continuing")
        return None

    def interrupt(self):
        """Send Ctrl-C to gdb and return the output up to the next prompt."""
        self.child.send("\003")
        self.child.expect(r"\(gdb\)", timeout=60)
        return self.child.before.strip()

    def x(self, address, size='w'):
        """Examine memory with `x/<size> <address>` and return it as an int.

        Raises CannotAccess if gdb reports that the address cannot be
        accessed.
        """
        output = self.command("x/%s %s" % (size, address))
        # Same error reporting as p(); otherwise the parse below would fail
        # with an unhelpful IndexError.
        self._raise_if_inaccessible(output)
        value = int(output.split(':')[1].strip(), 0)
        return value

    def p(self, obj):
        """Evaluate `p/x <obj>` and return the printed value as an int.

        Raises CannotAccess if gdb reports that an address cannot be
        accessed.
        """
        output = self.command("p/x %s" % obj)
        self._raise_if_inaccessible(output)
        value = int(output.split('=')[-1].strip(), 0)
        return value

    def p_string(self, obj):
        """Evaluate `p <obj>` and return the second token of the printed value.

        For output of the form `$N = <address> "<text>"` this is the text
        without its quotes.

        Raises CannotAccess if gdb reports that an address cannot be
        accessed.
        """
        output = self.command("p %s" % obj)
        self._raise_if_inaccessible(output)
        # Split on the first '=' only, so a string value that itself contains
        # '=' is kept intact.
        value_text = output.split('=', 1)[-1].strip()
        value = shlex.split(value_text)[1]
        return value

    def stepi(self):
        """Execute `stepi` and return gdb's output."""
        output = self.command("stepi")
        return output

    def load(self):
        """Execute `load` with a 60 second timeout.

        Raises AssertionError if the output contains "failed" or lacks
        "Transfer rate".
        """
        output = self.command("load", timeout=60)
        if "failed" in output:
            raise AssertionError("load failed: %r" % output)
        if "Transfer rate" not in output:
            raise AssertionError("load did not complete: %r" % output)

    def b(self, location):
        """Set a breakpoint at `location` and return gdb's output.

        Raises AssertionError if the output contains "not defined" or lacks
        "Breakpoint".
        """
        output = self.command("b %s" % location)
        if "not defined" in output:
            raise AssertionError("breakpoint location not defined: %r" % output)
        if "Breakpoint" not in output:
            raise AssertionError("breakpoint not set: %r" % output)
        return output

    def hbreak(self, location):
        """Set a hardware breakpoint at `location` and return gdb's output.

        Raises AssertionError if the output contains "not defined" or lacks
        "Hardware assisted breakpoint".
        """
        output = self.command("hbreak %s" % location)
        if "not defined" in output:
            raise AssertionError("breakpoint location not defined: %r" % output)
        if "Hardware assisted breakpoint" not in output:
            raise AssertionError("hardware breakpoint not set: %r" % output)
        return output