ruby: give access to cache tag/data latencies from SLICC
[gem5.git] / util / multi / tcp_server.cc
1 /*
2 * Copyright (c) 2015 ARM Limited
3 * All rights reserved
4 *
5 * The license below extends only to copyright in the software and shall
6 * not be construed as granting a license to any other intellectual
7 * property including but not limited to intellectual property relating
8 * to a hardware implementation of the functionality of the software
9 * licensed hereunder. You may use the software subject to the license
10 * terms below provided that you ensure that this notice is replicated
11 * unmodified and in its entirety in all distributions of the software,
12 * modified or unmodified, in source code or in binary form.
13 *
14 * Copyright (c) 2008 The Regents of The University of Michigan
15 * All rights reserved.
16 *
17 * Redistribution and use in source and binary forms, with or without
18 * modification, are permitted provided that the following conditions are
19 * met: redistributions of source code must retain the above copyright
20 * notice, this list of conditions and the following disclaimer;
21 * redistributions in binary form must reproduce the above copyright
22 * notice, this list of conditions and the following disclaimer in the
23 * documentation and/or other materials provided with the distribution;
24 * neither the name of the copyright holders nor the names of its
25 * contributors may be used to endorse or promote products derived from
26 * this software without specific prior written permission.
27 *
28 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
29 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
30 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
31 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
32 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
33 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
34 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
35 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
36 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
37 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
38 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 *
40 * Authors: Gabor Dozsa
41 */
42
43
44 /* @file
45 * Message server implementation using TCP stream sockets for parallel gem5
46 * runs.
47 */
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <cassert>
#include <cerrno>
#include <climits>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <string>

#include "tcp_server.hh"
57
58 using namespace std;
59
// Some basic macros for information and error reporting. All output goes to
// stderr.
#define PRINTF(...) fprintf(stderr, __VA_ARGS__)

#ifdef DEBUG
static bool debugSetup = true;
static bool debugPeriodic = false;
static bool debugSync = true;
static bool debugPkt = false;
// Print only if the debug flag 'v' is set. Wrapped in do/while(0) so the
// macro is a single statement and cannot capture a following 'else'.
#define DPRINTF(v,...) do { if (v) PRINTF(__VA_ARGS__); } while (0)
#else
// Debug output compiled out; the arguments are not evaluated.
#define DPRINTF(v,...) do { } while (0)
#endif

// Informational message, prefixed with "info: ".
#define inform(...) do { PRINTF("info: "); \
        PRINTF(__VA_ARGS__); } while(0)

// Fatal error: print the message plus the reporting location, then terminate
// the process with exit(-1).
#define panic(...) do { PRINTF("panic: "); \
        PRINTF(__VA_ARGS__); \
        PRINTF("\n[%s:%s], line %d\n", \
               __FUNCTION__, __FILE__, __LINE__); \
        exit(-1); } while(0)
