0e504d1a7d0fc85a7a270c092936781488b25f9c
[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import shlex
3 import subprocess
4 import time
5
6 import pexpect
7
8 # Note that gdb comes with its own testsuite. I was unable to figure out how to
9 # run that testsuite against the spike simulator.
10
11 def find_file(path):
12 for directory in (os.getcwd(), os.path.dirname(__file__)):
13 fullpath = os.path.join(directory, path)
14 if os.path.exists(fullpath):
15 return fullpath
16 return None
17
18 def compile(args, xlen=32): # pylint: disable=redefined-builtin
19 cc = os.path.expandvars("$RISCV/bin/riscv%d-unknown-elf-gcc" % xlen)
20 cmd = [cc, "-g"]
21 for arg in args:
22 found = find_file(arg)
23 if found:
24 cmd.append(found)
25 else:
26 cmd.append(arg)
27 cmd = " ".join(cmd)
28 result = os.system(cmd)
29 assert result == 0, "%r failed" % cmd
30
31 def unused_port():
32 # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
33 import socket
34 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
35 s.bind(("", 0))
36 port = s.getsockname()[1]
37 s.close()
38 return port
39
40 class Spike(object):
41 logname = "spike.log"
42
43 def __init__(self, cmd, binary=None, halted=False, with_gdb=True,
44 timeout=None, xlen=64):
45 """Launch spike. Return tuple of its process and the port it's running
46 on."""
47 if cmd:
48 cmd = shlex.split(cmd)
49 else:
50 cmd = ["spike"]
51 if xlen == 32:
52 cmd += ["--isa", "RV32"]
53
54 if timeout:
55 cmd = ["timeout", str(timeout)] + cmd
56
57 if halted:
58 cmd.append('-H')
59 if with_gdb:
60 self.port = unused_port()
61 cmd += ['--gdb-port', str(self.port)]
62 cmd.append("-m32")
63 cmd.append('pk')
64 if binary:
65 cmd.append(binary)
66 logfile = open(self.logname, "w")
67 logfile.write("+ %s\n" % " ".join(cmd))
68 logfile.flush()
69 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
70 stdout=logfile, stderr=logfile)
71
72 def __del__(self):
73 try:
74 self.process.kill()
75 self.process.wait()
76 except OSError:
77 pass
78
79 def wait(self, *args, **kwargs):
80 return self.process.wait(*args, **kwargs)
81
82 class VcsSim(object):
83 def __init__(self, simv=None, debug=False):
84 if simv:
85 cmd = shlex.split(simv)
86 else:
87 cmd = ["simv"]
88 cmd += ["+jtag_vpi_enable"]
89 if debug:
90 cmd[0] = cmd[0] + "-debug"
91 cmd += ["+vcdplusfile=output/gdbserver.vpd"]
92 logfile = open("simv.log", "w")
93 logfile.write("+ %s\n" % " ".join(cmd))
94 logfile.flush()
95 listenfile = open("simv.log", "r")
96 listenfile.seek(0, 2)
97 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
98 stdout=logfile, stderr=logfile)
99 done = False
100 while not done:
101 line = listenfile.readline()
102 if not line:
103 time.sleep(1)
104 if "Listening on port 5555" in line:
105 done = True
106
107 def __del__(self):
108 try:
109 self.process.kill()
110 self.process.wait()
111 except OSError:
112 pass
113
114 class Openocd(object):
115 logname = "openocd.log"
116
117 def __init__(self, cmd=None, config=None, debug=False, otherProcess=None):
118
119 # keep handles to other processes -- don't let them be
120 # garbage collected yet.
121
122 self.otherProcess = otherProcess
123 if cmd:
124 cmd = shlex.split(cmd)
125 else:
126 cmd = ["openocd"]
127 if config:
128 cmd += ["-f", find_file(config)]
129 if debug:
130 cmd.append("-d")
131 logfile = open(Openocd.logname, "w")
132 logfile.write("+ %s\n" % " ".join(cmd))
133 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
134 stdout=logfile, stderr=logfile)
135 # TODO: Pick a random port
136 self.port = 3333
137
138 def __del__(self):
139 try:
140 self.process.kill()
141 self.process.wait()
142 except OSError:
143 pass
144
145 class Gdb(object):
146 def __init__(self,
147 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
148 self.child = pexpect.spawn(cmd)
149 self.child.logfile = open("gdb.log", "w")
150 self.child.logfile.write("+ %s\n" % cmd)
151 self.wait()
152 self.command("set confirm off")
153 self.command("set width 0")
154 self.command("set height 0")
155 # Force consistency.
156 self.command("set print entry-values no")
157
158 def wait(self):
159 """Wait for prompt."""
160 self.child.expect(r"\(gdb\)")
161
162 def command(self, command, timeout=-1):
163 self.child.sendline(command)
164 self.child.expect("\n", timeout=timeout)
165 self.child.expect(r"\(gdb\)", timeout=timeout)
166 return self.child.before.strip()
167
168 def c(self, wait=True):
169 if wait:
170 output = self.command("c")
171 assert "Continuing" in output
172 return output
173 else:
174 self.child.sendline("c")
175 self.child.expect("Continuing")
176
177 def interrupt(self):
178 self.child.send("\003")
179 self.child.expect(r"\(gdb\)", timeout=60)
180 return self.child.before.strip()
181
182 def x(self, address, size='w'):
183 output = self.command("x/%s %s" % (size, address))
184 value = int(output.split(':')[1].strip(), 0)
185 return value
186
187 def p(self, obj):
188 output = self.command("p/x %s" % obj)
189 value = int(output.split('=')[-1].strip(), 0)
190 return value
191
192 def p_string(self, obj):
193 output = self.command("p %s" % obj)
194 value = shlex.split(output.split('=')[-1].strip())[1]
195 return value
196
197 def stepi(self):
198 output = self.command("stepi")
199 assert "Cannot" not in output
200 return output
201
202 def load(self):
203 output = self.command("load", timeout=60)
204 assert "failed" not in output
205 assert "Transfer rate" in output
206
207 def b(self, location):
208 output = self.command("b %s" % location)
209 assert "not defined" not in output
210 assert "Breakpoint" in output
211 return output
212
213 def hbreak(self, location):
214 output = self.command("hbreak %s" % location)
215 assert "not defined" not in output
216 assert "Hardware assisted breakpoint" in output
217 return output