soc/tools/remote/server: fix exit on KeyboardInterrupt
[litex.git] / litex / soc / tools / remote / server.py
1 import socket
2 import threading
3 import argparse
4
5 from litex.soc.tools.remote.etherbone import EtherbonePacket, EtherboneRecord, EtherboneWrites
6 from litex.soc.tools.remote.etherbone import EtherboneIPC
7
8
class RemoteServer(EtherboneIPC):
    """TCP server relaying Etherbone packets between a client and ``comm``.

    For every packet received from the connected client, the writes of its
    record are forwarded to ``comm.write`` and its reads to ``comm.read``;
    the values read are sent back to the client in a new packet.

    Parameters
    ----------
    comm : object
        Communication interface; the server calls its ``open()``, ``close()``,
        ``read(addr)`` and ``write(base_addr, datas)`` methods.
    port : int
        TCP port the server listens on, bound to localhost (default 1234).
    """

    def __init__(self, comm, port=1234):
        self.comm = comm
        self.port = port

    def open(self):
        """Create the listening socket and open the comm interface.

        Does nothing when the server socket already exists. If binding,
        listening or opening ``comm`` fails, the socket is closed and the
        exception is re-raised, so a later ``open()`` starts from scratch.
        """
        if hasattr(self, "socket"):
            return
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            listener.bind(("localhost", self.port))
            listener.listen(1)
            self.comm.open()
        except Exception:
            # Do not leak the socket or leave a half-initialised server behind.
            listener.close()
            raise
        self.socket = listener

    def close(self):
        """Close the comm interface, then the listening socket if any.

        The socket is released even when ``comm.close()`` raises; the
        exception still propagates to the caller.
        """
        try:
            self.comm.close()
        finally:
            if hasattr(self, "socket"):
                self.socket.close()
                del self.socket

    def _serve_client(self, client_socket):
        """Process packets from ``client_socket`` until the session ends.

        The loop stops when ``receive_packet`` returns 0 (presumably the
        client disconnected -- ``receive_packet`` is defined in the base
        class). Only the record returned by ``packet.records.pop()`` is
        handled for each packet.
        """
        while True:
            raw = self.receive_packet(client_socket)
            if raw == 0:
                break
            packet = EtherbonePacket(raw)
            packet.decode()

            record = packet.records.pop()

            # Writes: forwarded to the comm interface, no reply is sent.
            if record.writes is not None:
                self.comm.write(record.writes.base_addr, record.writes.get_datas())

            # Reads: performed one address at a time, then the values are
            # returned to the client as the writes of a reply record.
            if record.reads is not None:
                datas = [self.comm.read(read_addr)
                         for read_addr in record.reads.get_addrs()]

                reply_record = EtherboneRecord()
                reply_record.writes = EtherboneWrites(datas=datas)
                reply_record.wcount = len(reply_record.writes)

                reply = EtherbonePacket()
                reply.records = [reply_record]
                reply.encode()
                self.send_packet(client_socket, reply)

    def _serve_thread(self):
        """Accept clients forever, serving one connection at a time."""
        while True:
            client_socket, client_addr = self.socket.accept()
            print("Connected with {}:{}".format(client_addr[0], client_addr[1]))
            try:
                self._serve_client(client_socket)
            finally:
                # Always release the client socket, even if serving raised.
                print("Disconnect")
                client_socket.close()

    def start(self):
        """Run ``_serve_thread`` in a background daemon thread."""
        self.serve_thread = threading.Thread(target=self._serve_thread)
        # Daemon thread: it does not keep the process alive on exit.
        self.serve_thread.daemon = True
        self.serve_thread.start()
69
70
def _get_args():
    """Parse the command-line options of the remote server.

    Returns
    -------
    argparse.Namespace
        With attributes ``comm`` (str, default "uart"), ``port`` (str,
        default "2") and ``baudrate`` (int, default 115200).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--comm", default="uart", help="comm interface")
    parser.add_argument("--port", default="2", help="UART port")
    # type=int: a baudrate given on the command line is an int like the
    # default, instead of being passed on as a string.
    parser.add_argument("--baudrate", default=115200, type=int, help="UART baudrate")
    return parser.parse_args()
77
def main():
    """Command-line entry point of the LiteX remote server.

    Builds the comm interface selected by ``--comm`` (only "uart" is
    supported), starts a ``RemoteServer`` on it and then idles until the
    user interrupts the program.

    Raises
    ------
    NotImplementedError
        If ``--comm`` is anything other than "uart".
    """
    print("LiteX remote server")
    args = _get_args()
    if args.comm != "uart":
        raise NotImplementedError

    from litex.soc.tools.remote import CommUART
    print("Using CommUART, port: {} / baudrate: {}".format(args.port, args.baudrate))
    # A purely numeric port is handed over as an int, anything else as given.
    uart_port = int(args.port) if args.port.isdigit() else args.port
    comm = CommUART(uart_port, args.baudrate, debug=False)

    server = RemoteServer(comm)
    server.open()
    server.start()
    try:
        import time
        # The serving itself runs in the thread started by server.start();
        # the main thread only sleeps until Ctrl-C is pressed.
        while True:
            time.sleep(100)
    except KeyboardInterrupt:
        # Leave quietly on Ctrl-C.
        pass


if __name__ == "__main__":
    main()