split out jtag util functions to separate module
authorLuke Kenneth Casson Leighton <lkcl@lkcl.net>
Thu, 8 Oct 2020 12:21:36 +0000 (13:21 +0100)
committerLuke Kenneth Casson Leighton <lkcl@lkcl.net>
Thu, 8 Oct 2020 12:21:40 +0000 (13:21 +0100)
src/soc/debug/jtagutils.py [new file with mode: 0644]
src/soc/debug/test/jtagremote.py
src/soc/debug/test/test_jtag_tap_srv.py

diff --git a/src/soc/debug/jtagutils.py b/src/soc/debug/jtagutils.py
new file mode 100644 (file)
index 0000000..1dee686
--- /dev/null
@@ -0,0 +1,215 @@
+#The server code
+import socket
+from socket import close, AF_INET, SOCK_STREAM
+import sys
+import select
+import time
+
+
+def client_sync(dut):
+    tck = yield dut.cbus.tck
+    tms = yield dut.cbus.tms
+    tdi = yield dut.cbus.tdi
+    dut.c.jtagremote_client_send((tck, tms, tdi))
+    #print ("about to client recv")
+    while True:
+        tdo = dut.c.jtagremote_client_recv(timeout=0)
+        if tdo is not None:
+            break
+        yield
+    yield dut.cbus.tdo.eq(tdo)
+
+
+def tms_state_set(dut, bits):
+    for bit in bits:
+        yield dut.cbus.tck.eq(1)
+        yield dut.cbus.tms.eq(bit)
+        yield from client_sync(dut)
+        yield
+        yield dut.cbus.tck.eq(0)
+        yield from client_sync(dut)
+        yield
+        yield from client_sync(dut)
+    yield dut.cbus.tms.eq(0)
+    yield from client_sync(dut)
+
+
+def tms_data_getset(dut, tms, d_len, d_in=0):
+    res = 0
+    yield dut.cbus.tms.eq(tms)
+    for i in range(d_len):
+        tdi = 1 if (d_in & (1<<i)) else 0
+        yield dut.cbus.tck.eq(1)
+        yield from client_sync(dut)
+        res |= (1<<i) if (yield dut.bus.tdo) else 0
+        yield
+        yield from client_sync(dut)
+        yield dut.cbus.tdi.eq(tdi)
+        yield dut.cbus.tck.eq(0)
+        yield from client_sync(dut)
+        yield
+        yield from client_sync(dut)
+    yield dut.cbus.tms.eq(0)
+    yield from client_sync(dut)
+
+    return res
+
+
+def jtag_set_reset(dut):
+    yield from tms_state_set(dut, [1, 1, 1, 1, 1])
+
+def jtag_set_shift_dr(dut):
+    yield from tms_state_set(dut, [1, 0, 0])
+
+def jtag_set_shift_ir(dut):
+    yield from tms_state_set(dut, [1, 1, 0])
+
+def jtag_set_run(dut):
+    yield from tms_state_set(dut, [0])
+
+def jtag_set_idle(dut):
+    yield from tms_state_set(dut, [1, 1, 0])
+
+
+def jtag_read_write_reg(dut, addr, d_len, d_in=0):
+    yield from jtag_set_run(dut)
+    yield from jtag_set_shift_ir(dut)
+    yield from tms_data_getset(dut, 0, dut._ir_width, addr)
+    yield from jtag_set_idle(dut)
+
+    yield from jtag_set_shift_dr(dut)
+    result = yield from tms_data_getset(dut, 0, d_len, d_in)
+    yield from jtag_set_idle(dut)
+    return result
+
+
+def jtag_srv(dut):
+    while not dut.stop:
+        # loop and receive data from client
+        tdo = yield dut.bus.tdo
+        #print ("server tdo data", tdo)
+        data = dut.s.jtagremote_server_recv(tdo)
+        #print ("server recv data", data)
+        if not data:
+            yield
+            continue
+        tck, tms, tdi = data
+        yield dut.bus.tck.eq(tck)
+        yield dut.bus.tms.eq(tms)
+        yield dut.bus.tdi.eq(tdi)
+        yield
+
+
+
+def get_data(s, length=1024, timeout=None):
+    r, w, e = select.select( [s], [], [], timeout)
+
+    for sock in r:
+        #incoming message from remote server
+        if sock == s:
+            return sock.recv(length)
+    return None
+
+class JTAGServer:
+    def __init__(self, debug=False):
+        self.debug = debug
+        HOST = ''
+        PORT = 44853
+        s = socket.socket(AF_INET, SOCK_STREAM)
+        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
+        s.bind((HOST, PORT))
+        s.listen(1) #only needs to receive one connection (the client)
+        self.s = s
+        self.conn = None
+
+    def close(self):
+        self.s.close()
+        if self.conn:
+            self.conn.close()
+
+    def get_connection(self, timeout=0):
+        r, w, e = select.select( [self.s], [], [], timeout)
+        for sock in r:
+            #incoming message from remote server
+            if sock == self.s:
+                conn, addr = self.s.accept() #accepts the connection
+                if self.debug:
+                    print("Connected by: ", addr) #prints the connection
+                conn.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
+                self.conn = conn
+                return conn
+        return None
+
+    def get_data(self, length=1024, timeout=None):
+        return get_data(self.conn, length, timeout)
+
+    def send(self, data):
+        return self.conn.sendall(data)
+
+    def jtagremote_server_recv(self, tdo):
+        data = self.get_data(1, 0) # read 1 byte, non-blocking
+        if data is None:
+            return None # no data read
+        data = bytes.decode(data)
+        if self.debug:
+            print ("jtagremote_server_recv", data)
+        # request to read TDO
+        if data == 'R':
+            self.send(str.encode(chr(ord('0') + tdo)))
+            return [] # no data
+        # decode tck, tms, tdi
+        data = ord(data) - ord('0')
+        # encode tck, tms and tdi as number from 0-7
+        tdi = 1 if (data & 1) else 0
+        tms = 1 if (data & 2) else 0
+        tck = 1 if (data & 4) else 0
+
+        return (tck, tms, tdi)
+
+
+
+class JTAGClient:
+    def __init__(self, debug=False):
+        self.debug = debug
+        HOST = 'localhost'
+        PORT = 44853
+        s = socket.socket(AF_INET, SOCK_STREAM)
+        s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
+        s.connect((HOST, PORT))
+        self.s = s
+
+    def close(self):
+        self.s.close()
+
+    def get_data(self, length=1024, timeout=None):
+        return get_data(self.s, length, timeout)
+
+    def send(self, data):
+        return self.s.sendall(data)
+
+    def jtagremote_client_send(self, to_send):
+        # encode tck, tms and tdi as number from 0-7
+        tck, tms, tdi = to_send
+        data = 0
+        if tdi: data |= 1
+        if tms: data |= 2
+        if tck: data |= 4
+        data = chr(ord('0') + data)
+        self.send(str.encode(data))
+        if self.debug:
+            print ("jtagremote_client_send", data)
+        # now read tdo
+        self.send(str.encode('R'))
+
+
+    def jtagremote_client_recv(self, timeout=None):
+        data = self.get_data(1, timeout) # read 1 byte, blocking
+        if data is None:
+            return None
+        if self.debug:
+            print ("client recv", data)
+        data = bytes.decode(data)
+        return ord(data) - ord('0') # subtract ASCII for "0" to give 0 or 1
+
+
index cd23aae860ec2fd16be85d24d995189d368c9cb5..6f4040135373af894d4d1d57c6915e27c25c4050 100644 (file)
@@ -5,117 +5,7 @@ import sys
 import select
 import time
 
