29fa1703efdd81f339e726a649e700bac4d77bf1
[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import re
3 import shlex
4 import subprocess
5 import time
6
7 import pexpect
8
9 # Note that gdb comes with its own testsuite. I was unable to figure out how to
10 # run that testsuite against the spike simulator.
11
12 def find_file(path):
13 for directory in (os.getcwd(), os.path.dirname(__file__)):
14 fullpath = os.path.join(directory, path)
15 if os.path.exists(fullpath):
16 return fullpath
17 return None
18
19 def compile(args, xlen=32): # pylint: disable=redefined-builtin
20 cc = os.path.expandvars("$RISCV/bin/riscv%d-unknown-elf-gcc" % xlen)
21 cmd = [cc, "-g"]
22 for arg in args:
23 found = find_file(arg)
24 if found:
25 cmd.append(found)
26 else:
27 cmd.append(arg)
28 cmd = " ".join(cmd)
29 result = os.system(cmd)
30 assert result == 0, "%r failed" % cmd
31
32 def unused_port():
33 # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
34 import socket
35 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
36 s.bind(("", 0))
37 port = s.getsockname()[1]
38 s.close()
39 return port
40
41 class Spike(object):
42 logname = "spike.log"
43
44 def __init__(self, cmd, binary=None, halted=False, with_gdb=True,
45 timeout=None, xlen=64):
46 """Launch spike. Return tuple of its process and the port it's running
47 on."""
48 if cmd:
49 cmd = shlex.split(cmd)
50 else:
51 cmd = ["spike"]
52 if xlen == 32:
53 cmd += ["--isa", "RV32"]
54
55 if timeout:
56 cmd = ["timeout", str(timeout)] + cmd
57
58 if halted:
59 cmd.append('-H')
60 if with_gdb:
61 self.port = unused_port()
62 cmd += ['--gdb-port', str(self.port)]
63 cmd.append("-m32")
64 cmd.append('pk')
65 if binary:
66 cmd.append(binary)
67 logfile = open(self.logname, "w")
68 logfile.write("+ %s\n" % " ".join(cmd))
69 logfile.flush()
70 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
71 stdout=logfile, stderr=logfile)
72
73 def __del__(self):
74 try:
75 self.process.kill()
76 self.process.wait()
77 except OSError:
78 pass
79
80 def wait(self, *args, **kwargs):
81 return self.process.wait(*args, **kwargs)
82
83 class VcsSim(object):
84 def __init__(self, simv=None, debug=False):
85 if simv:
86 cmd = shlex.split(simv)
87 else:
88 cmd = ["simv"]
89 cmd += ["+jtag_vpi_enable"]
90 if debug:
91 cmd[0] = cmd[0] + "-debug"
92 cmd += ["+vcdplusfile=output/gdbserver.vpd"]
93 logfile = open("simv.log", "w")
94 logfile.write("+ %s\n" % " ".join(cmd))
95 logfile.flush()
96 listenfile = open("simv.log", "r")
97 listenfile.seek(0, 2)
98 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
99 stdout=logfile, stderr=logfile)
100 done = False
101 while not done:
102 line = listenfile.readline()
103 if not line:
104 time.sleep(1)
105 if "Listening on port 5555" in line:
106 done = True
107
108 def __del__(self):
109 try:
110 self.process.kill()
111 self.process.wait()
112 except OSError:
113 pass
114
115 class Openocd(object):
116 logname = "openocd.log"
117
118 def __init__(self, cmd=None, config=None, debug=False, otherProcess=None):
119
120 # keep handles to other processes -- don't let them be
121 # garbage collected yet.
122
123 self.otherProcess = otherProcess
124 if cmd:
125 cmd = shlex.split(cmd)
126 else:
127 cmd = ["openocd"]
128 if config:
129 cmd += ["-f", find_file(config)]
130 if debug:
131 cmd.append("-d")
132
133 # Assign port
134 self.port = unused_port()
135 print "Using port %d for gdb server" % self.port
136 # This command needs to come before any config scripts on the command
137 # line, since they are executed in order.
138 cmd[1:1] = ["--command", "gdb_port %d" % self.port]
139
140 logfile = open(Openocd.logname, "w")
141 logfile.write("+ %s\n" % " ".join(cmd))
142 logfile.flush()
143 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
144 stdout=logfile, stderr=logfile)
145
146 # Wait for OpenOCD to have made it through riscv_examine(). When using
147 # OpenOCD to communicate with a simulator this may take a long time,
148 # and gdb will time out when trying to connect if we attempt too early.
149 start = time.time()
150 messaged = False
151 while True:
152 log = open(Openocd.logname).read()
153 if "Examined RISCV core" in log:
154 break
155 if not self.process.poll() is None:
156 raise Exception("OpenOCD exited before completing riscv_examine()")
157 if not messaged and time.time() - start > 1:
158 messaged = True
159 print "Waiting for OpenOCD to examine RISCV core..."
160
161 def __del__(self):
162 try:
163 self.process.kill()
164 self.process.wait()
165 except OSError:
166 pass
167
168 class CannotAccess(Exception):
169 def __init__(self, address):
170 Exception.__init__(self)
171 self.address = address
172
173 class Gdb(object):
174 def __init__(self,
175 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
176 self.child = pexpect.spawn(cmd)
177 self.child.logfile = open("gdb.log", "w")
178 self.child.logfile.write("+ %s\n" % cmd)
179 self.wait()
180 self.command("set confirm off")
181 self.command("set width 0")
182 self.command("set height 0")
183 # Force consistency.
184 self.command("set print entry-values no")
185
186 def wait(self):
187 """Wait for prompt."""
188 self.child.expect(r"\(gdb\)")
189
190 def command(self, command, timeout=-1):
191 self.child.sendline(command)
192 self.child.expect("\n", timeout=timeout)
193 self.child.expect(r"\(gdb\)", timeout=timeout)
194 return self.child.before.strip()
195
196 def c(self, wait=True):
197 if wait:
198 output = self.command("c")
199 assert "Continuing" in output
200 return output
201 else:
202 self.child.sendline("c")
203 self.child.expect("Continuing")
204
205 def interrupt(self):
206 self.child.send("\003")
207 self.child.expect(r"\(gdb\)", timeout=60)
208 return self.child.before.strip()
209
210 def x(self, address, size='w'):
211 output = self.command("x/%s %s" % (size, address))
212 value = int(output.split(':')[1].strip(), 0)
213 return value
214
215 def p(self, obj):
216 output = self.command("p/x %s" % obj)
217 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
218 if m:
219 raise CannotAccess(int(m.group(1), 0))
220 value = int(output.split('=')[-1].strip(), 0)
221 return value
222
223 def p_string(self, obj):
224 output = self.command("p %s" % obj)
225 value = shlex.split(output.split('=')[-1].strip())[1]
226 return value
227
228 def stepi(self):
229 output = self.command("stepi")
230 return output
231
232 def load(self):
233 output = self.command("load", timeout=60)
234 assert "failed" not in output
235 assert "Transfer rate" in output
236
237 def b(self, location):
238 output = self.command("b %s" % location)
239 assert "not defined" not in output
240 assert "Breakpoint" in output
241 return output
242
243 def hbreak(self, location):
244 output = self.command("hbreak %s" % location)
245 assert "not defined" not in output
246 assert "Hardware assisted breakpoint" in output
247 return output