2 * Copyright (c) 2015 ARM Limited
5 * The license below extends only to copyright in the software and shall
6 * not be construed as granting a license to any other intellectual
7 * property including but not limited to intellectual property relating
8 * to a hardware implementation of the functionality of the software
9 * licensed hereunder. You may use the software subject to the license
10 * terms below provided that you ensure that this notice is replicated
11 * unmodified and in its entirety in all distributions of the software,
12 * modified or unmodified, in source code or in binary form.
14 * Redistribution and use in source and binary forms, with or without
15 * modification, are permitted provided that the following conditions are
16 * met: redistributions of source code must retain the above copyright
17 * notice, this list of conditions and the following disclaimer;
18 * redistributions in binary form must reproduce the above copyright
19 * notice, this list of conditions and the following disclaimer in the
20 * documentation and/or other materials provided with the distribution;
21 * neither the name of the copyright holders nor the names of its
22 * contributors may be used to endorse or promote products derived from
23 * this software without specific prior written permission.
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
37 * Authors: Gabor Dozsa
42 * TCP stream socket based interface class implementation for dist-gem5 runs.
#include "dev/net/tcp_iface.hh"

#include <arpa/inet.h>
#include <netdb.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <cstring>

#include "base/types.hh"
#include "debug/DistEthernet.hh"
#include "debug/DistEthernetCmd.hh"
#include "sim/sim_exit.hh"