-
-def get_data(s, length=1024, timeout=None):
-    r, w, e = select.select( [s], [], [], timeout)
-
-    for sock in r:
-        #incoming message from remote server
-        if sock == s:
-            return sock.recv(length)
-    return None
-
-class JTAGServer:
-    def __init__(self, debug=False):
-        self.debug = debug
-        HOST = ''
-        PORT = 44853
-        s = socket.socket(AF_INET, SOCK_STREAM)
-        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-        s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
-        s.bind((HOST, PORT))
-        s.listen(1) #only needs to receive one connection (the client)
-        self.s = s
-        self.conn = None
-
-    def close(self):
-        self.s.close()
-        if self.conn:
-            self.conn.close()
-
-    def get_connection(self, timeout=0):
-        r, w, e = select.select( [self.s], [], [], timeout)
-        for sock in r:
-            #incoming message from remote server
-            if sock == self.s:
-                conn, addr = self.s.accept() #accepts the connection
-                if self.debug:
-                    print("Connected by: ", addr) #prints the connection
-                conn.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
-                self.conn = conn
-                return conn
-        return None
-
-    def get_data(self, length=1024, timeout=None):
-        return get_data(self.conn, length, timeout)
-
-    def send(self, data):
-        return self.conn.sendall(data)
-
-    def jtagremote_server_recv(self, tdo):
-        data = self.get_data(1, 0) # read 1 byte, non-blocking
-        if data is None:
-            return None # no data read
-        data = bytes.decode(data)
-        if self.debug:
-            print ("jtagremote_server_recv", data)
-        # request to read TDO
-        if data == 'R':
-            self.send(str.encode(chr(ord('0') + tdo)))
-            return [] # no data
-        # decode tck, tms, tdi
-        data = ord(data) - ord('0')
-        # encode tck, tms and tdi as number from 0-7
-        tdi = 1 if (data & 1) else 0
-        tms = 1 if (data & 2) else 0
-        tck = 1 if (data & 4) else 0
-
-        return (tck, tms, tdi)
-
-
-
-class JTAGClient:
-    def __init__(self, debug=False):
-        self.debug = debug
-        HOST = 'localhost'
-        PORT = 44853
-        s = socket.socket(AF_INET, SOCK_STREAM)
-        s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
-        s.connect((HOST, PORT))
-        self.s = s
-
-    def close(self):
-        self.s.close()
-
-    def get_data(self, length=1024, timeout=None):
-        return get_data(self.s, length, timeout)
-
-    def send(self, data):
-        return self.s.sendall(data)
-
-    def jtagremote_client_send(self, to_send):
-        # encode tck, tms and tdi as number from 0-7
-        tck, tms, tdi = to_send
-        data = 0
-        if tdi: data |= 1
-        if tms: data |= 2
-        if tck: data |= 4
-        data = chr(ord('0') + data)
-        self.send(str.encode(data))
-        if self.debug:
-            print ("jtagremote_client_send", data)
-        # now read tdo
-        self.send(str.encode('R'))
-
-
-    def jtagremote_client_recv(self, timeout=None):
-        data = self.get_data(1, timeout) # read 1 byte, blocking
-        if data is None:
-            return None
-        if self.debug:
-            print ("client recv", data)
-        data = bytes.decode(data)
-        return ord(data) - ord('0') # subtract ASCII for "0" to give 0 or 1
+from soc.debug.jtagutils import JTAGServer, JTAGClient
 
 
 def test_clientserver_jtagremote():
