2 * Copyright (c) 2015 ARM Limited
5 * The license below extends only to copyright in the software and shall
6 * not be construed as granting a license to any other intellectual
7 * property including but not limited to intellectual property relating
8 * to a hardware implementation of the functionality of the software
9 * licensed hereunder. You may use the software subject to the license
10 * terms below provided that you ensure that this notice is replicated
11 * unmodified and in its entirety in all distributions of the software,
12 * modified or unmodified, in source code or in binary form.
14 * Copyright (c) 2008 The Regents of The University of Michigan
15 * All rights reserved.
17 * Redistribution and use in source and binary forms, with or without
18 * modification, are permitted provided that the following conditions are
19 * met: redistributions of source code must retain the above copyright
20 * notice, this list of conditions and the following disclaimer;
21 * redistributions in binary form must reproduce the above copyright
22 * notice, this list of conditions and the following disclaimer in the
23 * documentation and/or other materials provided with the distribution;
24 * neither the name of the copyright holders nor the names of its
25 * contributors may be used to endorse or promote products derived from
26 * this software without specific prior written permission.
28 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
29 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
30 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
31 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
32 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
33 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
34 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
35 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
36 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
37 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
38 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40 * Authors: Gabor Dozsa
45 * Message server implementation using TCP stream sockets for parallel gem5
48 #include <arpa/inet.h>
49 #include <sys/socket.h>
50 #include <sys/types.h>
56 #include "tcp_server.hh"
60 // Some basic macros for information and error reporting.
// PRINTF writes a printf-style message to stderr; the macros below build on it.
61 #define PRINTF(...) fprintf(stderr, __VA_ARGS__)
// Per-category switches passed as the first argument of DPRINTF: setup and
// sync messages are enabled, periodic-sync and packet messages are disabled.
64 static bool debugSetup
= true;
65 static bool debugPeriodic
= false;
66 static bool debugSync
= true;
67 static bool debugPkt
= false;
// DPRINTF(v, ...) prints only when the flag v is true. A second, empty
// definition follows it. NOTE(review): the preprocessor conditional that
// selects between the two definitions is not visible in this view - confirm.
68 #define DPRINTF(v,...) if (v) PRINTF(__VA_ARGS__)
70 #define DPRINTF(v,...)
// inform() prints the message prefixed with "info: ".
73 #define inform(...) do { PRINTF("info: "); \
74 PRINTF(__VA_ARGS__); } while(0)
// panic() prints the message prefixed with "panic: ", followed by the function
// name, file name and line number of the call site. NOTE(review): the tail of
// this macro is not visible in this view.
76 #define panic(...) do { PRINTF("panic: "); \
77 PRINTF(__VA_ARGS__); \
78 PRINTF("\n[%s:%s], line %d\n", \
79 __FUNCTION__, __FILE__, __LINE__); \
82 TCPServer
// Static pointer to the server object, initially null. The Channel methods
// below reach the server (address map, sync handling) through this pointer.
*TCPServer::instance
= nullptr;
// Default constructor: the channel starts without a socket (fd is -1), marked
// as not alive and in the idle sync state. The stored address is handed to
// MultiHeaderPkt::clearAddress().
84 TCPServer::Channel::Channel() : fd(-1), isAlive(false), state(SyncState::idle
)
86 MultiHeaderPkt::clearAddress(address
);
// Receive `size` bytes from this channel's socket (fd) into `buf`.
//   buf  - destination buffer supplied by the caller
//   size - number of bytes requested
// Panics when the transfer comes back shorter than requested.
// NOTE(review): the return type, the declaration of n and the return
// statement are not visible in this view.
90 TCPServer::Channel::recvRaw(void *buf
, unsigned size
) const
93 // This is a blocking receive.
// MSG_WAITALL asks recv() to block until the whole request has been satisfied.
94 n
= recv(fd
, buf
, size
, MSG_WAITALL
);
// NOTE(review): the condition guarding this panic is not visible here;
// presumably it tests for an error return from recv() - confirm.
97 panic("read() failed:%s", strerror(errno
));
// A positive byte count smaller than `size` is treated as a fatal short read.
98 else if (n
> 0 && n
< size
)
99 // the recv() call should wait for the full message
100 panic("read() failed");
// Send `size` bytes from `buf` over this channel's socket (fd).
//   buf  - data to transmit
//   size - number of bytes to send
// NOTE(review): the return type, the declaration of n and the conditions
// guarding the two panics below are not visible in this view.
106 TCPServer::Channel::sendRaw(const void *buf
, unsigned size
) const
// MSG_NOSIGNAL: per send(2), do not raise SIGPIPE if the peer has closed the
// connection; the failure is reported through the return value instead.
109 n
= send(fd
, buf
, size
, MSG_NOSIGNAL
);
// Failure path that reports the errno text.
111 panic("write() failed:%s", strerror(errno
));
// Second failure path without errno text. NOTE(review): presumably an
// incomplete write, mirroring the short-read check in recvRaw() - confirm.
113 panic("write() failed");
// Record `new_address` as the address of this channel and keep the server's
// addressMap entry for this channel in step with it.
//   new_address - address to store for this channel
116 void TCPServer::Channel::updateAddress(const AddressType
&new_address
)
118 // check if the known address has changed (e.g. the client reconfigured
// Compare the stored address with the new one. NOTE(review): the statement
// executed when they are equal is not visible here; presumably an early
// return, since nothing needs updating - confirm.
120 if (MultiHeaderPkt::isAddressEqual(address
, new_address
))
123 // So we have to update the address. Note that we always
124 // store the same address as key in the map but the ordering
125 // may change so we need to erase and re-insert it again.
// Look up the existing map entry; the key is a pointer to this channel's
// own address member.
126 auto info
= TCPServer::instance
->addressMap
.find(&address
);
// Drop the old entry, if there is one, before the address bytes change.
127 if (info
!= TCPServer::instance
->addressMap
.end()) {
128 TCPServer::instance
->addressMap
.erase(info
);
// Store the new address, then insert the mapping from it to this channel.
131 MultiHeaderPkt::copyAddress(address
, new_address
);
132 TCPServer::instance
->addressMap
[&address
] = this;
// Read one header packet from this channel and act on it: data descriptors
// are forwarded through the server, every other message type is handed to
// processCmd().
// NOTE(review): the return type and the declarations of n and hdr_pkt are not
// visible in this view.
136 TCPServer::Channel::headerPktIn()
// Read exactly one header-sized message from the socket.
141 n
= recvRaw(&hdr_pkt
, sizeof(hdr_pkt
));
144 // EOF - nothing to do here, we will handle this as a POLLRDHUP event
// A data descriptor announces a data packet: refresh this channel's address
// from the source address in the header, then let the server transfer it.
149 if (hdr_pkt
.msgType
== MsgType::dataDescriptor
) {
150 updateAddress(hdr_pkt
.srcAddress
);
151 TCPServer::instance
->xferData(hdr_pkt
, *this);
// Command path. NOTE(review): the `else` joining this call to the test above
// is not visible in this view.
153 processCmd(hdr_pkt
.msgType
, hdr_pkt
.sendTick
);
// Handle a command message received on this channel.
//   cmd       - message type taken from the header packet
//   send_tick - send tick taken from the header packet; only the periodic
//               sync case uses it
// Each case updates this channel's sync state and asks the server whether the
// corresponding sync can now complete.
// NOTE(review): the switch statements, the break statements and the default
// labels are not visible in this view; only the case labels are.
157 void TCPServer::Channel::processCmd(MsgType cmd
, Tick send_tick
)
// Atomic sync: the channel must be idle; it moves to the atomic state.
160 case MsgType::cmdAtomicSyncReq
:
161 DPRINTF(debugSync
,"Atomic sync request (rank:%d)\n",rank
);
162 assert(state
== SyncState::idle
);
163 state
= SyncState::atomic
;
164 TCPServer::instance
->syncTryComplete(SyncState::atomic
,
165 MsgType::cmdAtomicSyncAck
);
// Periodic sync: the first request of a round records its send tick in the
// server (a stored tick of 0 means no round is in progress); any later
// request carrying a different tick is fatal.
167 case MsgType::cmdPeriodicSyncReq
:
168 DPRINTF(debugPeriodic
,"PERIODIC sync request (at %ld)\n",send_tick
);
170 if (TCPServer::instance
->periodicSyncTick() == 0) {
171 TCPServer::instance
->periodicSyncTick(send_tick
);
172 } else if ( TCPServer::instance
->periodicSyncTick() != send_tick
) {
173 panic("Out of order periodic sync request - rank:%d "
174 "(send_tick:%ld ongoing:%ld)", rank
, send_tick
,
175 TCPServer::instance
->periodicSyncTick());
// What happens next depends on the current sync state of the channel.
178 case SyncState::idle
:
179 state
= SyncState::periodic
;
180 TCPServer::instance
->syncTryComplete(SyncState::periodic
,
181 MsgType::cmdPeriodicSyncAck
);
183 case SyncState::asyncCkpt
:
184 // An async ckpt request has already been sent to this client and
185 // that will interrupt this periodic sync. We can simply drop this
// Any other state is a protocol error.
189 panic("Unexpected state for periodic sync request (rank:%d)",
// Checkpoint sync request.
194 case MsgType::cmdCkptSyncReq
:
195 DPRINTF(debugSync
, "CKPT sync request (rank:%d)\n",rank
);
// An idle channel is the one initiating the checkpoint: the request is first
// propagated to the other clients.
197 case SyncState::idle
:
198 TCPServer::instance
->ckptPropagate(*this);
199 // we fall through here to complete #clients==1 case
// Either way the channel enters the ckpt state and completion is attempted.
200 case SyncState::asyncCkpt
:
201 state
= SyncState::ckpt
;
202 TCPServer::instance
->syncTryComplete(SyncState::ckpt
,
203 MsgType::cmdCkptSyncAck
);
206 panic("Unexpected state for ckpt sync request (rank:%d)", rank
);
// Message types not handled above are fatal.
211 panic("Unexpected header packet (rank:%d)",rank
);
// Constructor: checks that no server instance has been registered yet and
// delegates all setup to construct().
//   clients_num - number of clients, forwarded to construct()
//   listen_port - TCP port, forwarded to construct()
// NOTE(review): the parameter line declaring timeout_in_sec and any statements
// after the construct() call are not visible in this view.
216 TCPServer::TCPServer(unsigned clients_num
,
217 unsigned listen_port
,
// The static `instance` pointer must still be null at this point.
220 assert(instance
== nullptr);
221 construct(clients_num
, listen_port
, timeout_in_sec
);
// Destructor: visits every entry of clientsPollFd.
// NOTE(review): the loop body is not visible in this view; presumably it
// closes each client socket - confirm.
225 TCPServer::~TCPServer()
227 for (auto &c
: clientsPollFd
)
// Set up the server: open a listening TCP socket on `port`, then accept
// connections until `clients_num` clients have connected. Each accepted
// connection gets a pollfd entry in clientsPollFd and a Channel in
// clientsChannel. The listening socket is closed afterwards.
//   clients_num    - number of client connections to wait for
//   port           - TCP port to listen on
//   timeout_in_sec - value handed to poll() (scaled to milliseconds) while
//                    waiting for each connection
// NOTE(review): the return type, the declarations of client_len and
// new_channel, and several error-check conditions are not visible in this view.
232 TCPServer::construct(unsigned clients_num
, unsigned port
, int timeout_in_sec
)
234 int listen_sock
, new_sock
, ret
;
236 struct sockaddr_in server_addr
, client_addr
;
237 struct pollfd new_pollfd
;
240 DPRINTF(debugSetup
, "Start listening on port %u ...\n", port
);
// Create an IPv4 stream socket for listening.
242 listen_sock
= socket(AF_INET
, SOCK_STREAM
, 0);
// NOTE(review): the guard for this panic is not visible in this view.
244 panic("socket() failed:%s", strerror(errno
));
// Bind to `port` on any local address.
246 bzero(&server_addr
, sizeof(server_addr
));
247 server_addr
.sin_family
= AF_INET
;
248 server_addr
.sin_addr
.s_addr
= INADDR_ANY
;
249 server_addr
.sin_port
= htons(port
);
250 if (bind(listen_sock
, (struct sockaddr
*) &server_addr
,
251 sizeof(server_addr
)) < 0)
252 panic("bind() failed:%s", strerror(errno
));
// Start listening with a backlog of 10; the result of listen() is not
// checked on this line.
253 listen(listen_sock
, 10);
// Pre-size both per-client vectors.
255 clientsPollFd
.reserve(clients_num
);
256 clientsChannel
.reserve(clients_num
);
// The same pollfd template is used first to wait on the listening socket and
// then, with fd replaced, as the entry stored for the accepted client.
258 new_pollfd
.events
= POLLIN
| POLLRDHUP
;
259 new_pollfd
.revents
= 0;
// Accept clients one at a time until the expected number is reached.
260 while (clientsPollFd
.size() < clients_num
) {
261 new_pollfd
.fd
= listen_sock
;
// Wait for an incoming connection; poll() takes its timeout in milliseconds.
262 ret
= poll(&new_pollfd
, 1, timeout_in_sec
*1000);
// NOTE(review): the guard for this panic is not visible in this view.
264 panic("Timeout while waiting for clients to connect");
// Exactly one descriptor must be ready, and only for reading.
265 assert(ret
== 1 && new_pollfd
.revents
== POLLIN
);
266 client_len
= sizeof(client_addr
);
267 new_sock
= accept(listen_sock
,
268 (struct sockaddr
*) &client_addr
,
271 panic("accept() failed:%s", strerror(errno
));
// Register the accepted socket for polling.
272 new_pollfd
.fd
= new_sock
;
273 new_pollfd
.revents
= 0;
274 clientsPollFd
.push_back(new_pollfd
);
// Create the matching channel; the first thing read from the new connection
// is the client's rank.
275 new_channel
.fd
= new_sock
;
276 new_channel
.isAlive
= true;
277 new_channel
.recvRaw(&new_channel
.rank
, sizeof(new_channel
.rank
));
278 clientsChannel
.push_back(new_channel
);
280 DPRINTF(debugSetup
, "New client connection addr:%u port:%hu rank:%d\n",
281 client_addr
.sin_addr
.s_addr
, client_addr
.sin_port
,
// All clients are connected; the listening socket is closed.
284 ret
= close(listen_sock
);
287 DPRINTF(debugSetup
, "Setup complete\n");
// Main service loop. The function header is not visible in this view; the
// debug messages below refer to it as run(). It polls all client sockets,
// lets the matching channel process incoming header packets, and leaves the
// loop once a client connection reports a hang-up.
// NOTE(review): the declaration of nfd, the increment expression of the inner
// for loop and the updates of n are not visible in this view.
294 unsigned num_active_clients
= clientsPollFd
.size();
296 DPRINTF(debugSetup
, "Entering run() loop\n");
// Keep serving while the active-client count still equals the number of
// poll entries, i.e. until the count is decremented below.
297 while (num_active_clients
== clientsPollFd
.size()) {
// Block with no timeout (-1) until at least one client socket has an event.
298 nfd
= poll(&clientsPollFd
[0], clientsPollFd
.size(), -1);
// NOTE(review): the guard for this panic is not visible in this view.
300 panic("poll() failed:%s", strerror(errno
));
// Scan the poll entries; the scan is bounded both by the number of entries
// and by nfd, the value poll() returned.
302 for (unsigned i
= 0, n
= 0;
303 i
< clientsPollFd
.size() && (signed)n
< nfd
;
305 struct pollfd
&pfd
= clientsPollFd
[i
];
307 if (pfd
.revents
& POLLERR
)
308 panic("poll() returned POLLERR");
// Readable socket: clientsChannel[i] is the channel for clientsPollFd[i].
309 if (pfd
.revents
& POLLIN
) {
310 clientsChannel
[i
].headerPktIn();
312 if (pfd
.revents
& POLLRDHUP
) {
313 // One gem5 process exited or aborted. Either way, we
314 // assume the full simulation should stop now (either
315 // because m5 exit was called or a serious error
316 // occurred.) So we quit the run loop here and close all
317 // sockets to notify the remaining peer gem5 processes.
// Marking the channel dead and decrementing the counter makes the outer
// while condition false.
319 clientsChannel
[i
].isAlive
= false;
320 num_active_clients
--;
321 DPRINTF(debugSetup
, "POLLRDHUP event");
// NOTE(review): the statement controlled by this test is not visible here;
// presumably it ends the scan once all reported events are handled - confirm.
324 if ((signed)n
== nfd
)
329 DPRINTF(debugSetup
, "Exiting run() loop\n");
// Forward one data packet. The header has already been received by the
// caller; this function reads the payload from the source channel into
// packetBuffer and sends header plus payload either to every other live
// client (broadcast) or to the single channel registered for the destination
// address (unicast).
//   hdr_pkt - header describing the packet (addresses, payload length)
//   src     - channel the packet arrived on
// NOTE(review): the return type, the declaration of n and several guards are
// not visible in this view.
333 TCPServer::xferData(const Header
&hdr_pkt
, const Channel
&src
)
// The payload must fit into the shared receive buffer.
336 assert(hdr_pkt
.dataPacketLength
<= sizeof(packetBuffer
));
337 n
= src
.recvRaw(packetBuffer
, hdr_pkt
.dataPacketLength
);
// NOTE(review): the guard for this panic is not visible in this view.
340 panic("recvRaw() failed");
// Debug trace with the six bytes of the source and destination addresses.
341 DPRINTF(debugPkt
, "Incoming data packet (from rank %d) "
342 "src:0x%02x%02x%02x%02x%02x%02x "
343 "dst:0x%02x%02x%02x%02x%02x%02x\n",
345 hdr_pkt
.srcAddress
[0],
346 hdr_pkt
.srcAddress
[1],
347 hdr_pkt
.srcAddress
[2],
348 hdr_pkt
.srcAddress
[3],
349 hdr_pkt
.srcAddress
[4],
350 hdr_pkt
.srcAddress
[5],
351 hdr_pkt
.dstAddress
[0],
352 hdr_pkt
.dstAddress
[1],
353 hdr_pkt
.dstAddress
[2],
354 hdr_pkt
.dstAddress
[3],
355 hdr_pkt
.dstAddress
[4],
356 hdr_pkt
.dstAddress
[5]);
357 // Now try to figure out the destination client(s).
358 auto dst_info
= addressMap
.find(&hdr_pkt
.dstAddress
);
360 // First handle the multicast/broadcast or unknown destination case. These
361 // all trigger a broadcast of the packet to all clients.
362 if (MultiHeaderPkt::isUnicastAddress(hdr_pkt
.dstAddress
) == false ||
363 dst_info
== addressMap
.end()) {
// Send to every live client except the sender: header first, then payload.
365 for (auto const &c
: clientsChannel
) {
366 if (c
.isAlive
&& &c
!=&src
) {
367 c
.sendRaw(&hdr_pkt
, sizeof(hdr_pkt
));
368 c
.sendRaw(packetBuffer
, hdr_pkt
.dataPacketLength
);
// NOTE(review): the condition under which this message is printed is not
// visible in this view.
373 inform("Broadcast/multicast packet dropped\n");
376 // It is a unicast address with a known destination
377 Channel
*dst
= dst_info
->second
;
// Header first, then payload, to the one destination channel.
379 dst
->sendRaw(&hdr_pkt
, sizeof(hdr_pkt
));
380 dst
->sendRaw(packetBuffer
, hdr_pkt
.dataPacketLength
);
381 DPRINTF(debugPkt
, "Unicast packet sent (to rank %d)\n",dst
->rank
);
// NOTE(review): the guard selecting between sending and this message is not
// visible here; presumably it checks whether dst is still alive - confirm.
383 inform("Unicast packet dropped (destination exited)\n");
// Complete a sync (barrier) if every live client has reached state `st`.
//   st  - sync state every live channel must be in for the sync to complete
//   ack - message type of the acknowledgement sent to the clients
// On completion the channels are acknowledged and set back to idle, and the
// periodic sync tick is cleared.
// NOTE(review): the return type, the early-exit statement and the end of the
// function are not visible in this view.
389 TCPServer::syncTryComplete(SyncState st
, MsgType ack
)
391 // Check if the barrier is complete. If so then notify all the clients.
392 for (auto &c
: clientsChannel
) {
// A live channel in any other state means the sync is still pending.
393 if (c
.isAlive
&& (c
.state
!= st
)) {
394 // sync not complete yet, stop here
398 // Sync complete, send out the acks
// Only the message type of the ack header is set explicitly here.
399 MultiHeaderPkt::Header hdr_pkt
;
400 hdr_pkt
.msgType
= ack
;
// NOTE(review): a line inside this loop is not visible here; presumably the
// send is restricted to live channels - confirm.
401 for (auto &c
: clientsChannel
) {
403 c
.sendRaw(&hdr_pkt
, sizeof(hdr_pkt
));
404 c
.state
= SyncState::idle
;
407 // Reset periodic send tick
408 _periodicSyncTick
= 0;
// The debug flag matches the kind of sync that completed.
409 DPRINTF(st
== SyncState::periodic
? debugPeriodic
: debugSync
,
// Forward a checkpoint sync request to every live client other than `ch`.
//   ch - channel on which the checkpoint request was received
// NOTE(review): the return type, the switch statement and the default label
// are not visible in this view.
414 TCPServer::ckptPropagate(Channel
&ch
)
416 // Channel ch got a ckpt request that needs to be propagated to the other
// Only the message type of the request header is set explicitly here.
418 MultiHeaderPkt::Header hdr_pkt
;
419 hdr_pkt
.msgType
= MsgType::cmdCkptSyncReq
;
420 for (auto &c
: clientsChannel
) {
421 if (c
.isAlive
&& (&c
!= &ch
)) {
// Channels that are idle or in a periodic sync get the request and are
// moved to the asyncCkpt state.
423 case SyncState::idle
:
424 case SyncState::periodic
:
425 c
.sendRaw(&hdr_pkt
, sizeof(hdr_pkt
));
426 c
.state
= SyncState::asyncCkpt
;
// Any other state is a protocol error.
429 panic("Unexpected state for ckpt sync request propagation "
430 "(rank:%d)\n",c
.rank
);
// Program entry point: reads the number of clients and the TCP listen port
// from the command line and creates the TCPServer.
//   argc, argv - command line; an optional leading "-debug" is recognised,
//                followed by the number of clients and the listen port
// NOTE(review): the declaration of server, the body of the "-debug" branch,
// the argument-count check and everything after the server is created are not
// visible in this view.
437 int main(int argc
, char *argv
[])
// -1 marks the two required values as not yet set.
440 int clients_num
= -1, listen_port
= -1;
// first_arg is the index of the first positional argument; the timeout passed
// to the server defaults to 60 seconds.
441 int first_arg
= 1, timeout_in_sec
= 60;
443 if (argc
> 1 && string(argv
[1]).compare("-debug") == 0) {
450 panic("We need two command line args (number of clients and tcp listen"
// Both values are converted with atoi(), which does not report parse errors.
453 clients_num
= atoi(argv
[first_arg
]);
454 listen_port
= atoi(argv
[first_arg
+ 1]);
// The server is allocated with new; its release is not visible in this view.
456 server
= new TCPServer(clients_num
, listen_port
, timeout_in_sec
);