soc/tools: initialize wishbone remote control (for now only uart)
[litex.git] / litex / soc / tools / remote / server.py
1 import socket
2 import threading
3 import argparse
4
5 from litex.soc.tools.remote.etherbone import EtherbonePacket, EtherboneRecord, EtherboneWrites
6 from litex.soc.tools.remote.etherbone import EtherboneIPC
7
8
class RemoteServer(EtherboneIPC):
    """TCP server that relays Etherbone packets to a hardware comm link.

    Clients connect on ``localhost:<port>``.  For every packet received, the
    writes of the decoded record are forwarded to ``comm.write`` and the reads
    are served through ``comm.read``; read results are sent back to the client
    in a new Etherbone packet.

    ``receive_packet`` and ``send_packet`` are not defined in this class;
    presumably they are inherited from ``EtherboneIPC`` (confirm there).

    Args:
        comm: Object exposing ``open(csr_data_width)``, ``close()``,
            ``read(addr)`` and ``write(base_addr, datas)``.
        port: TCP port to listen on (bound to ``localhost`` only).
        csr_data_width: Value handed to ``comm.open`` when the server opens.
    """

    def __init__(self, comm, port=1234, csr_data_width=32):
        self.comm = comm
        self.port = port
        # Honour the caller's value (it used to be hard-coded to 32).
        self.csr_data_width = csr_data_width

    def open(self):
        """Create the listening socket and open the comm link.

        Does nothing when the server socket already exists.

        Raises:
            OSError: If the socket cannot be bound or put in listening mode.
        """
        if hasattr(self, "socket"):
            return
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            listener.bind(("localhost", self.port))
            listener.listen(1)
        except OSError:
            # Do not leave a half-initialised socket behind: a later open()
            # would otherwise see the attribute and silently return.
            listener.close()
            raise
        self.socket = listener
        self.comm.open(self.csr_data_width)

    def close(self):
        """Close the comm link and, if present, the listening socket."""
        try:
            self.comm.close()
        finally:
            # Release the socket even when closing the comm link fails.
            if hasattr(self, "socket"):
                self.socket.close()
                del self.socket

    def _serve_thread(self):
        """Accept clients one at a time and serve their requests forever."""
        while True:
            client_socket, addr = self.socket.accept()
            print("Connected with " + addr[0] + ":" + str(addr[1]))
            try:
                while True:
                    packet = self.receive_packet(client_socket)
                    # A result of 0 ends the session with this client
                    # (presumably it signals a disconnect -- confirm in
                    # receive_packet).
                    if packet == 0:
                        break
                    packet = EtherbonePacket(packet)
                    packet.decode()

                    # Only the last record of the packet is handled.
                    record = packet.records.pop()

                    # writes
                    if record.writes is not None:
                        self.comm.write(record.writes.base_addr,
                                        record.writes.get_datas())

                    # reads
                    if record.reads is not None:
                        reads = [self.comm.read(read_addr)
                                 for read_addr in record.reads.get_addrs()]

                        # Read results travel back to the client as the
                        # "writes" of a fresh record.
                        record = EtherboneRecord()
                        record.writes = EtherboneWrites(datas=reads)
                        record.wcount = len(record.writes)

                        packet = EtherbonePacket()
                        packet.records = [record]
                        packet.encode()
                        self.send_packet(client_socket, packet)
            finally:
                print("Disconnect")
                client_socket.close()

    def start(self):
        """Start serving clients in a background daemon thread."""
        # daemon=True replaces the deprecated Thread.setDaemon(True).
        self.serve_thread = threading.Thread(target=self._serve_thread,
                                             daemon=True)
        self.serve_thread.start()

    def join(self, writer_only=False):
        """Block until the serving thread ends.

        Returns immediately when ``start()`` has not been called.

        Args:
            writer_only: Accepted but not used by this implementation.
        """
        if not hasattr(self, "serve_thread"):
            return
        self.serve_thread.join()
75
def _get_args():
    """Parse the command-line options of the remote server.

    Options:
        --comm: comm interface name (default ``"uart"``).
        --port: UART port, kept as a string (default ``"2"``).
        --baudrate: UART baudrate as an int (default 115200).
        --csr_data_width: CSR data width as an int (default 32).

    Returns:
        argparse.Namespace: The parsed arguments read from ``sys.argv``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--comm", default="uart", help="comm interface")
    parser.add_argument("--port", default="2", help="UART port")
    # type=int so that values given on the command line have the same type
    # as the integer defaults instead of arriving as strings.
    parser.add_argument("--baudrate", default=115200, type=int,
                        help="UART baudrate")
    parser.add_argument("--csr_data_width", default=32, type=int,
                        help="CSR data_width")
    return parser.parse_args()
83
def main():
    """Run the remote server from the command line.

    Builds the comm interface selected by ``--comm`` (only ``"uart"`` is
    handled), starts a ``RemoteServer`` on it and waits for the serving
    thread until it ends or the user interrupts with Ctrl-C.  The server is
    closed on the way out.

    Raises:
        NotImplementedError: If ``--comm`` names an unsupported interface.
    """
    args = _get_args()
    if args.comm == "uart":
        from litex.soc.tools.remote import CommUART
        # An all-digit port string is handed over as an int, anything else
        # is passed through unchanged as a string.
        port = int(args.port) if args.port.isdigit() else args.port
        comm = CommUART(port, args.baudrate, debug=False)
    else:
        raise NotImplementedError(
            "unsupported comm interface: {}".format(args.comm))

    server = RemoteServer(comm, csr_data_width=args.csr_data_width)
    server.open()
    try:
        server.start()
        server.join(True)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server.
        pass
    finally:
        # Release the listening socket and the comm link on exit.
        server.close()
102
# Script entry point: run the server only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()