index 74df1c51762546381b231ff386e03ca4c7891c62..45b4ffb9fb64b10b7092a57d72011a2792d3828f 100644 (file)
@@ -17,83 +17,8 @@ from nmigen import Memory, Signal, Module
 
 from nmigen.back.pysim import Simulator, Delay, Settle, Tick
 from nmutil.util import wrap
-
-def client_sync(dut):
-    tck = yield dut.cbus.tck
-    tms = yield dut.cbus.tms
-    tdi = yield dut.cbus.tdi
-    dut.c.jtagremote_client_send((tck, tms, tdi))
-    #print ("about to client recv")
-    while True:
-        tdo = dut.c.jtagremote_client_recv(timeout=0)
-        if tdo is not None:
-            break
-        yield
-    yield dut.cbus.tdo.eq(tdo)
-
-
-def tms_state_set(dut, bits):
-    for bit in bits:
-        yield dut.cbus.tck.eq(1)
-        yield dut.cbus.tms.eq(bit)
-        yield from client_sync(dut)
-        yield
-        yield dut.cbus.tck.eq(0)
-        yield from client_sync(dut)
-        yield
-        yield from client_sync(dut)
-    yield dut.cbus.tms.eq(0)
-    yield from client_sync(dut)
-
-
-def tms_data_getset(dut, tms, d_len, d_in=0):
-    res = 0
-    yield dut.cbus.tms.eq(tms)
-    for i in range(d_len):
-        tdi = 1 if (d_in & (1<<i)) else 0
-        yield dut.cbus.tck.eq(1)
-        yield from client_sync(dut)
-        res |= (1<<i) if (yield dut.bus.tdo) else 0
-        yield
-        yield from client_sync(dut)
-        yield dut.cbus.tdi.eq(tdi)
-        yield dut.cbus.tck.eq(0)
-        yield from client_sync(dut)
-        yield
-        yield from client_sync(dut)
-    yield dut.cbus.tms.eq(0)
-    yield from client_sync(dut)
-
-    return res
-
-
-def jtag_set_reset(dut):
-    yield from tms_state_set(dut, [1, 1, 1, 1, 1])
-
-def jtag_set_shift_dr(dut):
-    yield from tms_state_set(dut, [1, 0, 0])
-
-def jtag_set_shift_ir(dut):
-    yield from tms_state_set(dut, [1, 1, 0])
-
-def jtag_set_run(dut):
-    yield from tms_state_set(dut, [0])
-
-def jtag_set_idle(dut):
-    yield from tms_state_set(dut, [1, 1, 0])
-
-
-def jtag_read_write_reg(dut, addr, d_len, d_in=0):
-    yield from jtag_set_run(dut)
-    yield from jtag_set_shift_ir(dut)
-    yield from tms_data_getset(dut, 0, dut._ir_width, addr)
-    yield from jtag_set_idle(dut)
-
-    yield from jtag_set_shift_dr(dut)
-    result = yield from tms_data_getset(dut, 0, d_len, d_in)
-    yield from jtag_set_idle(dut)
-    return result
-
+from soc.debug.jtagutils import (jtag_read_write_reg,
+                                 jtag_srv, jtag_set_reset)
 
 # JTAG-ircodes for accessing DMI
 DMI_ADDR = 8