#if defined(__FreeBSD__)
#include <netinet/in.h>
#endif
// MSG_NOSIGNAL does not exist on OS X, so provide a substitute there.
// NOTE(review): SO_NOSIGPIPE is a setsockopt() option name rather than a
// send() flag -- confirm this substitution behaves as intended on OS X.
#if defined(__APPLE__) || defined(__MACH__)
#ifndef MSG_NOSIGNAL
#define MSG_NOSIGNAL SO_NOSIGPIPE
#endif
#endif
77 std::vector
<std::pair
<TCPIface::NodeInfo
, int> > TCPIface::nodes
;
78 vector
<int> TCPIface::sockRegistry
;
79 int TCPIface::fdStatic
= -1;
80 bool TCPIface::anyListening
= false;
82 TCPIface::TCPIface(string server_name
, unsigned server_port
,
83 unsigned dist_rank
, unsigned dist_size
,
84 Tick sync_start
, Tick sync_repeat
,
85 EventManager
*em
, bool use_pseudo_op
, bool is_switch
,
87 DistIface(dist_rank
, dist_size
, sync_start
, sync_repeat
, em
, use_pseudo_op
,
88 is_switch
, num_nodes
), serverName(server_name
),
89 serverPort(server_port
), isSwitch(is_switch
), listening(false)
91 if (is_switch
&& isMaster
) {
92 while (!listen(serverPort
)) {
93 DPRINTF(DistEthernet
, "TCPIface(listen): Can't bind port %d\n",
97 inform("tcp_iface listening on port %d", serverPort
);
98 // Now accept the first connection requests from each compute node and
99 // store the node info. The compute nodes will then wait for ack
100 // messages. Ack messages will be sent by initTransport() in the
101 // appropriate order to make sure that every compute node is always
102 // connected to the same switch port.
104 for (int i
= 0; i
< size
; i
++) {
106 DPRINTF(DistEthernet
, "First connection, waiting for link info\n");
107 if (!recvTCP(sock
, &ni
, sizeof(ni
)))
108 panic("Failed to receive link info");
109 nodes
.push_back(make_pair(ni
, sock
));
115 TCPIface::listen(int port
)
118 panic("Socket already listening!");
120 struct sockaddr_in sockaddr
;
123 fdStatic
= socket(PF_INET
, SOCK_STREAM
, IPPROTO_TCP
);
124 panic_if(fdStatic
< 0, "socket() failed: %s", strerror(errno
));
126 sockaddr
.sin_family
= PF_INET
;
127 sockaddr
.sin_addr
.s_addr
= INADDR_ANY
;
128 sockaddr
.sin_port
= htons(port
);
129 // finally clear sin_zero
130 memset(&sockaddr
.sin_zero
, 0, sizeof(sockaddr
.sin_zero
));
131 ret
= ::bind(fdStatic
, (struct sockaddr
*)&sockaddr
, sizeof (sockaddr
));
134 if (ret
== -1 && errno
!= EADDRINUSE
)
135 panic("ListenSocket(listen): bind() failed!");
139 if (::listen(fdStatic
, 24) == -1) {
140 if (errno
!= EADDRINUSE
)
141 panic("ListenSocket(listen): listen() failed!");
152 TCPIface::establishConnection()
154 static unsigned cur_rank
= 0;
155 static unsigned cur_id
= 0;
159 if (cur_id
== 0) { // first connection accepted in the ctor already
161 find_if(nodes
.begin(), nodes
.end(),
162 [](const pair
<NodeInfo
, int> &cn
) -> bool {
163 return cn
.first
.rank
== cur_rank
;
165 assert(iface0
!= nodes
.end());
166 assert(iface0
->first
.distIfaceId
== 0);
167 sock
= iface0
->second
;
169 } else { // additional connections from the same compute node
171 DPRINTF(DistEthernet
, "Next connection, waiting for link info\n");
172 if (!recvTCP(sock
, &ni
, sizeof(ni
)))
173 panic("Failed to receive link info");
174 assert(ni
.rank
== cur_rank
);
175 assert(ni
.distIfaceId
== cur_id
);
177 inform("Link okay (iface:%d -> (node:%d, iface:%d))",
178 distIfaceId
, ni
.rank
, ni
.distIfaceId
);
179 if (ni
.distIfaceId
< ni
.distIfaceNum
- 1) {
186 ni
.distIfaceId
= distIfaceId
;
187 ni
.distIfaceNum
= distIfaceNum
;
188 sendTCP(sock
, &ni
, sizeof(ni
));
189 } else { // this is not a switch
193 ni
.distIfaceId
= distIfaceId
;
194 ni
.distIfaceNum
= distIfaceNum
;
195 sendTCP(sock
, &ni
, sizeof(ni
));
196 DPRINTF(DistEthernet
, "Connected, waiting for ack (distIfaceId:%d\n",
198 if (!recvTCP(sock
, &ni
, sizeof(ni
)))
199 panic("Failed to receive ack");
200 assert(ni
.rank
== rank
);
201 inform("Link okay (iface:%d -> switch iface:%d)", distIfaceId
,
204 sockRegistry
.push_back(sock
);
210 struct sockaddr_in sockaddr
;
211 socklen_t slen
= sizeof (sockaddr
);
212 sock
= ::accept(fdStatic
, (struct sockaddr
*)&sockaddr
, &slen
);
215 if (setsockopt(sock
, IPPROTO_TCP
, TCP_NODELAY
, (char *)&i
,
217 warn("ListenSocket(accept): setsockopt() TCP_NODELAY failed!");
/**
 * Create a TCP socket and connect it to serverName:serverPort.
 *
 * The new descriptor is stored in sock. TCP_NODELAY is requested for it; a
 * failure to set the option only produces a warning. Panics if the socket
 * cannot be created, the server name cannot be resolved, or the connection
 * attempt fails.
 */
void
TCPIface::connect()
{
    sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
    panic_if(sock < 0, "socket() failed: %s", strerror(errno));

    int fl = 1;
    if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&fl, sizeof(fl)) < 0)
        warn("ConnectSocket(connect): setsockopt() TCP_NODELAY failed!");

    struct addrinfo addr_hint;
    memset(&addr_hint, 0, sizeof(addr_hint));
    addr_hint.ai_family = AF_INET;
    addr_hint.ai_socktype = SOCK_STREAM;
    addr_hint.ai_protocol = IPPROTO_TCP;

    const std::string port_str = std::to_string(serverPort);
    struct addrinfo *addr_results = nullptr;
    int ret = getaddrinfo(serverName.c_str(), port_str.c_str(),
                          &addr_hint, &addr_results);
    // getaddrinfo() reports failure through a non-zero EAI_* return code
    // (whose sign is platform dependent), not through errno, so test for
    // any non-zero value and translate it with gai_strerror().
    panic_if(ret != 0, "getaddrinfo() failed: %s", gai_strerror(ret));

    DPRINTF(DistEthernet, "Connecting to %s:%s\n",
            serverName.c_str(), port_str.c_str());

    // Only the first resolved address is tried.
    ret = ::connect(sock, (struct sockaddr *)(addr_results->ai_addr),
                    addr_results->ai_addrlen);
    panic_if(ret < 0, "connect() failed: %s", strerror(errno));

    freeaddrinfo(addr_results);
}
255 TCPIface::~TCPIface()
264 TCPIface::sendTCP(int sock
, const void *buf
, unsigned length
)
268 ret
= ::send(sock
, buf
, length
, MSG_NOSIGNAL
);
270 if (errno
== ECONNRESET
|| errno
== EPIPE
) {
271 exitSimLoop("Message server closed connection, simulation "
274 panic("send() failed: %s", strerror(errno
));
277 panic_if(ret
!= length
, "send() failed");
281 TCPIface::recvTCP(int sock
, void *buf
, unsigned length
)
285 ret
= ::recv(sock
, buf
, length
, MSG_WAITALL
);
287 if (errno
== ECONNRESET
|| errno
== EPIPE
)
288 inform("recv(): %s", strerror(errno
));
290 panic("recv() failed: %s", strerror(errno
));
291 } else if (ret
== 0) {
292 inform("recv(): Connection closed");
293 } else if (ret
!= length
)
294 panic("recv() failed");
296 return (ret
== length
);
300 TCPIface::sendPacket(const Header
&header
, const EthPacketPtr
&packet
)
302 sendTCP(sock
, &header
, sizeof(header
));
303 sendTCP(sock
, packet
->data
, packet
->length
);
307 TCPIface::sendCmd(const Header
&header
)
309 DPRINTF(DistEthernetCmd
, "TCPIface::sendCmd() type: %d\n",
310 static_cast<int>(header
.msgType
));
311 // Global commands (i.e. sync request) are always sent by the master
312 // DistIface. The transfer method is simply implemented as point-to-point
314 for (auto s
: sockRegistry
)
315 sendTCP(s
, (void*)&header
, sizeof(header
));
319 TCPIface::recvHeader(Header
&header
)
321 bool ret
= recvTCP(sock
, &header
, sizeof(header
));
322 DPRINTF(DistEthernetCmd
, "TCPIface::recvHeader() type: %d ret: %d\n",
323 static_cast<int>(header
.msgType
), ret
);
328 TCPIface::recvPacket(const Header
&header
, EthPacketPtr
&packet
)
330 packet
= make_shared
<EthPacketData
>(header
.dataPacketLength
);
331 bool ret
= recvTCP(sock
, packet
->data
, header
.dataPacketLength
);
332 panic_if(!ret
, "Error while reading socket");
333 packet
->simLength
= header
.simLength
;
334 packet
->length
= header
.dataPacketLength
;
338 TCPIface::initTransport()
340 // We cannot setup the conections in the constructor because the number
341 // of dist interfaces (per process) is unknown until the (simobject) init
342 // phase. That information is necessary for global connection ordering.
343 establishConnection();