15f69f2ac9d87f380adba3e84b9f8bd47a0020af
[gem5.git] / src / dev / net / multi_iface.cc
1 /*
2 * Copyright (c) 2015 ARM Limited
3 * All rights reserved
4 *
5 * The license below extends only to copyright in the software and shall
6 * not be construed as granting a license to any other intellectual
7 * property including but not limited to intellectual property relating
8 * to a hardware implementation of the functionality of the software
9 * licensed hereunder. You may use the software subject to the license
10 * terms below provided that you ensure that this notice is replicated
11 * unmodified and in its entirety in all distributions of the software,
12 * modified or unmodified, in source code or in binary form.
13 *
14 * Redistribution and use in source and binary forms, with or without
15 * modification, are permitted provided that the following conditions are
16 * met: redistributions of source code must retain the above copyright
17 * notice, this list of conditions and the following disclaimer;
18 * redistributions in binary form must reproduce the above copyright
19 * notice, this list of conditions and the following disclaimer in the
20 * documentation and/or other materials provided with the distribution;
21 * neither the name of the copyright holders nor the names of its
22 * contributors may be used to endorse or promote products derived from
23 * this software without specific prior written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 *
37 * Authors: Gabor Dozsa
38 */
39
40 /* @file
41 * The interface class for multi gem5 simulations.
42 */
43
44 #include "dev/net/multi_iface.hh"
45
46 #include <queue>
47 #include <thread>
48
49 #include "base/random.hh"
50 #include "base/trace.hh"
51 #include "debug/MultiEthernet.hh"
52 #include "debug/MultiEthernetPkt.hh"
53 #include "dev/net/etherpkt.hh"
54 #include "sim/sim_exit.hh"
55 #include "sim/sim_object.hh"
56
57
58 MultiIface::Sync *MultiIface::sync = nullptr;
59 MultiIface::SyncEvent *MultiIface::syncEvent = nullptr;
60 unsigned MultiIface::recvThreadsNum = 0;
61 MultiIface *MultiIface::master = nullptr;
62
63 bool
64 MultiIface::Sync::run(SyncTrigger t, Tick sync_tick)
65 {
66 std::unique_lock<std::mutex> sync_lock(lock);
67
68 trigger = t;
69 if (trigger != SyncTrigger::periodic) {
70 DPRINTF(MultiEthernet,"MultiIface::Sync::run() trigger:%d\n",
71 (unsigned)trigger);
72 }
73
74 switch (state) {
75 case SyncState::asyncCkpt:
76 switch (trigger) {
77 case SyncTrigger::ckpt:
78 assert(MultiIface::syncEvent->interrupted == false);
79 state = SyncState::busy;
80 break;
81 case SyncTrigger::periodic:
82 if (waitNum == 0) {
83 // So all recv threads got an async checkpoint request already
84 // and a simExit is scheduled at the end of the current tick
85 // (i.e. it is a periodic sync scheduled at the same tick as
86 // the simExit).
87 state = SyncState::idle;
88 DPRINTF(MultiEthernet,"MultiIface::Sync::run() interrupted "
89 "due to async ckpt scheduled\n");
90 return false;
91 } else {
92 // we still need to wait for some receiver thread to get the
93 // aysnc ckpt request. We are going to proceed as 'interrupted'
94 // periodic sync.
95 state = SyncState::interrupted;
96 DPRINTF(MultiEthernet,"MultiIface::Sync::run() interrupted "
97 "due to ckpt request is coming in\n");
98 }
99 break;
100 case SyncTrigger::atomic:
101 assert(trigger != SyncTrigger::atomic);
102 }
103 break;
104 case SyncState::idle:
105 state = SyncState::busy;
106 break;
107 // Only one sync can be active at any time
108 case SyncState::interrupted:
109 case SyncState::busy:
110 assert(state != SyncState::interrupted);
111 assert(state != SyncState::busy);
112 break;
113 }
114 // Kick-off the sync unless we are in the middle of an interrupted
115 // periodic sync
116 if (state != SyncState::interrupted) {
117 assert(waitNum == 0);
118 waitNum = MultiIface::recvThreadsNum;
119 // initiate the global synchronisation
120 assert(MultiIface::master != nullptr);
121 MultiIface::master->syncRaw(triggerToMsg[(unsigned)trigger], sync_tick);
122 }
123 // now wait until all receiver threads complete the synchronisation
124 auto lf = [this]{ return waitNum == 0; };
125 cv.wait(sync_lock, lf);
126
127 // we are done
128 assert(state == SyncState::busy || state == SyncState::interrupted);
129 bool ret = (state != SyncState::interrupted);
130 state = SyncState::idle;
131 return ret;
132 }
133
134 void
135 MultiIface::Sync::progress(MsgType msg)
136 {
137 std::unique_lock<std::mutex> sync_lock(lock);
138
139 switch (msg) {
140 case MsgType::cmdAtomicSyncAck:
141 assert(state == SyncState::busy && trigger == SyncTrigger::atomic);
142 break;
143 case MsgType::cmdPeriodicSyncAck:
144 assert(state == SyncState::busy && trigger == SyncTrigger::periodic);
145 break;
146 case MsgType::cmdCkptSyncAck:
147 assert(state == SyncState::busy && trigger == SyncTrigger::ckpt);
148 break;
149 case MsgType::cmdCkptSyncReq:
150 switch (state) {
151 case SyncState::busy:
152 if (trigger == SyncTrigger::ckpt) {
153 // We are already in a checkpoint sync but got another ckpt
154 // sync request. This may happen if two (or more) peer gem5
155 // processes try to start a ckpt nearly at the same time.
156 // Incrementing waitNum here (before decrementing it below)
157 // effectively results in ignoring this new ckpt sync request.
158 waitNum++;
159 break;
160 }
161 assert (waitNum == recvThreadsNum);
162 state = SyncState::interrupted;
163 // we need to fall over here to handle "recvThreadsNum == 1" case
164 case SyncState::interrupted:
165 assert(trigger == SyncTrigger::periodic);
166 assert(waitNum >= 1);
167 if (waitNum == 1) {
168 exitSimLoop("checkpoint");
169 }
170 break;
171 case SyncState::idle:
172 // There is no on-going sync so we got an async ckpt request. If we
173 // are the only receiver thread then we need to schedule the
174 // checkpoint. Otherwise, only change the state to 'asyncCkpt' and
175 // let the last receiver thread to schedule the checkpoint at the
176 // 'asyncCkpt' case.
177 // Note that a periodic or resume sync may start later and that can
178 // trigger a state change to 'interrupted' (so the checkpoint may
179 // get scheduled at 'interrupted' case finally).
180 assert(waitNum == 0);
181 state = SyncState::asyncCkpt;
182 waitNum = MultiIface::recvThreadsNum;
183 // we need to fall over here to handle "recvThreadsNum == 1" case
184 case SyncState::asyncCkpt:
185 assert(waitNum >= 1);
186 if (waitNum == 1)
187 exitSimLoop("checkpoint");
188 break;
189 default:
190 panic("Unexpected state for checkpoint request message");
191 break;
192 }
193 break;
194 default:
195 panic("Unknown msg type");
196 break;
197 }
198 waitNum--;
199 assert(state != SyncState::idle);
200 // Notify the simultaion thread if there is an on-going sync.
201 if (state != SyncState::asyncCkpt) {
202 sync_lock.unlock();
203 cv.notify_one();
204 }
205 }
206
207 void MultiIface::SyncEvent::start(Tick start, Tick interval)
208 {
209 assert(!scheduled());
210 if (interval == 0)
211 panic("Multi synchronisation period must be greater than zero");
212 repeat = interval;
213 schedule(start);
214 }
215
216 void
217 MultiIface::SyncEvent::adjust(Tick start_tick, Tick repeat_tick)
218 {
219 // The new multi interface may require earlier start of the
220 // synchronisation.
221 assert(scheduled() == true);
222 if (start_tick < when())
223 reschedule(start_tick);
224 // The new multi interface may require more frequent synchronisation.
225 if (repeat == 0)
226 panic("Multi synchronisation period must be greater than zero");
227 if (repeat < repeat_tick)
228 repeat = repeat_tick;
229 }
230
231 void
232 MultiIface::SyncEvent::process()
233 {
234 /*
235 * Note that this is a global event so this process method will be called
236 * by only exactly one thread.
237 */
238 // if we are draining the system then we must not start a periodic sync (as
239 // it is not sure that all peer gem5 will reach this tick before taking
240 // the checkpoint).
241 if (isDraining == true) {
242 assert(interrupted == false);
243 interrupted = true;
244 DPRINTF(MultiEthernet,"MultiIface::SyncEvent::process() interrupted "
245 "due to draining\n");
246 return;
247 }
248 if (interrupted == false)
249 scheduledAt = curTick();
250 /*
251 * We hold the eventq lock at this point but the receiver thread may
252 * need the lock to schedule new recv events while waiting for the
253 * multi sync to complete.
254 * Note that the other simulation threads also release their eventq
255 * locks while waiting for us due to the global event semantics.
256 */
257 curEventQueue()->unlock();
258 // we do a global sync here
259 interrupted = !MultiIface::sync->run(SyncTrigger::periodic, scheduledAt);
260 // Global sync completed or got interrupted.
261 // we are expected to exit with the eventq lock held
262 curEventQueue()->lock();
263 // schedule the next global sync event if this one completed. Otherwise
264 // (i.e. this one was interrupted by a checkpoint request), we will
265 // reschedule this one after the draining is complete.
266 if (!interrupted)
267 schedule(scheduledAt + repeat);
268 }
269
270 void MultiIface::SyncEvent::resume()
271 {
272 Tick sync_tick;
273 assert(!scheduled());
274 if (interrupted) {
275 assert(curTick() >= scheduledAt);
276 // We have to complete the interrupted periodic sync asap.
277 // Note that this sync might be interrupted now again with a checkpoint
278 // request from a peer gem5...
279 sync_tick = curTick();
280 schedule(sync_tick);
281 } else {
282 // So we completed the last periodic sync, let's find out the tick for
283 // next one
284 assert(curTick() > scheduledAt);
285 sync_tick = scheduledAt + repeat;
286 if (sync_tick < curTick())
287 panic("Cannot resume periodic synchronisation");
288 schedule(sync_tick);
289 }
290 DPRINTF(MultiEthernet,
291 "MultiIface::SyncEvent periodic sync resumed at %lld "
292 "(curTick:%lld)\n", sync_tick, curTick());
293 }
294
295 void MultiIface::SyncEvent::serialize(const std::string &base,
296 CheckpointOut &cp) const
297 {
298 // Save the periodic multi sync schedule information
299 paramOut(cp, base + ".periodicSyncRepeat", repeat);
300 paramOut(cp, base + ".periodicSyncInterrupted", interrupted);
301 paramOut(cp, base + ".periodicSyncAt", scheduledAt);
302 }
303
304 void MultiIface::SyncEvent::unserialize(const std::string &base,
305 CheckpointIn &cp)
306 {
307 paramIn(cp, base + ".periodicSyncRepeat", repeat);
308 paramIn(cp, base + ".periodicSyncInterrupted", interrupted);
309 paramIn(cp, base + ".periodicSyncAt", scheduledAt);
310 }
311
312 MultiIface::MultiIface(unsigned multi_rank,
313 Tick sync_start,
314 Tick sync_repeat,
315 EventManager *em) :
316 syncStart(sync_start), syncRepeat(sync_repeat),
317 recvThread(nullptr), eventManager(em), recvDone(nullptr),
318 scheduledRecvPacket(nullptr), linkDelay(0), rank(multi_rank)
319 {
320 DPRINTF(MultiEthernet, "MultiIface() ctor rank:%d\n",multi_rank);
321 if (master == nullptr) {
322 assert(sync == nullptr);
323 assert(syncEvent == nullptr);
324 sync = new Sync();
325 syncEvent = new SyncEvent();
326 master = this;
327 }
328 }
329
330 MultiIface::~MultiIface()
331 {
332 assert(recvThread);
333 delete recvThread;
334 if (this == master) {
335 assert(syncEvent);
336 delete syncEvent;
337 assert(sync);
338 delete sync;
339 }
340 }
341
342 void
343 MultiIface::packetOut(EthPacketPtr pkt, Tick send_delay)
344 {
345 MultiHeaderPkt::Header header_pkt;
346 unsigned address_length = MultiHeaderPkt::maxAddressLength();
347
348 // Prepare a multi header packet for the Ethernet packet we want to
349 // send out.
350 header_pkt.msgType = MsgType::dataDescriptor;
351 header_pkt.sendTick = curTick();
352 header_pkt.sendDelay = send_delay;
353
354 // Store also the source and destination addresses.
355 pkt->packAddress(header_pkt.srcAddress, header_pkt.dstAddress,
356 address_length);
357
358 header_pkt.dataPacketLength = pkt->size();
359
360 // Send out the multi hedare packet followed by the Ethernet packet.
361 sendRaw(&header_pkt, sizeof(header_pkt), header_pkt.dstAddress);
362 sendRaw(pkt->data, pkt->size(), header_pkt.dstAddress);
363 DPRINTF(MultiEthernetPkt,
364 "MultiIface::sendDataPacket() done size:%d send_delay:%llu "
365 "src:0x%02x%02x%02x%02x%02x%02x "
366 "dst:0x%02x%02x%02x%02x%02x%02x\n",
367 pkt->size(), send_delay,
368 header_pkt.srcAddress[0], header_pkt.srcAddress[1],
369 header_pkt.srcAddress[2], header_pkt.srcAddress[3],
370 header_pkt.srcAddress[4], header_pkt.srcAddress[5],
371 header_pkt.dstAddress[0], header_pkt.dstAddress[1],
372 header_pkt.dstAddress[2], header_pkt.dstAddress[3],
373 header_pkt.dstAddress[4], header_pkt.dstAddress[5]);
374 }
375
376 bool
377 MultiIface::recvHeader(MultiHeaderPkt::Header &header_pkt)
378 {
379 // Blocking receive of an incoming multi header packet.
380 return recvRaw((void *)&header_pkt, sizeof(header_pkt));
381 }
382
383 void
384 MultiIface::recvData(const MultiHeaderPkt::Header &header_pkt)
385 {
386 // We are here beacuse a header packet has been received implying
387 // that an Ethernet (data) packet is coming in next.
388 assert(header_pkt.msgType == MsgType::dataDescriptor);
389 // Allocate storage for the incoming Ethernet packet.
390 EthPacketPtr new_packet(new EthPacketData(header_pkt.dataPacketLength));
391 // Now execute the blocking receive and store the incoming data directly
392 // in the new EthPacketData object.
393 if (! recvRaw((void *)(new_packet->data), header_pkt.dataPacketLength))
394 panic("Missing data packet");
395
396 new_packet->length = header_pkt.dataPacketLength;
397 // Grab the event queue lock to schedule a new receive event for the
398 // data packet.
399 curEventQueue()->lock();
400 // Compute the receive tick. It includes the send delay and the
401 // simulated link delay.
402 Tick recv_tick = header_pkt.sendTick + header_pkt.sendDelay + linkDelay;
403 DPRINTF(MultiEthernetPkt, "MultiIface::recvThread() packet receive, "
404 "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
405 header_pkt.sendTick, header_pkt.sendDelay, linkDelay, recv_tick);
406
407 if (recv_tick <= curTick()) {
408 panic("Simulators out of sync - missed packet receive by %llu ticks",
409 curTick() - recv_tick);
410 }
411 // Now we are about to schedule a recvDone event for the new data packet.
412 // We use the same recvDone object for all incoming data packets. If
413 // that is already scheduled - i.e. a receive event for a previous
414 // data packet is already pending - then we have to check whether the
415 // receive tick for the new packet is earlier than that of the currently
416 // pending event. Packets may arrive out-of-order with respect to
417 // simulated receive time. If that is the case, we need to re-schedule the
418 // recvDone event for the new packet. Otherwise, we save the packet
419 // pointer and the recv tick for the new packet in the recvQueue. See
420 // the implementation of the packetIn() method for comments on how this
421 // information is retrieved from the recvQueue by the simulation thread.
422 if (!recvDone->scheduled()) {
423 assert(recvQueue.size() == 0);
424 assert(scheduledRecvPacket == nullptr);
425 scheduledRecvPacket = new_packet;
426 eventManager->schedule(recvDone, recv_tick);
427 } else if (recvDone->when() > recv_tick) {
428 recvQueue.emplace(scheduledRecvPacket, recvDone->when());
429 eventManager->reschedule(recvDone, recv_tick);
430 scheduledRecvPacket = new_packet;
431 } else {
432 recvQueue.emplace(new_packet, recv_tick);
433 }
434 curEventQueue()->unlock();
435 }
436
437 void
438 MultiIface::recvThreadFunc()
439 {
440 EthPacketPtr new_packet;
441 MultiHeaderPkt::Header header;
442
443 // The new receiver thread shares the event queue with the simulation
444 // thread (associated with the simulated Ethernet link).
445 curEventQueue(eventManager->eventQueue());
446 // Main loop to wait for and process any incoming message.
447 for (;;) {
448 // recvHeader() blocks until the next multi header packet comes in.
449 if (!recvHeader(header)) {
450 // We lost connection to the peer gem5 processes most likely
451 // because one of them called m5 exit. So we stop here.
452 exit_message("info", 0, "Message server closed connection, "
453 "simulation is exiting");
454 }
455 // We got a valid multi header packet, let's process it
456 if (header.msgType == MsgType::dataDescriptor) {
457 recvData(header);
458 } else {
459 // everything else must be synchronisation related command
460 sync->progress(header.msgType);
461 }
462 }
463 }
464
465 EthPacketPtr
466 MultiIface::packetIn()
467 {
468 // We are called within the process() method of the recvDone event. We
469 // return the packet that triggered the current receive event.
470 // If there is further packets in the recvQueue, we also have to schedule
471 // the recvEvent for the next packet with the smallest receive tick.
472 // The priority queue container ensures that smallest receive tick is
473 // always on the top of the queue.
474 assert(scheduledRecvPacket != nullptr);
475 EthPacketPtr next_packet = scheduledRecvPacket;
476
477 if (! recvQueue.empty()) {
478 eventManager->schedule(recvDone, recvQueue.top().second);
479 scheduledRecvPacket = recvQueue.top().first;
480 recvQueue.pop();
481 } else {
482 scheduledRecvPacket = nullptr;
483 }
484
485 return next_packet;
486 }
487
488 void
489 MultiIface::spawnRecvThread(Event *recv_done, Tick link_delay)
490 {
491 assert(recvThread == nullptr);
492 // all receive thread must be spawned before simulation starts
493 assert(eventManager->eventQueue()->getCurTick() == 0);
494
495 recvDone = recv_done;
496 linkDelay = link_delay;
497
498 recvThread = new std::thread(&MultiIface::recvThreadFunc, this);
499
500 recvThreadsNum++;
501 }
502
503 DrainState
504 MultiIface::drain()
505 {
506 DPRINTF(MultiEthernet,"MultiIFace::drain() called\n");
507
508 // This can be called multiple times in the same drain cycle.
509 if (master == this) {
510 syncEvent->isDraining = true;
511 }
512
513 return DrainState::Drained;
514 }
515
516 void MultiIface::drainDone() {
517 if (master == this) {
518 assert(syncEvent->isDraining == true);
519 syncEvent->isDraining = false;
520 // We need to resume the interrupted periodic sync here now that the
521 // draining is done. If the last periodic sync completed before the
522 // checkpoint then the next one is already scheduled.
523 if (syncEvent->interrupted)
524 syncEvent->resume();
525 }
526 }
527
528 void MultiIface::serialize(const std::string &base, CheckpointOut &cp) const
529 {
530 // Drain the multi interface before the checkpoint is taken. We cannot call
531 // this as part of the normal drain cycle because this multi sync has to be
532 // called exactly once after the system is fully drained.
533 // Note that every peer will take a checkpoint but they may take it at
534 // different ticks.
535 // This sync request may interrupt an on-going periodic sync in some peers.
536 sync->run(SyncTrigger::ckpt, curTick());
537
538 // Save the periodic multi sync status
539 syncEvent->serialize(base, cp);
540
541 unsigned n_rx_packets = recvQueue.size();
542 if (scheduledRecvPacket != nullptr)
543 n_rx_packets++;
544
545 paramOut(cp, base + ".nRxPackets", n_rx_packets);
546
547 if (n_rx_packets > 0) {
548 assert(recvDone->scheduled());
549 scheduledRecvPacket->serialize(base + ".rxPacket[0]", cp);
550 }
551
552 for (unsigned i=1; i < n_rx_packets; i++) {
553 const RecvInfo recv_info = recvQueue.impl().at(i-1);
554 recv_info.first->serialize(base + csprintf(".rxPacket[%d]", i), cp);
555 Tick rx_tick = recv_info.second;
556 paramOut(cp, base + csprintf(".rxTick[%d]", i), rx_tick);
557 }
558 }
559
560 void MultiIface::unserialize(const std::string &base, CheckpointIn &cp)
561 {
562 assert(recvQueue.size() == 0);
563 assert(scheduledRecvPacket == nullptr);
564 assert(recvDone->scheduled() == false);
565
566 // restore periodic sync info
567 syncEvent->unserialize(base, cp);
568
569 unsigned n_rx_packets;
570 paramIn(cp, base + ".nRxPackets", n_rx_packets);
571
572 if (n_rx_packets > 0) {
573 scheduledRecvPacket = std::make_shared<EthPacketData>(16384);
574 scheduledRecvPacket->unserialize(base + ".rxPacket[0]", cp);
575 // Note: receive event will be scheduled when the link is unserialized
576 }
577
578 for (unsigned i=1; i < n_rx_packets; i++) {
579 EthPacketPtr rx_packet = std::make_shared<EthPacketData>(16384);
580 rx_packet->unserialize(base + csprintf(".rxPacket[%d]", i), cp);
581 Tick rx_tick = 0;
582 paramIn(cp, base + csprintf(".rxTick[%d]", i), rx_tick);
583 assert(rx_tick > 0);
584 recvQueue.emplace(rx_packet,rx_tick);
585 }
586 }
587
588 void MultiIface::initRandom()
589 {
590 // Initialize the seed for random generator to avoid the same sequence
591 // in all gem5 peer processes
592 assert(master != nullptr);
593 if (this == master)
594 random_mt.init(5489 * (rank+1) + 257);
595 }
596
597 void MultiIface::startPeriodicSync()
598 {
599 DPRINTF(MultiEthernet, "MultiIface:::initPeriodicSync started\n");
600 // Do a global sync here to ensure that peer gem5 processes are around
601 // (actually this may not be needed...)
602 sync->run(SyncTrigger::atomic, curTick());
603
604 // Start the periodic sync if it is a fresh simulation from scratch
605 if (curTick() == 0) {
606 if (this == master) {
607 syncEvent->start(syncStart, syncRepeat);
608 inform("Multi synchronisation activated: start at %lld, "
609 "repeat at every %lld ticks.\n",
610 syncStart, syncRepeat);
611 } else {
612 // In case another multiIface object requires different schedule
613 // for periodic sync than the master does.
614 syncEvent->adjust(syncStart, syncRepeat);
615 }
616 } else {
617 // Schedule the next periodic sync if resuming from a checkpoint
618 if (this == master)
619 syncEvent->resume();
620 }
621 DPRINTF(MultiEthernet, "MultiIface::initPeriodicSync done\n");
622 }