@@ -106,23 +31,6 @@ WB_READ = 6
 WB_WRRD = 7
 
 
-def jtag_srv(dut):
-    while not dut.stop:
-        # loop and receive data from client
-        tdo = yield dut.bus.tdo
-        #print ("server tdo data", tdo)
-        data = dut.s.jtagremote_server_recv(tdo)
-        #print ("server recv data", data)
-        if not data:
-            yield
-            continue
-        tck, tms, tdi = data
-        yield dut.bus.tck.eq(tck)
-        yield dut.bus.tms.eq(tms)
-        yield dut.bus.tdi.eq(tdi)
-        yield
-
-
 def jtag_sim(dut):
 
     ####### JTAGy stuff (IDCODE) ######
@@ -194,7 +102,7 @@ if __name__ == '__main__':
 
     # set up client-server on port 44843-something
     dut.s = JTAGServer()
-    if len(sys.argv) != 2 and sys.argv[1] != 'server':
+    if len(sys.argv) != 2 or sys.argv[1] != 'server':
         dut.c = JTAGClient()
         dut.s.get_connection()
     else:
@@ -226,7 +134,7 @@ if __name__ == '__main__':
     sim.add_clock(1e-6, domain="sync")      # standard clock
 
     sim.add_sync_process(wrap(jtag_srv(dut))) # jtag server
-    if len(sys.argv) != 2 and sys.argv[1] != 'server':
+    if len(sys.argv) != 2 or sys.argv[1] != 'server':
         sim.add_sync_process(wrap(jtag_sim(dut))) # actual jtag tester
     else:
         print ("running server only as requested, use openocd remote to test")