81
82 TCPServer *TCPServer::instance = nullptr;
83
84 TCPServer::Channel::Channel() : fd(-1), isAlive(false), state(SyncState::idle)
85 {
86 MultiHeaderPkt::clearAddress(address);
87 }
88
89 unsigned
90 TCPServer::Channel::recvRaw(void *buf, unsigned size) const
91 {
92 ssize_t n;
93 // This is a blocking receive.
94 n = recv(fd, buf, size, MSG_WAITALL);
95
96 if (n < 0)
97 panic("read() failed:%s", strerror(errno));
98 else if (n > 0 && n < size)
99 // the recv() call should wait for the full message
100 panic("read() failed");
101
102 return n;
103 }
104
105 void
106 TCPServer::Channel::sendRaw(const void *buf, unsigned size) const
107 {
108 ssize_t n;
109 n = send(fd, buf, size, MSG_NOSIGNAL);
110 if (n < 0)
111 panic("write() failed:%s", strerror(errno));
112 else if (n != size)
113 panic("write() failed");
114 }
115
116 void TCPServer::Channel::updateAddress(const AddressType &new_address)
117 {
118 // check if the known address has changed (e.g. the client reconfigured
119 // its Ethernet NIC)
120 if (MultiHeaderPkt::isAddressEqual(address, new_address))
121 return;
122
123 // So we have to update the address. Note that we always
124 // store the same address as key in the map but the ordering
125 // may change so we need to erase and re-insert it again.
126 auto info = TCPServer::instance->addressMap.find(&address);
127 if (info != TCPServer::instance->addressMap.end()) {
128 TCPServer::instance->addressMap.erase(info);
129 }
130
131 MultiHeaderPkt::copyAddress(address, new_address);
132 TCPServer::instance->addressMap[&address] = this;
133 }
134
135 void
136 TCPServer::Channel::headerPktIn()
137 {
138 ssize_t n;
139 Header hdr_pkt;
140
141 n = recvRaw(&hdr_pkt, sizeof(hdr_pkt));
142
143 if (n == 0) {
144 // EOF - nothing to do here, we will handle this as a POLLRDHUP event
145 // in the main loop.
146 return;
147 }
148
149 if (hdr_pkt.msgType == MsgType::dataDescriptor) {
150 updateAddress(hdr_pkt.srcAddress);
151 TCPServer::instance->xferData(hdr_pkt, *this);
152 } else {
153 processCmd(hdr_pkt.msgType, hdr_pkt.sendTick);
154 }
155 }
156
157 void TCPServer::Channel::processCmd(MsgType cmd, Tick send_tick)
158 {
159 switch (cmd) {
160 case MsgType::cmdAtomicSyncReq:
161 DPRINTF(debugSync,"Atomic sync request (rank:%d)\n",rank);
162 assert(state == SyncState::idle);
163 state = SyncState::atomic;
164 TCPServer::instance->syncTryComplete(SyncState::atomic,
165 MsgType::cmdAtomicSyncAck);
166 break;
167 case MsgType::cmdPeriodicSyncReq:
168 DPRINTF(debugPeriodic,"PERIODIC sync request (at %ld)\n",send_tick);
169 // sanity check
170 if (TCPServer::instance->periodicSyncTick() == 0) {
171 TCPServer::instance->periodicSyncTick(send_tick);
172 } else if ( TCPServer::instance->periodicSyncTick() != send_tick) {
173 panic("Out of order periodic sync request - rank:%d "
174 "(send_tick:%ld ongoing:%ld)", rank, send_tick,
175 TCPServer::instance->periodicSyncTick());
176 }
177 switch (state) {
178 case SyncState::idle:
179 state = SyncState::periodic;
180 TCPServer::instance->syncTryComplete(SyncState::periodic,
181 MsgType::cmdPeriodicSyncAck);
182 break;
183 case SyncState::asyncCkpt:
184 // An async ckpt request has already been sent to this client and
185 // that will interrupt this periodic sync. We can simply drop this
186 // message.
187 break;
188 default:
189 panic("Unexpected state for periodic sync request (rank:%d)",
190 rank);
191 break;
192 }
193 break;
194 case MsgType::cmdCkptSyncReq:
195 DPRINTF(debugSync, "CKPT sync request (rank:%d)\n",rank);
196 switch (state) {
197 case SyncState::idle:
198 TCPServer::instance->ckptPropagate(*this);
199 // we fall through here to complete #clients==1 case
200 case SyncState::asyncCkpt:
201 state = SyncState::ckpt;
202 TCPServer::instance->syncTryComplete(SyncState::ckpt,
203 MsgType::cmdCkptSyncAck);
204 break;
205 default:
206 panic("Unexpected state for ckpt sync request (rank:%d)", rank);
207 break;
208 }
209 break;
210 default:
211 panic("Unexpected header packet (rank:%d)",rank);
212 break;
213 }
214 }
215
216 TCPServer::TCPServer(unsigned clients_num,
217 unsigned listen_port,
218 int timeout_in_sec)
219 {
220 assert(instance == nullptr);
221 construct(clients_num, listen_port, timeout_in_sec);
222 instance = this;
223 }
224
225 TCPServer::~TCPServer()
226 {
227 for (auto &c : clientsPollFd)
228 close(c.fd);
229 }
230
231 void
232 TCPServer::construct(unsigned clients_num, unsigned port, int timeout_in_sec)
233 {
234 int listen_sock, new_sock, ret;
235 unsigned client_len;
236 struct sockaddr_in server_addr, client_addr;
237 struct pollfd new_pollfd;
238 Channel new_channel;
239
240 DPRINTF(debugSetup, "Start listening on port %u ...\n", port);
241
242 listen_sock = socket(AF_INET, SOCK_STREAM, 0);
243 if (listen_sock < 0)
244 panic("socket() failed:%s", strerror(errno));
245
246 bzero(&server_addr, sizeof(server_addr));
247 server_addr.sin_family = AF_INET;
248 server_addr.sin_addr.s_addr = INADDR_ANY;
249 server_addr.sin_port = htons(port);
250 if (bind(listen_sock, (struct sockaddr *) &server_addr,
251 sizeof(server_addr)) < 0)
252 panic("bind() failed:%s", strerror(errno));
253 listen(listen_sock, 10);
254
255 clientsPollFd.reserve(clients_num);
256 clientsChannel.reserve(clients_num);
257
258 new_pollfd.events = POLLIN | POLLRDHUP;
259 new_pollfd.revents = 0;
260 while (clientsPollFd.size() < clients_num) {
261 new_pollfd.fd = listen_sock;
262 ret = poll(&new_pollfd, 1, timeout_in_sec*1000);
263 if (ret == 0)
264 panic("Timeout while waiting for clients to connect");
265 assert(ret == 1 && new_pollfd.revents == POLLIN);
266 client_len = sizeof(client_addr);
267 new_sock = accept(listen_sock,
268 (struct sockaddr *) &client_addr,
269 &client_len);
270 if (new_sock < 0)
271 panic("accept() failed:%s", strerror(errno));
272 new_pollfd.fd = new_sock;
273 new_pollfd.revents = 0;
274 clientsPollFd.push_back(new_pollfd);
275 new_channel.fd = new_sock;
276 new_channel.isAlive = true;
277 new_channel.recvRaw(&new_channel.rank, sizeof(new_channel.rank));
278 clientsChannel.push_back(new_channel);
279
280 DPRINTF(debugSetup, "New client connection addr:%u port:%hu rank:%d\n",
281 client_addr.sin_addr.s_addr, client_addr.sin_port,
282 new_channel.rank);
283 }
284 ret = close(listen_sock);
285 assert(ret == 0);
286
287 DPRINTF(debugSetup, "Setup complete\n");
288 }
289
290 void
291 TCPServer::run()
292 {
293 int nfd;
294 unsigned num_active_clients = clientsPollFd.size();
295
296 DPRINTF(debugSetup, "Entering run() loop\n");
297 while (num_active_clients == clientsPollFd.size()) {
298 nfd = poll(&clientsPollFd[0], clientsPollFd.size(), -1);
299 if (nfd == -1)
300 panic("poll() failed:%s", strerror(errno));
301
302 for (unsigned i = 0, n = 0;
303 i < clientsPollFd.size() && (signed)n < nfd;
304 i++) {
305 struct pollfd &pfd = clientsPollFd[i];
306 if (pfd.revents) {
307 if (pfd.revents & POLLERR)
308 panic("poll() returned POLLERR");
309 if (pfd.revents & POLLIN) {
310 clientsChannel[i].headerPktIn();
311 }
312 if (pfd.revents & POLLRDHUP) {
313 // One gem5 process exited or aborted. Either way, we
314 // assume the full simulation should stop now (either
315 // because m5 exit was called or a serious error
316 // occurred.) So we quit the run loop here and close all
317 // sockets to notify the remaining peer gem5 processes.
318 pfd.events = 0;
319 clientsChannel[i].isAlive = false;
320 num_active_clients--;
321 DPRINTF(debugSetup, "POLLRDHUP event");
322 }
323 n++;
324 if ((signed)n == nfd)
325 break;
326 }
327 }
328 }
329 DPRINTF(debugSetup, "Exiting run() loop\n");
330 }
331
332 void
333 TCPServer::xferData(const Header &hdr_pkt, const Channel &src)
334 {
335 unsigned n;
336 assert(hdr_pkt.dataPacketLength <= sizeof(packetBuffer));
337 n = src.recvRaw(packetBuffer, hdr_pkt.dataPacketLength);
338
339 if (n == 0)
340 panic("recvRaw() failed");
341 DPRINTF(debugPkt, "Incoming data packet (from rank %d) "
342 "src:0x%02x%02x%02x%02x%02x%02x "
343 "dst:0x%02x%02x%02x%02x%02x%02x\n",
344 src.rank,
345 hdr_pkt.srcAddress[0],
346 hdr_pkt.srcAddress[1],
347 hdr_pkt.srcAddress[2],
348 hdr_pkt.srcAddress[3],
349 hdr_pkt.srcAddress[4],
350 hdr_pkt.srcAddress[5],
351 hdr_pkt.dstAddress[0],
352 hdr_pkt.dstAddress[1],
353 hdr_pkt.dstAddress[2],
354 hdr_pkt.dstAddress[3],
355 hdr_pkt.dstAddress[4],
356 hdr_pkt.dstAddress[5]);
357 // Now try to figure out the destination client(s).
358 auto dst_info = addressMap.find(&hdr_pkt.dstAddress);
359
360 // First handle the multicast/broadcast or unknonw destination case. These
361 // all trigger a broadcast of the packet to all clients.
362 if (MultiHeaderPkt::isUnicastAddress(hdr_pkt.dstAddress) == false ||
363 dst_info == addressMap.end()) {
364 unsigned n = 0;
365 for (auto const &c: clientsChannel) {
366 if (c.isAlive && &c!=&src) {
367 c.sendRaw(&hdr_pkt, sizeof(hdr_pkt));
368 c.sendRaw(packetBuffer, hdr_pkt.dataPacketLength);
369 n++;
370 }
371 }
372 if (n == 0) {
373 inform("Broadcast/multicast packet dropped\n");
374 }
375 } else {
376 // It is a unicast address with a known destination
377 Channel *dst = dst_info->second;
378 if (dst->isAlive) {
379 dst->sendRaw(&hdr_pkt, sizeof(hdr_pkt));
380 dst->sendRaw(packetBuffer, hdr_pkt.dataPacketLength);
381 DPRINTF(debugPkt, "Unicast packet sent (to rank %d)\n",dst->rank);
382 } else {
383 inform("Unicast packet dropped (destination exited)\n");
384 }
385 }
386 }
387
388 void
389 TCPServer::syncTryComplete(SyncState st, MsgType ack)
390 {
391 // Check if the barrieris complete. If so then notify all the clients.
392 for (auto &c : clientsChannel) {
393 if (c.isAlive && (c.state != st)) {
394 // sync not complete yet, stop here
395 return;
396 }
397 }
398 // Sync complete, send out the acks
399 MultiHeaderPkt::Header hdr_pkt;
400 hdr_pkt.msgType = ack;
401 for (auto &c : clientsChannel) {
402 if (c.isAlive) {
403 c.sendRaw(&hdr_pkt, sizeof(hdr_pkt));
404 c.state = SyncState::idle;
405 }
406 }
407 // Reset periodic send tick
408 _periodicSyncTick = 0;
409 DPRINTF(st == SyncState::periodic ? debugPeriodic : debugSync,
410 "Sync COMPLETE\n");
411 }
412
413 void
414 TCPServer::ckptPropagate(Channel &ch)
415 {
416 // Channel ch got a ckpt request that needs to be propagated to the other
417 // clients
418 MultiHeaderPkt::Header hdr_pkt;
419 hdr_pkt.msgType = MsgType::cmdCkptSyncReq;
420 for (auto &c : clientsChannel) {
421 if (c.isAlive && (&c != &ch)) {
422 switch (c.state) {
423 case SyncState::idle:
424 case SyncState::periodic:
425 c.sendRaw(&hdr_pkt, sizeof(hdr_pkt));
426 c.state = SyncState::asyncCkpt;
427 break;
428 default:
429 panic("Unexpected state for ckpt sync request propagation "
430 "(rank:%d)\n",c.rank);
431 break;
432 }
433 }
434 }
435 }
436
437 int main(int argc, char *argv[])
438 {
439 TCPServer *server;
440 int clients_num = -1, listen_port = -1;
441 int first_arg = 1, timeout_in_sec = 60;
442
443 if (argc > 1 && string(argv[1]).compare("-debug") == 0) {
444 timeout_in_sec = -1;
445 first_arg++;
446 argc--;
447 }
448
449 if (argc != 3)
450 panic("We need two command line args (number of clients and tcp listen"
451 " port");
452
453 clients_num = atoi(argv[first_arg]);
454 listen_port = atoi(argv[first_arg + 1]);
455
456 server = new TCPServer(clients_num, listen_port, timeout_in_sec);
457
458 server->run();
459
460 delete server;
461
462 return 0;
463 }