/*
 * Copyright (c) 2015-2016 ARM Limited
 * All rights reserved
 *
 * The license below extends only to copyright in the software and shall
 * not be construed as granting a license to any other intellectual
 * property including but not limited to intellectual property relating
 * to a hardware implementation of the functionality of the software
 * licensed hereunder. You may use the software subject to the license
 * terms below provided that you ensure that this notice is replicated
 * unmodified and in its entirety in all distributions of the software,
 * modified or unmodified, in source code or in binary form.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met: redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer;
 * redistributions in binary form must reproduce the above copyright
 * notice, this list of conditions and the following disclaimer in the
 * documentation and/or other materials provided with the distribution;
 * neither the name of the copyright holders nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/* @file
 * The interface class for dist-gem5 simulations.
 */

#include "dev/net/dist_iface.hh"

#include <queue>
#include <thread>

#include "base/random.hh"
#include "base/trace.hh"
#include "cpu/thread_context.hh"
#include "debug/DistEthernet.hh"
#include "debug/DistEthernetPkt.hh"
#include "dev/net/etherpkt.hh"
#include "sim/sim_exit.hh"
#include "sim/sim_object.hh"
#include "sim/system.hh"

using namespace std;

DistIface::Sync *DistIface::sync = nullptr;
System *DistIface::sys = nullptr;
DistIface::SyncEvent *DistIface::syncEvent = nullptr;
unsigned DistIface::distIfaceNum = 0;
unsigned DistIface::recvThreadsNum = 0;
DistIface *DistIface::primary = nullptr;
bool DistIface::isSwitch = false;

void
DistIface::Sync::init(Tick start_tick, Tick repeat_tick)
{
    if (start_tick < nextAt) {
        nextAt = start_tick;
        inform("Next dist synchronisation tick is changed to %lu.\n", nextAt);
    }

    if (repeat_tick == 0)
        panic("Dist synchronisation interval must be greater than zero");

    if (repeat_tick < nextRepeat) {
        nextRepeat = repeat_tick;
        inform("Dist synchronisation interval is changed to %lu.\n",
               nextRepeat);
    }
}
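
// For illustration (hypothetical values): if one DistIface calls
// init(1000, 500) and another calls init(800, 250), the shared sync state
// ends up with nextAt = 800 and nextRepeat = 250, i.e. the earliest
// requested start tick and the smallest repeat interval always win.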

void
DistIface::Sync::abort()
{
    std::unique_lock<std::mutex> sync_lock(lock);
    waitNum = 0;
    isAbort = true;
    sync_lock.unlock();
    cv.notify_one();
}

DistIface::SyncSwitch::SyncSwitch(int num_nodes)
{
    numNodes = num_nodes;
    waitNum = num_nodes;
    numExitReq = 0;
    numCkptReq = 0;
    numStopSyncReq = 0;
    doExit = false;
    doCkpt = false;
    doStopSync = false;
    nextAt = std::numeric_limits<Tick>::max();
    nextRepeat = std::numeric_limits<Tick>::max();
    isAbort = false;
}

DistIface::SyncNode::SyncNode()
{
    waitNum = 0;
    needExit = ReqType::none;
    needCkpt = ReqType::none;
    needStopSync = ReqType::none;
    doExit = false;
    doCkpt = false;
    doStopSync = false;
    nextAt = std::numeric_limits<Tick>::max();
    nextRepeat = std::numeric_limits<Tick>::max();
    isAbort = false;
}

bool
DistIface::SyncNode::run(bool same_tick)
{
    std::unique_lock<std::mutex> sync_lock(lock);
    Header header;

    assert(waitNum == 0);
    assert(!isAbort);
    waitNum = DistIface::recvThreadsNum;
    // initiate the global synchronisation
    header.msgType = MsgType::cmdSyncReq;
    header.sendTick = curTick();
    header.syncRepeat = nextRepeat;
    header.needCkpt = needCkpt;
    header.needStopSync = needStopSync;
    if (needCkpt != ReqType::none)
        needCkpt = ReqType::pending;
    header.needExit = needExit;
    if (needExit != ReqType::none)
        needExit = ReqType::pending;
    if (needStopSync != ReqType::none)
        needStopSync = ReqType::pending;
    DistIface::primary->sendCmd(header);
    // now wait until all receiver threads complete the synchronisation
    auto lf = [this]{ return waitNum == 0; };
    cv.wait(sync_lock, lf);
    // global synchronisation is done.
    assert(isAbort || !same_tick || (nextAt == curTick()));
    return !isAbort;
}
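
// A note on the handshake implemented by SyncNode::run() above and
// SyncSwitch::run() below: each simulation node sends a cmdSyncReq to the
// switch, the switch waits until a request has arrived from every node
// (its progress() is called once per node by the receiver threads), and
// then broadcasts a single cmdSyncAck carrying the agreed-upon next sync
// tick, repeat interval, and any pending ckpt/exit/stop-sync decisions.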

bool
DistIface::SyncSwitch::run(bool same_tick)
{
    std::unique_lock<std::mutex> sync_lock(lock);
    Header header;
    // Wait for the sync requests from the nodes
    if (waitNum > 0) {
        auto lf = [this]{ return waitNum == 0; };
        cv.wait(sync_lock, lf);
    }
    assert(waitNum == 0);
    if (isAbort) // sync aborted
        return false;
    assert(!same_tick || (nextAt == curTick()));
    waitNum = numNodes;
    // Complete the global synchronisation
    header.msgType = MsgType::cmdSyncAck;
    header.sendTick = nextAt;
    header.syncRepeat = nextRepeat;
    if (doCkpt || numCkptReq == numNodes) {
        doCkpt = true;
        header.needCkpt = ReqType::immediate;
        numCkptReq = 0;
    } else {
        header.needCkpt = ReqType::none;
    }
    if (doExit || numExitReq == numNodes) {
        doExit = true;
        header.needExit = ReqType::immediate;
    } else {
        header.needExit = ReqType::none;
    }
    if (doStopSync || numStopSyncReq == numNodes) {
        doStopSync = true;
        numStopSyncReq = 0;
        header.needStopSync = ReqType::immediate;
    } else {
        header.needStopSync = ReqType::none;
    }
    DistIface::primary->sendCmd(header);
    return true;
}

bool
DistIface::SyncSwitch::progress(Tick send_tick,
                                Tick sync_repeat,
                                ReqType need_ckpt,
                                ReqType need_exit,
                                ReqType need_stop_sync)
{
    std::unique_lock<std::mutex> sync_lock(lock);
    if (isAbort) // sync aborted
        return false;
    assert(waitNum > 0);

    if (send_tick > nextAt)
        nextAt = send_tick;
    if (nextRepeat > sync_repeat)
        nextRepeat = sync_repeat;

    if (need_ckpt == ReqType::collective)
        numCkptReq++;
    else if (need_ckpt == ReqType::immediate)
        doCkpt = true;
    if (need_exit == ReqType::collective)
        numExitReq++;
    else if (need_exit == ReqType::immediate)
        doExit = true;
    if (need_stop_sync == ReqType::collective)
        numStopSyncReq++;
    else if (need_stop_sync == ReqType::immediate)
        doStopSync = true;

    waitNum--;
    // Notify the simulation thread if the on-going sync is complete
    if (waitNum == 0) {
        sync_lock.unlock();
        cv.notify_one();
    }
    // The receive thread must stay alive in the switch until the node
    // closes the connection. Thus, we always return true here.
    return true;
}
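
// The ReqType handling above encodes two request flavours: a "collective"
// request takes effect only once every node has asked for it (hence the
// numCkptReq/numExitReq/numStopSyncReq counters), whereas an "immediate"
// request from any single node forces the action for everybody at the
// next sync.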

bool
DistIface::SyncNode::progress(Tick max_send_tick,
                              Tick next_repeat,
                              ReqType do_ckpt,
                              ReqType do_exit,
                              ReqType do_stop_sync)
{
    std::unique_lock<std::mutex> sync_lock(lock);
    if (isAbort) // sync aborted
        return false;
    assert(waitNum > 0);

    nextAt = max_send_tick;
    nextRepeat = next_repeat;
    doCkpt = (do_ckpt != ReqType::none);
    doExit = (do_exit != ReqType::none);
    doStopSync = (do_stop_sync != ReqType::none);

    waitNum--;
    // Notify the simulation thread if the on-going sync is complete
    if (waitNum == 0) {
        sync_lock.unlock();
        cv.notify_one();
    }
    // The receive thread must finish when simulation is about to exit
    return !doExit;
}

void
DistIface::SyncNode::requestCkpt(ReqType req)
{
    std::lock_guard<std::mutex> sync_lock(lock);
    assert(req != ReqType::none);
    if (needCkpt != ReqType::none)
        warn("Ckpt requested multiple times (req:%d)\n", static_cast<int>(req));
    if (needCkpt == ReqType::none || req == ReqType::immediate)
        needCkpt = req;
}

void
DistIface::SyncNode::requestExit(ReqType req)
{
    std::lock_guard<std::mutex> sync_lock(lock);
    assert(req != ReqType::none);
    if (needExit != ReqType::none)
        warn("Exit requested multiple times (req:%d)\n", static_cast<int>(req));
    if (needExit == ReqType::none || req == ReqType::immediate)
        needExit = req;
}

void
DistIface::Sync::drainComplete()
{
    if (doCkpt) {
        // The first DistIface object called this right before writing the
        // checkpoint. We need to drain the underlying physical network here.
        // Note that other gem5 peers may enter this barrier at different
        // ticks due to draining.
        run(false);
        // Only the "first" DistIface object has to perform the sync
        doCkpt = false;
    }
}

void
DistIface::SyncNode::serialize(CheckpointOut &cp) const
{
    int need_exit = static_cast<int>(needExit);
    SERIALIZE_SCALAR(need_exit);
}

void
DistIface::SyncNode::unserialize(CheckpointIn &cp)
{
    int need_exit;
    UNSERIALIZE_SCALAR(need_exit);
    needExit = static_cast<ReqType>(need_exit);
}

void
DistIface::SyncSwitch::serialize(CheckpointOut &cp) const
{
    SERIALIZE_SCALAR(numExitReq);
}

void
DistIface::SyncSwitch::unserialize(CheckpointIn &cp)
{
    UNSERIALIZE_SCALAR(numExitReq);
}

void
DistIface::SyncEvent::start()
{
    // Note that this may be called either from startup() or drainResume()

    // At this point, all DistIface objects have already called Sync::init()
    // so we have a local minimum of the start tick and repeat for the
    // periodic sync.
    repeat = DistIface::sync->nextRepeat;
    // Do a global barrier to agree on a common repeat value (the smallest
    // one from all participating nodes).
    if (!DistIface::sync->run(false))
        panic("DistIface::SyncEvent::start() aborted\n");

    assert(!DistIface::sync->doCkpt);
    assert(!DistIface::sync->doExit);
    assert(!DistIface::sync->doStopSync);
    assert(DistIface::sync->nextAt >= curTick());
    assert(DistIface::sync->nextRepeat <= repeat);

    if (curTick() == 0)
        assert(!scheduled());

    // Use the maximum of the current tick for all participating nodes or a
    // user provided starting tick.
    if (scheduled())
        reschedule(DistIface::sync->nextAt);
    else
        schedule(DistIface::sync->nextAt);

    inform("Dist sync scheduled at %lu and repeats %lu\n", when(),
           DistIface::sync->nextRepeat);
}
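
// Note that SyncEvent::start() performs a full global barrier itself (the
// DistIface::sync->run(false) call), so (re)scheduling the periodic sync
// is already a synchronised operation: every node blocks in start() until
// all peers have reached it.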

void
DistIface::SyncEvent::process()
{
    // We may not start a global periodic sync while draining before taking a
    // checkpoint. This is due to the possibility that peer gem5 processes
    // may not hit the same periodic sync before they complete draining and
    // that would make this periodic sync clash with the sync called from
    // DistIface::serialize() by other gem5 processes.
    // We would need a 'distributed drain' solution to eliminate this
    // restriction.
    // Note that if draining was not triggered by checkpointing then we are
    // fine since no extra global sync will happen (i.e. all peer gem5 will
    // hit this periodic sync eventually).
    panic_if(_draining && DistIface::sync->doCkpt,
             "Distributed sync is hit while draining");
    /*
     * Note that this is a global event so this process method will be called
     * by only exactly one thread.
     */
    /*
     * We hold the eventq lock at this point but the receiver thread may
     * need the lock to schedule new recv events while waiting for the
     * dist sync to complete.
     * Note that the other simulation threads also release their eventq
     * locks while waiting for us due to the global event semantics.
     */
    {
        EventQueue::ScopedRelease sr(curEventQueue());
        // we do a global sync here that is supposed to happen at the same
        // tick in all gem5 peers
        if (!DistIface::sync->run(true))
            return; // global sync aborted
        // global sync completed
    }
    if (DistIface::sync->doCkpt)
        exitSimLoop("checkpoint");
    if (DistIface::sync->doExit) {
        exitSimLoop("exit request from gem5 peers");
        return;
    }
    if (DistIface::sync->doStopSync) {
        DistIface::sync->doStopSync = false;
        inform("synchronization disabled at %lu\n", curTick());

        // The switch node needs to wait for the next sync immediately.
        if (DistIface::isSwitch) {
            start();
        } else {
            // Wake up thread contexts on non-switch nodes.
            for (auto *tc: primary->sys->threads) {
                if (tc->status() == ThreadContext::Suspended)
                    tc->activate();
                else
                    warn_once("Tried to wake up thread in dist-gem5, but it "
                              "was already awake!\n");
            }
        }
        return;
    }
    // schedule the next periodic sync
    repeat = DistIface::sync->nextRepeat;
    schedule(curTick() + repeat);
}

void
DistIface::RecvScheduler::init(Event *recv_done, Tick link_delay)
{
    // This is called from the receiver thread when it starts running. The new
    // receiver thread shares the event queue with the simulation thread
    // (associated with the simulated Ethernet link).
    curEventQueue(eventManager->eventQueue());

    recvDone = recv_done;
    linkDelay = link_delay;
}

Tick
DistIface::RecvScheduler::calcReceiveTick(Tick send_tick,
                                          Tick send_delay,
                                          Tick prev_recv_tick)
{
    Tick recv_tick = send_tick + send_delay + linkDelay;
    // sanity check (we need at least a send delay long window)
    assert(recv_tick >= prev_recv_tick + send_delay);
    panic_if(prev_recv_tick + send_delay > recv_tick,
             "Receive window is smaller than send delay");
    panic_if(recv_tick <= curTick(),
             "Simulators out of sync - missed packet receive by %llu ticks "
             "(prev_recv_tick: %lu send_tick: %lu send_delay: %lu "
             "linkDelay: %lu)",
             curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
             linkDelay);

    return recv_tick;
}
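
// Worked example (hypothetical numbers): with send_tick = 10000,
// send_delay = 500 and linkDelay = 1000, the packet is scheduled for
// receive at tick 11500. The second panic_if above fires if that tick has
// already passed locally, which would mean the peers drifted apart by
// more than the sync quantum allows.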

void
DistIface::RecvScheduler::resumeRecvTicks()
{
    // Schedule pending packets asap in case link speed/delay changed when
    // restoring from the checkpoint.
    // This may be done during unserialize except that curTick() is unknown
    // so we call this during drainResume().
    // If we are not restoring from a checkpoint then link latency cannot
    // have changed, so we just return.
    if (!ckptRestore)
        return;

    std::vector<Desc> v;
    while (!descQueue.empty()) {
        Desc d = descQueue.front();
        descQueue.pop();
        d.sendTick = curTick();
        d.sendDelay = d.packet->simLength; // assume 1 tick/byte max link speed
        v.push_back(d);
    }

    for (auto &d : v)
        descQueue.push(d);

    if (recvDone->scheduled()) {
        assert(!descQueue.empty());
        eventManager->reschedule(recvDone, curTick());
    } else {
        assert(descQueue.empty() && v.empty());
    }
    ckptRestore = false;
}

void
DistIface::RecvScheduler::pushPacket(EthPacketPtr new_packet,
                                     Tick send_tick,
                                     Tick send_delay)
{
    // Note : this is called from the receiver thread
    curEventQueue()->lock();
    Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);

    DPRINTF(DistEthernetPkt, "DistIface::recvScheduler::pushPacket "
            "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
            send_tick, send_delay, linkDelay, recv_tick);
    // Every packet must be sent and arrive in the same quantum
    assert(send_tick > primary->syncEvent->when() -
           primary->syncEvent->repeat);
    // No packet may be scheduled for receive in the arrival quantum
    assert(send_tick + send_delay + linkDelay > primary->syncEvent->when());

    // Now we are about to schedule a recvDone event for the new data packet.
    // We use the same recvDone object for all incoming data packets. Packet
    // descriptors are saved in the ordered queue. The currently scheduled
    // packet is always at the front of the queue.
    // NOTE: we use the event queue lock to protect the receive desc queue,
    // too, which is accessed both by the receiver thread and the simulation
    // thread.
    descQueue.emplace(new_packet, send_tick, send_delay);
    if (descQueue.size() == 1) {
        assert(!recvDone->scheduled());
        eventManager->schedule(recvDone, recv_tick);
    } else {
        assert(recvDone->scheduled());
        panic_if(descQueue.front().sendTick + descQueue.front().sendDelay >
                 recv_tick,
                 "Out of order packet received (recv_tick: %lu top(): %lu)\n",
                 recv_tick,
                 descQueue.front().sendTick + descQueue.front().sendDelay);
    }
    curEventQueue()->unlock();
}

EthPacketPtr
DistIface::RecvScheduler::popPacket()
{
    // Note : this is called from the simulation thread when a receive done
    // event is being processed for the link. We assume that the thread holds
    // the event queue lock when this is called!
    EthPacketPtr next_packet = descQueue.front().packet;
    descQueue.pop();

    if (descQueue.size() > 0) {
        Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
                                         descQueue.front().sendDelay,
                                         curTick());
        eventManager->schedule(recvDone, recv_tick);
    }
    prevRecvTick = curTick();
    return next_packet;
}
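
// Note on ordering: descQueue is a plain FIFO, so the push/pop scheme
// above relies on packets arriving from the peer in non-decreasing
// receive-tick order; the "Out of order packet received" panic in
// pushPacket() is what enforces that assumption rather than any sorting
// of the queue.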

void
DistIface::RecvScheduler::Desc::serialize(CheckpointOut &cp) const
{
    SERIALIZE_SCALAR(sendTick);
    SERIALIZE_SCALAR(sendDelay);
    packet->serialize("rxPacket", cp);
}

void
DistIface::RecvScheduler::Desc::unserialize(CheckpointIn &cp)
{
    UNSERIALIZE_SCALAR(sendTick);
    UNSERIALIZE_SCALAR(sendDelay);
    packet = std::make_shared<EthPacketData>();
    packet->unserialize("rxPacket", cp);
}

void
DistIface::RecvScheduler::serialize(CheckpointOut &cp) const
{
    SERIALIZE_SCALAR(prevRecvTick);
    // serialize the receive desc queue
    std::queue<Desc> tmp_queue(descQueue);
    unsigned n_desc_queue = descQueue.size();
    assert(tmp_queue.size() == descQueue.size());
    SERIALIZE_SCALAR(n_desc_queue);
    for (int i = 0; i < n_desc_queue; i++) {
        tmp_queue.front().serializeSection(cp, csprintf("rxDesc_%d", i));
        tmp_queue.pop();
    }
    assert(tmp_queue.empty());
}

void
DistIface::RecvScheduler::unserialize(CheckpointIn &cp)
{
    assert(descQueue.size() == 0);
    assert(!recvDone->scheduled());
    assert(!ckptRestore);

    UNSERIALIZE_SCALAR(prevRecvTick);
    // unserialize the receive desc queue
    unsigned n_desc_queue;
    UNSERIALIZE_SCALAR(n_desc_queue);
    for (int i = 0; i < n_desc_queue; i++) {
        Desc recv_desc;
        recv_desc.unserializeSection(cp, csprintf("rxDesc_%d", i));
        descQueue.push(recv_desc);
    }
    ckptRestore = true;
}

DistIface::DistIface(unsigned dist_rank,
                     unsigned dist_size,
                     Tick sync_start,
                     Tick sync_repeat,
                     EventManager *em,
                     bool use_pseudo_op,
                     bool is_switch, int num_nodes) :
    syncStart(sync_start), syncRepeat(sync_repeat),
    recvThread(nullptr), recvScheduler(em), syncStartOnPseudoOp(use_pseudo_op),
    rank(dist_rank), size(dist_size)
{
    DPRINTF(DistEthernet, "DistIface() ctor rank:%d\n", dist_rank);
    isPrimary = false;
    if (primary == nullptr) {
        assert(sync == nullptr);
        assert(syncEvent == nullptr);
        isSwitch = is_switch;
        if (is_switch)
            sync = new SyncSwitch(num_nodes);
        else
            sync = new SyncNode();
        syncEvent = new SyncEvent();
        primary = this;
        isPrimary = true;
    }
    distIfaceId = distIfaceNum;
    distIfaceNum++;
}
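
// The first DistIface constructed in a gem5 process becomes the "primary"
// one: it owns the process-wide sync object and sync event shared by all
// interfaces (e.g. a switch process with several ports still runs a
// single global synchronisation). Later instances only register
// themselves and get a unique distIfaceId.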

DistIface::~DistIface()
{
    assert(recvThread);
    recvThread->join();
    delete recvThread;
    // Tear down the shared sync state when the last DistIface in this
    // process goes away (note the pre-decrement; a post-decrement test
    // against zero would never match and would leak the sync objects).
    if (--distIfaceNum == 0) {
        assert(syncEvent);
        delete syncEvent;
        assert(sync);
        delete sync;
    }
    if (this == primary)
        primary = nullptr;
}

void
DistIface::packetOut(EthPacketPtr pkt, Tick send_delay)
{
    Header header;

    // Prepare a dist header packet for the Ethernet packet we want to
    // send out.
    header.msgType = MsgType::dataDescriptor;
    header.sendTick = curTick();
    header.sendDelay = send_delay;

    header.dataPacketLength = pkt->length;
    header.simLength = pkt->simLength;

    // Send out the packet and the meta info.
    sendPacket(header, pkt);

    DPRINTF(DistEthernetPkt,
            "DistIface::packetOut() done size:%d send_delay:%llu\n",
            pkt->length, send_delay);
}

void
DistIface::recvThreadFunc(Event *recv_done, Tick link_delay)
{
    EthPacketPtr new_packet;
    DistHeaderPkt::Header header;

    // Initialize receive scheduler parameters
    recvScheduler.init(recv_done, link_delay);

    // Main loop to wait for and process any incoming message.
    for (;;) {
        // recvHeader() blocks until the next dist header packet comes in.
        if (!recvHeader(header)) {
            // We lost connection to the peer gem5 processes, most likely
            // because one of them called m5 exit. So we stop here.
            // Grab the eventq lock to stop the simulation thread
            curEventQueue()->lock();
            exitSimLoop("connection to gem5 peer got closed");
            curEventQueue()->unlock();
            // The simulation thread may be blocked in processing an on-going
            // global synchronisation. Abort the sync to give the simulation
            // thread a chance to make progress and process the exit event.
            sync->abort();
            // Finish receiver thread
            break;
        }

        // We got a valid dist header packet, let's process it
        if (header.msgType == MsgType::dataDescriptor) {
            recvPacket(header, new_packet);
            recvScheduler.pushPacket(new_packet,
                                     header.sendTick,
                                     header.sendDelay);
        } else {
            // everything else must be a synchronisation related command
            if (!sync->progress(header.sendTick,
                                header.syncRepeat,
                                header.needCkpt,
                                header.needExit,
                                header.needStopSync))
                // Finish receiver thread if simulation is about to exit
                break;
        }
    }
}

void
DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay)
{
    assert(recvThread == nullptr);

    recvThread = new std::thread(&DistIface::recvThreadFunc,
                                 this,
                                 const_cast<Event *>(recv_done),
                                 link_delay);
    recvThreadsNum++;
}

DrainState
DistIface::drain()
{
    DPRINTF(DistEthernet, "DistIface::drain() called\n");
    // This can be called multiple times in the same drain cycle.
    if (this == primary)
        syncEvent->draining(true);
    return DrainState::Drained;
}

void
DistIface::drainResume()
{
    DPRINTF(DistEthernet, "DistIface::drainResume() called\n");
    if (this == primary)
        syncEvent->draining(false);
    recvScheduler.resumeRecvTicks();
}

void
DistIface::serialize(CheckpointOut &cp) const
{
    // Drain the dist interface before the checkpoint is taken. We cannot call
    // this as part of the normal drain cycle because this dist sync has to be
    // called exactly once after the system is fully drained.
    sync->drainComplete();

    unsigned rank_orig = rank, dist_iface_id_orig = distIfaceId;

    SERIALIZE_SCALAR(rank_orig);
    SERIALIZE_SCALAR(dist_iface_id_orig);

    recvScheduler.serializeSection(cp, "recvScheduler");
    if (this == primary) {
        sync->serializeSection(cp, "Sync");
    }
}

void
DistIface::unserialize(CheckpointIn &cp)
{
    unsigned rank_orig, dist_iface_id_orig;
    UNSERIALIZE_SCALAR(rank_orig);
    UNSERIALIZE_SCALAR(dist_iface_id_orig);

    panic_if(rank != rank_orig, "Rank mismatch at resume (rank=%d, orig=%d)",
             rank, rank_orig);
    panic_if(distIfaceId != dist_iface_id_orig, "Dist iface ID mismatch "
             "at resume (distIfaceId=%d, orig=%d)", distIfaceId,
             dist_iface_id_orig);

    recvScheduler.unserializeSection(cp, "recvScheduler");
    if (this == primary) {
        sync->unserializeSection(cp, "Sync");
    }
}

void
DistIface::init(const Event *done_event, Tick link_delay)
{
    // Init hook for the underlying message transport to setup/finalize
    // communication channels
    initTransport();

    // Spawn a new receiver thread that will process messages
    // coming in from peer gem5 processes.
    // The receive thread will also schedule a (receive) doneEvent
    // for each incoming data packet.
    spawnRecvThread(done_event, link_delay);

    // Adjust the periodic sync start and interval. Different DistIface
    // objects might have different requirements. The singleton sync object
    // will select the minimum values for both params.
    assert(sync != nullptr);
    sync->init(syncStart, syncRepeat);

    // Initialize the seed for the random generator to avoid the same
    // sequence in all gem5 peer processes
    assert(primary != nullptr);
    if (this == primary)
        random_mt.init(5489 * (rank + 1) + 257);
}
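
// For example (values computed from the seed formula above): rank 0 seeds
// the generator with 5489 * 1 + 257 = 5746 and rank 1 with
// 5489 * 2 + 257 = 11235, so peers draw distinct pseudo-random sequences
// while each run remains reproducible for a given rank.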

void
DistIface::startup()
{
    DPRINTF(DistEthernet, "DistIface::startup() started\n");
    // Schedule the periodic sync at startup unless it is deferred to an m5
    // pseudo-op; the switch node always starts syncing right away.
    if (this == primary && (!syncStartOnPseudoOp || isSwitch))
        syncEvent->start();
    DPRINTF(DistEthernet, "DistIface::startup() done\n");
}

bool
DistIface::readyToCkpt(Tick delay, Tick period)
{
    bool ret = true;
    DPRINTF(DistEthernet, "DistIface::readyToCkpt() called, delay:%lu "
            "period:%lu\n", delay, period);
    if (primary) {
        if (delay == 0) {
            inform("m5 checkpoint called with zero delay => triggering "
                   "collaborative checkpoint\n");
            sync->requestCkpt(ReqType::collective);
        } else {
            inform("m5 checkpoint called with non-zero delay => triggering "
                   "immediate checkpoint (at the next sync)\n");
            sync->requestCkpt(ReqType::immediate);
        }
        if (period != 0)
            inform("Non-zero period for m5_ckpt is ignored in "
                   "distributed gem5 runs\n");
        ret = false;
    }
    return ret;
}
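
// Return-value contract used by readyToCkpt() above and readyToExit()
// below: in a dist-gem5 run (primary != nullptr) they return false so the
// caller does not act immediately; the requested action is instead
// carried out collectively at an upcoming global sync point.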

void
DistIface::SyncNode::requestStopSync(ReqType req)
{
    std::lock_guard<std::mutex> sync_lock(lock);
    needStopSync = req;
}

void
DistIface::toggleSync(ThreadContext *tc)
{
    // It is unfortunate that we have to populate the system pointer member
    // this way.
    primary->sys = tc->getSystemPtr();

    // The invariant for both syncing and "unsyncing" is that all threads
    // will stop executing instructions until the desired sync state has been
    // reached for all nodes. This is the easiest way to prevent deadlock (in
    // the case of "unsyncing") and causality errors (in the case of syncing).
    if (primary->syncEvent->scheduled()) {
        inform("Request toggling synchronization off\n");
        primary->sync->requestStopSync(ReqType::collective);

        // At this point, we have no clue when everyone will reach the sync
        // stop point. Suspend execution of all local thread contexts.
        // Dist-gem5 will reactivate all thread contexts when everyone has
        // reached the sync stop point.
#if THE_ISA != NULL_ISA
        for (auto *tc: primary->sys->threads) {
            if (tc->status() == ThreadContext::Active)
                tc->quiesce();
        }
#endif
    } else {
        inform("Request toggling synchronization on\n");
        primary->syncEvent->start();

        // We need to suspend all CPUs until the sync point is reached by all
        // nodes to prevent causality errors. We can also schedule CPU
        // activation here, since we know exactly when the next sync will
        // occur.
#if THE_ISA != NULL_ISA
        for (auto *tc: primary->sys->threads) {
            if (tc->status() == ThreadContext::Active)
                tc->quiesceTick(primary->syncEvent->when() + 1);
        }
#endif
    }
}

bool
DistIface::readyToExit(Tick delay)
{
    bool ret = true;
    DPRINTF(DistEthernet, "DistIface::readyToExit() called, delay:%lu\n",
            delay);
    if (primary) {
        // To successfully coordinate an exit, all nodes must be synchronising
        if (!primary->syncEvent->scheduled())
            primary->syncEvent->start();

        if (delay == 0) {
            inform("m5 exit called with zero delay => triggering "
                   "collaborative exit\n");
            sync->requestExit(ReqType::collective);
        } else {
            inform("m5 exit called with non-zero delay => triggering "
                   "immediate exit (at the next sync)\n");
            sync->requestExit(ReqType::immediate);
        }
        ret = false;
    }
    return ret;
}

uint64_t
DistIface::rankParam()
{
    uint64_t val;
    if (primary) {
        val = primary->rank;
    } else {
        warn("Dist-rank parameter is queried in single gem5 simulation.");
        val = 0;
    }
    return val;
}

uint64_t
DistIface::sizeParam()
{
    uint64_t val;
    if (primary) {
        val = primary->size;
    } else {
        warn("Dist-size parameter is queried in single gem5 simulation.");
        val = 1;
    }
    return val;
}