From 5dec4e07b89786aa67ce64aadeeb14c81b3977b3 Mon Sep 17 00:00:00 2001 From: Gabor Dozsa Date: Thu, 7 Jan 2016 16:33:47 -0600 Subject: [PATCH] dev: Distributed Ethernet link for distributed gem5 simulations Distributed gem5 (abbreviated dist-gem5) is the result of the convergence effort between multi-gem5 and pd-gem5 (from Univ. of Wisconsin). It relies on the base multi-gem5 infrastructure for packet forwarding, synchronisation and checkpointing but combines those with the elaborated network switch model from pd-gem5. --HG-- rename : src/dev/net/multi_etherlink.cc => src/dev/net/dist_etherlink.cc rename : src/dev/net/multi_etherlink.hh => src/dev/net/dist_etherlink.hh rename : src/dev/net/multi_iface.cc => src/dev/net/dist_iface.cc rename : src/dev/net/multi_iface.hh => src/dev/net/dist_iface.hh rename : src/dev/net/multi_packet.hh => src/dev/net/dist_packet.hh --- src/dev/net/Ethernet.py | 15 +- src/dev/net/SConscript | 12 +- .../{multi_etherlink.cc => dist_etherlink.cc} | 139 +-- .../{multi_etherlink.hh => dist_etherlink.hh} | 73 +- src/dev/net/dist_iface.cc | 808 ++++++++++++++++++ src/dev/net/dist_iface.hh | 595 +++++++++++++ .../net/{multi_packet.hh => dist_packet.hh} | 90 +- src/dev/net/etherpkt.cc | 16 - src/dev/net/etherpkt.hh | 12 - src/dev/net/multi_iface.cc | 622 -------------- src/dev/net/multi_iface.hh | 492 ----------- src/dev/net/tcp_iface.cc | 265 +++++- src/dev/net/tcp_iface.hh | 77 +- src/sim/global_event.hh | 2 +- .../multi_packet.cc => sim/initparam_keys.hh} | 82 +- src/sim/pseudo_inst.cc | 25 +- util/multi/Makefile | 63 -- util/multi/tcp_server.cc | 463 ---------- util/multi/tcp_server.hh | 254 ------ 19 files changed, 1879 insertions(+), 2226 deletions(-) rename src/dev/net/{multi_etherlink.cc => dist_etherlink.cc} (60%) rename src/dev/net/{multi_etherlink.hh => dist_etherlink.hh} (77%) create mode 100644 src/dev/net/dist_iface.cc create mode 100644 src/dev/net/dist_iface.hh rename src/dev/net/{multi_packet.hh => dist_packet.hh} (57%) delete mode 100644 src/dev/net/multi_iface.cc delete mode 100644 src/dev/net/multi_iface.hh rename src/{dev/net/multi_packet.cc => sim/initparam_keys.hh} (64%) delete mode 100644 util/multi/Makefile delete mode 100644 util/multi/tcp_server.cc delete mode 100644 util/multi/tcp_server.hh diff --git a/src/dev/net/Ethernet.py b/src/dev/net/Ethernet.py index 9859857a0..5f878ea10 100644 --- a/src/dev/net/Ethernet.py +++ b/src/dev/net/Ethernet.py @@ -58,19 +58,22 @@ class EtherLink(EtherObject): speed = Param.NetworkBandwidth('1Gbps', "link speed") dump = Param.EtherDump(NULL, "dump object") -class MultiEtherLink(EtherObject): - type = 'MultiEtherLink' - cxx_header = "dev/net/multi_etherlink.hh" +class DistEtherLink(EtherObject): + type = 'DistEtherLink' + cxx_header = "dev/net/dist_etherlink.hh" int0 = SlavePort("interface 0") delay = Param.Latency('0us', "packet transmit delay") delay_var = Param.Latency('0ns', "packet transmit delay variability") speed = Param.NetworkBandwidth('1Gbps', "link speed") dump = Param.EtherDump(NULL, "dump object") - multi_rank = Param.UInt32('0', "Rank of the this gem5 process (multi run)") - sync_start = Param.Latency('5200000000000t', "first multi sync barrier") - sync_repeat = Param.Latency('10us', "multi sync barrier repeat") + dist_rank = Param.UInt32('0', "Rank of this gem5 process (dist run)") + dist_size = Param.UInt32('1', "Number of gem5 processes (dist run)") + sync_start = Param.Latency('5200000000000t', "first dist sync barrier") + sync_repeat = Param.Latency('10us', "dist sync barrier repeat") server_name = Param.String('localhost', "Message server name") server_port = Param.UInt32('2200', "Message server port") + is_switch = Param.Bool(False, "true if this a link in etherswitch") + num_nodes = Param.UInt32('2', "Number of simulate nodes") class EtherBus(EtherObject): type = 'EtherBus' diff --git a/src/dev/net/SConscript b/src/dev/net/SConscript index f529a1b2a..9ad8eec92 100644 --- a/src/dev/net/SConscript +++ b/src/dev/net/SConscript @@ -70,14 +70,14 @@ DebugFlag('EthernetIntr') DebugFlag('EthernetPIO') DebugFlag('EthernetSM') -# Multi gem5 -Source('multi_packet.cc') -Source('multi_iface.cc') -Source('multi_etherlink.cc') +# Dist gem5 +Source('dist_iface.cc') +Source('dist_etherlink.cc') Source('tcp_iface.cc') -DebugFlag('MultiEthernet') -DebugFlag('MultiEthernetPkt') +DebugFlag('DistEthernet') +DebugFlag('DistEthernetPkt') +DebugFlag('DistEthernetCmd') # Ethernet controllers Source('i8254xGBe.cc') diff --git a/src/dev/net/multi_etherlink.cc b/src/dev/net/dist_etherlink.cc similarity index 60% rename from src/dev/net/multi_etherlink.cc rename to src/dev/net/dist_etherlink.cc index cf4300ddf..a793739f8 100644 --- a/src/dev/net/multi_etherlink.cc +++ b/src/dev/net/dist_etherlink.cc @@ -38,10 +38,10 @@ */ /* @file - * Device module for a full duplex ethernet link for multi gem5 simulations. + * Device module for a full duplex ethernet link for dist gem5 simulations. */ -#include "dev/net/multi_etherlink.hh" +#include "dev/net/dist_etherlink.hh" #include #include @@ -54,15 +54,15 @@ #include "base/random.hh" #include "base/trace.hh" +#include "debug/DistEthernet.hh" +#include "debug/DistEthernetPkt.hh" #include "debug/EthernetData.hh" -#include "debug/MultiEthernet.hh" -#include "debug/MultiEthernetPkt.hh" +#include "dev/net/dist_iface.hh" #include "dev/net/etherdump.hh" #include "dev/net/etherint.hh" #include "dev/net/etherlink.hh" #include "dev/net/etherobject.hh" #include "dev/net/etherpkt.hh" -#include "dev/net/multi_iface.hh" #include "dev/net/tcp_iface.hh" #include "params/EtherLink.hh" #include "sim/core.hh" @@ -71,33 +71,45 @@ using namespace std; -MultiEtherLink::MultiEtherLink(const Params *p) - : EtherObject(p) +DistEtherLink::DistEtherLink(const Params *p) + : EtherObject(p), linkDelay(p->delay) { - DPRINTF(MultiEthernet,"MultiEtherLink::MultiEtherLink() " - "link delay:%llu\n", p->delay); + DPRINTF(DistEthernet,"DistEtherLink::DistEtherLink() " + "link delay:%llu ticksPerByte:%f\n", p->delay, p->speed); txLink = new TxLink(name() + ".link0", this, p->speed, p->delay_var, p->dump); rxLink = new RxLink(name() + ".link1", this, p->delay, p->dump); - // create the multi (TCP) interface to talk to the peer gem5 processes. - multiIface = new TCPIface(p->server_name, p->server_port, p->multi_rank, - p->sync_start, p->sync_repeat, this); + Tick sync_repeat; + if (p->sync_repeat != 0) { + if (p->sync_repeat != p->delay) + warn("DistEtherLink(): sync_repeat is %lu and linkdelay is %lu", + p->sync_repeat, p->delay); + sync_repeat = p->sync_repeat; + } else { + sync_repeat = p->delay; + } + + // create the dist (TCP) interface to talk to the peer gem5 processes. + distIface = new TCPIface(p->server_name, p->server_port, + p->dist_rank, p->dist_size, + p->sync_start, sync_repeat, this, p->is_switch, + p->num_nodes); - localIface = new LocalIface(name() + ".int0", txLink, rxLink, multiIface); + localIface = new LocalIface(name() + ".int0", txLink, rxLink, distIface); } -MultiEtherLink::~MultiEtherLink() +DistEtherLink::~DistEtherLink() { delete txLink; delete rxLink; delete localIface; - delete multiIface; + delete distIface; } EtherInt* -MultiEtherLink::getEthPort(const std::string &if_name, int idx) +DistEtherLink::getEthPort(const std::string &if_name, int idx) { if (if_name != "int0") { return nullptr; @@ -107,66 +119,55 @@ MultiEtherLink::getEthPort(const std::string &if_name, int idx) return localIface; } -void MultiEtherLink::memWriteback() -{ - DPRINTF(MultiEthernet,"MultiEtherLink::memWriteback() called\n"); - multiIface->drainDone(); -} - void -MultiEtherLink::serialize(CheckpointOut &cp) const +DistEtherLink::serialize(CheckpointOut &cp) const { - multiIface->serialize("multiIface", cp); - txLink->serialize("txLink", cp); - rxLink->serialize("rxLink", cp); + distIface->serializeSection(cp, "distIface"); + txLink->serializeSection(cp, "txLink"); + rxLink->serializeSection(cp, "rxLink"); } void -MultiEtherLink::unserialize(CheckpointIn &cp) +DistEtherLink::unserialize(CheckpointIn &cp) { - multiIface->unserialize("multiIface", cp); - txLink->unserialize("txLink", cp); - rxLink->unserialize("rxLink", cp); + distIface->unserializeSection(cp, "distIface"); + txLink->unserializeSection(cp, "txLink"); + rxLink->unserializeSection(cp, "rxLink"); } void -MultiEtherLink::init() +DistEtherLink::init() { - DPRINTF(MultiEthernet,"MultiEtherLink::init() called\n"); - multiIface->initRandom(); + DPRINTF(DistEthernet,"DistEtherLink::init() called\n"); + distIface->init(rxLink->doneEvent(), linkDelay); } void -MultiEtherLink::startup() +DistEtherLink::startup() { - DPRINTF(MultiEthernet,"MultiEtherLink::startup() called\n"); - multiIface->startPeriodicSync(); + DPRINTF(DistEthernet,"DistEtherLink::startup() called\n"); + distIface->startup(); } void -MultiEtherLink::RxLink::setMultiInt(MultiIface *m) +DistEtherLink::RxLink::setDistInt(DistIface *m) { - assert(!multiIface); - multiIface = m; - // Spawn a new receiver thread that will process messages - // coming in from peer gem5 processes. - // The receive thread will also schedule a (receive) doneEvent - // for each incoming data packet. - multiIface->spawnRecvThread(&doneEvent, linkDelay); + assert(!distIface); + distIface = m; } void -MultiEtherLink::RxLink::rxDone() +DistEtherLink::RxLink::rxDone() { assert(!busy()); // retrieve the packet that triggered the receive done event - packet = multiIface->packetIn(); + packet = distIface->packetIn(); if (dump) dump->dump(packet); - DPRINTF(MultiEthernetPkt, "MultiEtherLink::MultiLink::rxDone() " + DPRINTF(DistEthernetPkt, "DistEtherLink::DistLink::rxDone() " "packet received: len=%d\n", packet->length); DDUMP(EthernetData, packet->data, packet->length); @@ -176,7 +177,7 @@ MultiEtherLink::RxLink::rxDone() } void -MultiEtherLink::TxLink::txDone() +DistEtherLink::TxLink::txDone() { if (dump) dump->dump(packet); @@ -188,10 +189,10 @@ MultiEtherLink::TxLink::txDone() } bool -MultiEtherLink::TxLink::transmit(EthPacketPtr pkt) +DistEtherLink::TxLink::transmit(EthPacketPtr pkt) { if (busy()) { - DPRINTF(MultiEthernet, "packet not sent, link busy\n"); + DPRINTF(DistEthernet, "packet not sent, link busy\n"); return false; } @@ -201,8 +202,8 @@ MultiEtherLink::TxLink::transmit(EthPacketPtr pkt) delay += random_mt.random(0, delayVar); // send the packet to the peers - assert(multiIface); - multiIface->packetOut(pkt, delay); + assert(distIface); + distIface->packetOut(pkt, delay); // schedule the send done event parent->schedule(doneEvent, curTick() + delay); @@ -211,56 +212,56 @@ MultiEtherLink::TxLink::transmit(EthPacketPtr pkt) } void -MultiEtherLink::Link::serialize(const string &base, CheckpointOut &cp) const +DistEtherLink::Link::serialize(CheckpointOut &cp) const { bool packet_exists = (packet != nullptr); - paramOut(cp, base + ".packet_exists", packet_exists); + SERIALIZE_SCALAR(packet_exists); if (packet_exists) - packet->serialize(base + ".packet", cp); + packet->serialize("packet", cp); bool event_scheduled = event->scheduled(); - paramOut(cp, base + ".event_scheduled", event_scheduled); + SERIALIZE_SCALAR(event_scheduled); if (event_scheduled) { Tick event_time = event->when(); - paramOut(cp, base + ".event_time", event_time); + SERIALIZE_SCALAR(event_time); } } void -MultiEtherLink::Link::unserialize(const string &base, CheckpointIn &cp) +DistEtherLink::Link::unserialize(CheckpointIn &cp) { bool packet_exists; - paramIn(cp, base + ".packet_exists", packet_exists); + UNSERIALIZE_SCALAR(packet_exists); if (packet_exists) { packet = make_shared(16384); - packet->unserialize(base + ".packet", cp); + packet->unserialize("packet", cp); } bool event_scheduled; - paramIn(cp, base + ".event_scheduled", event_scheduled); + UNSERIALIZE_SCALAR(event_scheduled); if (event_scheduled) { Tick event_time; - paramIn(cp, base + ".event_time", event_time); + UNSERIALIZE_SCALAR(event_time); parent->schedule(*event, event_time); } } -MultiEtherLink::LocalIface::LocalIface(const std::string &name, +DistEtherLink::LocalIface::LocalIface(const std::string &name, TxLink *tx, RxLink *rx, - MultiIface *m) : + DistIface *m) : EtherInt(name), txLink(tx) { tx->setLocalInt(this); rx->setLocalInt(this); - tx->setMultiInt(m); - rx->setMultiInt(m); + tx->setDistInt(m); + rx->setDistInt(m); } -MultiEtherLink * -MultiEtherLinkParams::create() +DistEtherLink * +DistEtherLinkParams::create() { - return new MultiEtherLink(this); + return new DistEtherLink(this); } diff --git a/src/dev/net/multi_etherlink.hh b/src/dev/net/dist_etherlink.hh similarity index 77% rename from src/dev/net/multi_etherlink.hh rename to src/dev/net/dist_etherlink.hh index 0a3e39bd7..e8218941a 100644 --- a/src/dev/net/multi_etherlink.hh +++ b/src/dev/net/dist_etherlink.hh @@ -38,30 +38,30 @@ */ /* @file - * Device module for a full duplex ethernet link for multi gem5 simulations. + * Device module for a full duplex ethernet link for dist gem5 simulations. * - * See comments in dev/multi_iface.hh for a generic description of multi + * See comments in dev/net/dist_iface.hh for a generic description of dist * gem5 simulations. * * This class is meant to be a drop in replacement for the EtherLink class for - * multi gem5 runs. + * dist gem5 runs. * */ -#ifndef __DEV_NET_MULTIETHERLINK_HH__ -#define __DEV_NET_MULTIETHERLINK_HH__ +#ifndef __DEV_DIST_ETHERLINK_HH__ +#define __DEV_DIST_ETHERLINK_HH__ #include #include "dev/net/etherlink.hh" -#include "params/MultiEtherLink.hh" +#include "params/DistEtherLink.hh" -class MultiIface; +class DistIface; class EthPacketData; /** * Model for a fixed bandwidth full duplex ethernet link. */ -class MultiEtherLink : public EtherObject +class DistEtherLink : public EtherObject { protected: class LocalIface; @@ -72,22 +72,22 @@ class MultiEtherLink : public EtherObject * The link will encapsulate and transfer Ethernet packets to/from * the message server. */ - class Link + class Link : public Serializable { protected: std::string objName; - MultiEtherLink *parent; + DistEtherLink *parent; LocalIface *localIface; EtherDump *dump; - MultiIface *multiIface; + DistIface *distIface; Event *event; EthPacketPtr packet; public: - Link(const std::string &name, MultiEtherLink *p, + Link(const std::string &name, DistEtherLink *p, EtherDump *d, Event *e) : objName(name), parent(p), localIface(nullptr), dump(d), - multiIface(nullptr), event(e) {} + distIface(nullptr), event(e) {} ~Link() {} @@ -95,8 +95,8 @@ class MultiEtherLink : public EtherObject bool busy() const { return (bool)packet; } void setLocalInt(LocalIface *i) { assert(!localIface); localIface=i; } - void serialize(const std::string &base, CheckpointOut &cp) const; - void unserialize(const std::string &base, CheckpointIn &cp); + void serialize(CheckpointOut &cp) const override; + void unserialize(CheckpointIn &cp) override; }; /** @@ -123,17 +123,17 @@ class MultiEtherLink : public EtherObject DoneEvent doneEvent; public: - TxLink(const std::string &name, MultiEtherLink *p, + TxLink(const std::string &name, DistEtherLink *p, double invBW, Tick delay_var, EtherDump *d) : Link(name, p, d, &doneEvent), ticksPerByte(invBW), delayVar(delay_var), doneEvent(this) {} ~TxLink() {} /** - * Register the multi interface to be used to talk to the + * Register the dist interface to be used to talk to the * peer gem5 processes. */ - void setMultiInt(MultiIface *m) { assert(!multiIface); multiIface=m; } + void setDistInt(DistIface *m) { assert(!distIface); distIface=m; } /** * Initiate sending of a packet via this link. @@ -158,23 +158,27 @@ class MultiEtherLink : public EtherObject /** * Receive done callback method. Called from doneEvent. */ - void rxDone() ; + void rxDone(); typedef EventWrapper DoneEvent; friend void DoneEvent::process(); - DoneEvent doneEvent; + DoneEvent _doneEvent; public: - RxLink(const std::string &name, MultiEtherLink *p, + RxLink(const std::string &name, DistEtherLink *p, Tick delay, EtherDump *d) : - Link(name, p, d, &doneEvent), - linkDelay(delay), doneEvent(this) {} + Link(name, p, d, &_doneEvent), + linkDelay(delay), _doneEvent(this) {} ~RxLink() {} /** - * Register our multi interface to talk to the peer gem5 processes. + * Register our dist interface to talk to the peer gem5 processes. + */ + void setDistInt(DistIface *m); + /** + * Done events will be scheduled by DistIface (so we need the accessor) */ - void setMultiInt(MultiIface *m); + const DoneEvent *doneEvent() const { return &_doneEvent; } }; /** @@ -187,7 +191,7 @@ class MultiEtherLink : public EtherObject public: LocalIface(const std::string &name, TxLink *tx, RxLink *rx, - MultiIface *m); + DistIface *m); bool recvPacket(EthPacketPtr pkt) { return txLink->transmit(pkt); } void sendDone() { peer->sendDone(); } @@ -199,7 +203,7 @@ class MultiEtherLink : public EtherObject /** * Interface to talk to the peer gem5 processes. */ - MultiIface *multiIface; + DistIface *distIface; /** * Send link */ @@ -210,10 +214,12 @@ class MultiEtherLink : public EtherObject RxLink *rxLink; LocalIface *localIface; + Tick linkDelay; + public: - typedef MultiEtherLinkParams Params; - MultiEtherLink(const Params *p); - ~MultiEtherLink(); + typedef DistEtherLinkParams Params; + DistEtherLink(const Params *p); + ~DistEtherLink(); const Params * params() const @@ -224,12 +230,11 @@ class MultiEtherLink : public EtherObject virtual EtherInt *getEthPort(const std::string &if_name, int idx) override; - void memWriteback() override; - void init() override; - void startup() override; + virtual void init() override; + virtual void startup() override; void serialize(CheckpointOut &cp) const override; void unserialize(CheckpointIn &cp) override; }; -#endif // __DEV_NET_MULTIETHERLINK_HH__ +#endif // __DEV_DIST_ETHERLINK_HH__ diff --git a/src/dev/net/dist_iface.cc b/src/dev/net/dist_iface.cc new file mode 100644 index 000000000..45ce651a9 --- /dev/null +++ b/src/dev/net/dist_iface.cc @@ -0,0 +1,808 @@ +/* + * Copyright (c) 2015 ARM Limited + * All rights reserved + * + * The license below extends only to copyright in the software and shall + * not be construed as granting a license to any other intellectual + * property including but not limited to intellectual property relating + * to a hardware implementation of the functionality of the software + * licensed hereunder. You may use the software subject to the license + * terms below provided that you ensure that this notice is replicated + * unmodified and in its entirety in all distributions of the software, + * modified or unmodified, in source code or in binary form. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer; + * redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution; + * neither the name of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * Authors: Gabor Dozsa + */ + +/* @file + * The interface class for dist-gem5 simulations. + */ + +#include "dev/net/dist_iface.hh" + +#include +#include + +#include "base/random.hh" +#include "base/trace.hh" +#include "debug/DistEthernet.hh" +#include "debug/DistEthernetPkt.hh" +#include "dev/net/etherpkt.hh" +#include "sim/sim_exit.hh" +#include "sim/sim_object.hh" + +using namespace std; +DistIface::Sync *DistIface::sync = nullptr; +DistIface::SyncEvent *DistIface::syncEvent = nullptr; +unsigned DistIface::distIfaceNum = 0; +unsigned DistIface::recvThreadsNum = 0; +DistIface *DistIface::master = nullptr; + +void +DistIface::Sync::init(Tick start_tick, Tick repeat_tick) +{ + if (start_tick < firstAt) { + firstAt = start_tick; + inform("Next dist synchronisation tick is changed to %lu.\n", nextAt); + } + + if (repeat_tick == 0) + panic("Dist synchronisation interval must be greater than zero"); + + if (repeat_tick < nextRepeat) { + nextRepeat = repeat_tick; + inform("Dist synchronisation interval is changed to %lu.\n", + nextRepeat); + } +} + +DistIface::SyncSwitch::SyncSwitch(int num_nodes) +{ + numNodes = num_nodes; + waitNum = num_nodes; + numExitReq = 0; + numCkptReq = 0; + doExit = false; + doCkpt = false; + firstAt = std::numeric_limits::max(); + nextAt = 0; + nextRepeat = std::numeric_limits::max(); +} + +DistIface::SyncNode::SyncNode() +{ + waitNum = 0; + needExit = ReqType::none; + needCkpt = ReqType::none; + doExit = false; + doCkpt = false; + firstAt = std::numeric_limits::max(); + nextAt = 0; + nextRepeat = std::numeric_limits::max(); +} + +void +DistIface::SyncNode::run(bool same_tick) +{ + std::unique_lock sync_lock(lock); + Header header; + + assert(waitNum == 0); + waitNum = DistIface::recvThreadsNum; + // initiate the global synchronisation + header.msgType = MsgType::cmdSyncReq; + header.sendTick = curTick(); + header.syncRepeat = nextRepeat; + header.needCkpt = needCkpt; + if (needCkpt != ReqType::none) + needCkpt = ReqType::pending; + header.needExit = needExit; + if (needExit != ReqType::none) + needExit = ReqType::pending; + DistIface::master->sendCmd(header); + // now wait until all receiver threads complete the synchronisation + auto lf = [this]{ return waitNum == 0; }; + cv.wait(sync_lock, lf); + // global synchronisation is done + assert(!same_tick || (nextAt == curTick())); +} + + +void +DistIface::SyncSwitch::run(bool same_tick) +{ + std::unique_lock sync_lock(lock); + Header header; + // Wait for the sync requests from the nodes + if (waitNum > 0) { + auto lf = [this]{ return waitNum == 0; }; + cv.wait(sync_lock, lf); + } + assert(waitNum == 0); + assert(!same_tick || (nextAt == curTick())); + waitNum = numNodes; + // Complete the global synchronisation + header.msgType = MsgType::cmdSyncAck; + header.sendTick = nextAt; + header.syncRepeat = nextRepeat; + if (doCkpt || numCkptReq == numNodes) { + doCkpt = true; + header.needCkpt = ReqType::immediate; + numCkptReq = 0; + } else { + header.needCkpt = ReqType::none; + } + if (doExit || numExitReq == numNodes) { + doExit = true; + header.needExit = ReqType::immediate; + } else { + header.needExit = ReqType::none; + } + DistIface::master->sendCmd(header); +} + +void +DistIface::SyncSwitch::progress(Tick send_tick, + Tick sync_repeat, + ReqType need_ckpt, + ReqType need_exit) +{ + std::unique_lock sync_lock(lock); + assert(waitNum > 0); + + if (send_tick > nextAt) + nextAt = send_tick; + if (nextRepeat > sync_repeat) + nextRepeat = sync_repeat; + + if (need_ckpt == ReqType::collective) + numCkptReq++; + else if (need_ckpt == ReqType::immediate) + doCkpt = true; + if (need_exit == ReqType::collective) + numExitReq++; + else if (need_exit == ReqType::immediate) + doExit = true; + + waitNum--; + // Notify the simulation thread if the on-going sync is complete + if (waitNum == 0) { + sync_lock.unlock(); + cv.notify_one(); + } +} + +void +DistIface::SyncNode::progress(Tick max_send_tick, + Tick next_repeat, + ReqType do_ckpt, + ReqType do_exit) +{ + std::unique_lock sync_lock(lock); + assert(waitNum > 0); + + nextAt = max_send_tick; + nextRepeat = next_repeat; + doCkpt = (do_ckpt != ReqType::none); + doExit = (do_exit != ReqType::none); + + waitNum--; + // Notify the simulation thread if the on-going sync is complete + if (waitNum == 0) { + sync_lock.unlock(); + cv.notify_one(); + } +} + +void +DistIface::SyncNode::requestCkpt(ReqType req) +{ + std::lock_guard sync_lock(lock); + assert(req != ReqType::none); + if (needCkpt != ReqType::none) + warn("Ckpt requested multiple times (req:%d)\n", static_cast(req)); + if (needCkpt == ReqType::none || req == ReqType::immediate) + needCkpt = req; +} + +void +DistIface::SyncNode::requestExit(ReqType req) +{ + std::lock_guard sync_lock(lock); + assert(req != ReqType::none); + if (needExit != ReqType::none) + warn("Exit requested multiple times (req:%d)\n", static_cast(req)); + if (needExit == ReqType::none || req == ReqType::immediate) + needExit = req; +} + +void +DistIface::Sync::drainComplete() +{ + if (doCkpt) { + // The first DistIface object called this right before writing the + // checkpoint. We need to drain the underlying physical network here. + // Note that other gem5 peers may enter this barrier at different + // ticks due to draining. + run(false); + // Only the "first" DistIface object has to perform the sync + doCkpt = false; + } +} + +void +DistIface::SyncNode::serialize(CheckpointOut &cp) const +{ + int need_exit = static_cast(needExit); + SERIALIZE_SCALAR(need_exit); +} + +void +DistIface::SyncNode::unserialize(CheckpointIn &cp) +{ + int need_exit; + UNSERIALIZE_SCALAR(need_exit); + needExit = static_cast(need_exit); +} + +void +DistIface::SyncSwitch::serialize(CheckpointOut &cp) const +{ + SERIALIZE_SCALAR(numExitReq); +} + +void +DistIface::SyncSwitch::unserialize(CheckpointIn &cp) +{ + UNSERIALIZE_SCALAR(numExitReq); +} + +void +DistIface::SyncEvent::start() +{ + // Note that this may be called either from startup() or drainResume() + + // At this point, all DistIface objects has already called Sync::init() so + // we have a local minimum of the start tick and repeat for the periodic + // sync. + Tick firstAt = DistIface::sync->firstAt; + repeat = DistIface::sync->nextRepeat; + // Do a global barrier to agree on a common repeat value (the smallest + // one from all participating nodes + DistIface::sync->run(curTick() == 0); + + assert(!DistIface::sync->doCkpt); + assert(!DistIface::sync->doExit); + assert(DistIface::sync->nextAt >= curTick()); + assert(DistIface::sync->nextRepeat <= repeat); + + // if this is called at tick 0 then we use the config start param otherwise + // the maximum of the current tick of all participating nodes + if (curTick() == 0) { + assert(!scheduled()); + assert(DistIface::sync->nextAt == 0); + schedule(firstAt); + } else { + if (scheduled()) + reschedule(DistIface::sync->nextAt); + else + schedule(DistIface::sync->nextAt); + } + inform("Dist sync scheduled at %lu and repeats %lu\n", when(), + DistIface::sync->nextRepeat); +} + +void +DistIface::SyncEvent::process() +{ + // We may not start a global periodic sync while draining before taking a + // checkpoint. This is due to the possibility that peer gem5 processes + // may not hit the same periodic sync before they complete draining and + // that would make this periodic sync clash with sync called from + // DistIface::serialize() by other gem5 processes. + // We would need a 'distributed drain' solution to eliminate this + // restriction. + // Note that if draining was not triggered by checkpointing then we are + // fine since no extra global sync will happen (i.e. all peer gem5 will + // hit this periodic sync eventually). + panic_if(_draining && DistIface::sync->doCkpt, + "Distributed sync is hit while draining"); + /* + * Note that this is a global event so this process method will be called + * by only exactly one thread. + */ + /* + * We hold the eventq lock at this point but the receiver thread may + * need the lock to schedule new recv events while waiting for the + * dist sync to complete. + * Note that the other simulation threads also release their eventq + * locks while waiting for us due to the global event semantics. + */ + { + EventQueue::ScopedRelease sr(curEventQueue()); + // we do a global sync here that is supposed to happen at the same + // tick in all gem5 peers + DistIface::sync->run(true); + // global sync completed + } + if (DistIface::sync->doCkpt) + exitSimLoop("checkpoint"); + if (DistIface::sync->doExit) + exitSimLoop("exit request from gem5 peers"); + + // schedule the next periodic sync + repeat = DistIface::sync->nextRepeat; + schedule(curTick() + repeat); +} + +void +DistIface::RecvScheduler::init(Event *recv_done, Tick link_delay) +{ + // This is called from the receiver thread when it starts running. The new + // receiver thread shares the event queue with the simulation thread + // (associated with the simulated Ethernet link). + curEventQueue(eventManager->eventQueue()); + + recvDone = recv_done; + linkDelay = link_delay; +} + +Tick +DistIface::RecvScheduler::calcReceiveTick(Tick send_tick, + Tick send_delay, + Tick prev_recv_tick) +{ + Tick recv_tick = send_tick + send_delay + linkDelay; + // sanity check (we need atleast a send delay long window) + assert(recv_tick >= prev_recv_tick + send_delay); + panic_if(prev_recv_tick + send_delay > recv_tick, + "Receive window is smaller than send delay"); + panic_if(recv_tick <= curTick(), + "Simulators out of sync - missed packet receive by %llu ticks" + "(rev_recv_tick: %lu send_tick: %lu send_delay: %lu " + "linkDelay: %lu )", + curTick() - recv_tick, prev_recv_tick, send_tick, send_delay, + linkDelay); + + return recv_tick; +} + +void +DistIface::RecvScheduler::resumeRecvTicks() +{ + // Schedule pending packets asap in case link speed/delay changed when + // restoring from the checkpoint. + // This may be done during unserialize except that curTick() is unknown + // so we call this during drainResume(). + // If we are not restoring from a checkppint then link latency could not + // change so we just return. + if (!ckptRestore) + return; + + std::vector v; + while (!descQueue.empty()) { + Desc d = descQueue.front(); + descQueue.pop(); + d.sendTick = curTick(); + d.sendDelay = d.packet->size(); // assume 1 tick/byte max link speed + v.push_back(d); + } + + for (auto &d : v) + descQueue.push(d); + + if (recvDone->scheduled()) { + assert(!descQueue.empty()); + eventManager->reschedule(recvDone, curTick()); + } else { + assert(descQueue.empty() && v.empty()); + } + ckptRestore = false; +} + +void +DistIface::RecvScheduler::pushPacket(EthPacketPtr new_packet, + Tick send_tick, + Tick send_delay) +{ + // Note : this is called from the receiver thread + curEventQueue()->lock(); + Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick); + + DPRINTF(DistEthernetPkt, "DistIface::recvScheduler::pushPacket " + "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n", + send_tick, send_delay, linkDelay, recv_tick); + // Every packet must be sent and arrive in the same quantum + assert(send_tick > master->syncEvent->when() - + master->syncEvent->repeat); + // No packet may be scheduled for receive in the arrival quantum + assert(send_tick + send_delay + linkDelay > master->syncEvent->when()); + + // Now we are about to schedule a recvDone event for the new data packet. + // We use the same recvDone object for all incoming data packets. Packet + // descriptors are saved in the ordered queue. The currently scheduled + // packet is always on the top of the queue. + // NOTE: we use the event queue lock to protect the receive desc queue, + // too, which is accessed both by the receiver thread and the simulation + // thread. + descQueue.emplace(new_packet, send_tick, send_delay); + if (descQueue.size() == 1) { + assert(!recvDone->scheduled()); + eventManager->schedule(recvDone, recv_tick); + } else { + assert(recvDone->scheduled()); + panic_if(descQueue.front().sendTick + descQueue.front().sendDelay > recv_tick, + "Out of order packet received (recv_tick: %lu top(): %lu\n", + recv_tick, descQueue.front().sendTick + descQueue.front().sendDelay); + } + curEventQueue()->unlock(); +} + +EthPacketPtr +DistIface::RecvScheduler::popPacket() +{ + // Note : this is called from the simulation thread when a receive done + // event is being processed for the link. We assume that the thread holds + // the event queue queue lock when this is called! + EthPacketPtr next_packet = descQueue.front().packet; + descQueue.pop(); + + if (descQueue.size() > 0) { + Tick recv_tick = calcReceiveTick(descQueue.front().sendTick, + descQueue.front().sendDelay, + curTick()); + eventManager->schedule(recvDone, recv_tick); + } + prevRecvTick = curTick(); + return next_packet; +} + +void +DistIface::RecvScheduler::Desc::serialize(CheckpointOut &cp) const +{ + SERIALIZE_SCALAR(sendTick); + SERIALIZE_SCALAR(sendDelay); + packet->serialize("rxPacket", cp); +} + +void +DistIface::RecvScheduler::Desc::unserialize(CheckpointIn &cp) +{ + UNSERIALIZE_SCALAR(sendTick); + UNSERIALIZE_SCALAR(sendDelay); + packet = std::make_shared(16384); + packet->unserialize("rxPacket", cp); +} + +void +DistIface::RecvScheduler::serialize(CheckpointOut &cp) const +{ + SERIALIZE_SCALAR(prevRecvTick); + // serialize the receive desc queue + std::queue tmp_queue(descQueue); + unsigned n_desc_queue = descQueue.size(); + assert(tmp_queue.size() == descQueue.size()); + SERIALIZE_SCALAR(n_desc_queue); + for (int i = 0; i < n_desc_queue; i++) { + tmp_queue.front().serializeSection(cp, csprintf("rxDesc_%d", i)); + tmp_queue.pop(); + } + assert(tmp_queue.empty()); +} + +void +DistIface::RecvScheduler::unserialize(CheckpointIn &cp) +{ + assert(descQueue.size() == 0); + assert(recvDone->scheduled() == false); + assert(ckptRestore == false); + + UNSERIALIZE_SCALAR(prevRecvTick); + // unserialize the receive desc queue + unsigned n_desc_queue; + UNSERIALIZE_SCALAR(n_desc_queue); + for (int i = 0; i < n_desc_queue; i++) { + Desc recv_desc; + recv_desc.unserializeSection(cp, csprintf("rxDesc_%d", i)); + descQueue.push(recv_desc); + } + ckptRestore = true; +} + +DistIface::DistIface(unsigned dist_rank, + unsigned dist_size, + Tick sync_start, + Tick sync_repeat, + EventManager *em, + bool is_switch, int num_nodes) : + syncStart(sync_start), syncRepeat(sync_repeat), + recvThread(nullptr), recvScheduler(em), + rank(dist_rank), size(dist_size) +{ + DPRINTF(DistEthernet, "DistIface() ctor rank:%d\n",dist_rank); + isMaster = false; + if (master == nullptr) { + assert(sync == nullptr); + assert(syncEvent == nullptr); + if (is_switch) + sync = new SyncSwitch(num_nodes); + else + sync = new SyncNode(); + syncEvent = new SyncEvent(); + master = this; + isMaster = true; + } + distIfaceId = distIfaceNum; + distIfaceNum++; +} + +DistIface::~DistIface() +{ + assert(recvThread); + delete recvThread; + if (this == master) { + assert(syncEvent); + delete syncEvent; + assert(sync); + delete sync; + master = nullptr; + } +} + +void +DistIface::packetOut(EthPacketPtr pkt, Tick send_delay) +{ + Header header; + + // Prepare a dist header packet for the Ethernet packet we want to + // send out. + header.msgType = MsgType::dataDescriptor; + header.sendTick = curTick(); + header.sendDelay = send_delay; + + header.dataPacketLength = pkt->size(); + + // Send out the packet and the meta info. + sendPacket(header, pkt); + + DPRINTF(DistEthernetPkt, + "DistIface::sendDataPacket() done size:%d send_delay:%llu\n", + pkt->size(), send_delay); +} + +void +DistIface::recvThreadFunc(Event *recv_done, Tick link_delay) +{ + EthPacketPtr new_packet; + DistHeaderPkt::Header header; + + // Initialize receive scheduler parameters + recvScheduler.init(recv_done, link_delay); + + // Main loop to wait for and process any incoming message. + for (;;) { + // recvHeader() blocks until the next dist header packet comes in. + if (!recvHeader(header)) { + // We lost connection to the peer gem5 processes most likely + // because one of them called m5 exit. So we stop here. + // Grab the eventq lock to stop the simulation thread + curEventQueue()->lock(); + exit_message("info", + 0, + "Message server closed connection, " + "simulation is exiting"); + } + + // We got a valid dist header packet, let's process it + if (header.msgType == MsgType::dataDescriptor) { + recvPacket(header, new_packet); + recvScheduler.pushPacket(new_packet, + header.sendTick, + header.sendDelay); + } else { + // everything else must be synchronisation related command + sync->progress(header.sendTick, + header.syncRepeat, + header.needCkpt, + header.needExit); + } + } +} + +void +DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay) +{ + assert(recvThread == nullptr); + + recvThread = new std::thread(&DistIface::recvThreadFunc, + this, + const_cast(recv_done), + link_delay); + recvThreadsNum++; +} + +DrainState +DistIface::drain() +{ + DPRINTF(DistEthernet,"DistIFace::drain() called\n"); + // This can be called multiple times in the same drain cycle. + if (this == master) + syncEvent->draining(true); + return DrainState::Drained; +} + +void +DistIface::drainResume() { + DPRINTF(DistEthernet,"DistIFace::drainResume() called\n"); + if (this == master) + syncEvent->draining(false); + recvScheduler.resumeRecvTicks(); +} + +void +DistIface::serialize(CheckpointOut &cp) const +{ + // Drain the dist interface before the checkpoint is taken. We cannot call + // this as part of the normal drain cycle because this dist sync has to be + // called exactly once after the system is fully drained. + sync->drainComplete(); + + unsigned rank_orig = rank, dist_iface_id_orig = distIfaceId; + + SERIALIZE_SCALAR(rank_orig); + SERIALIZE_SCALAR(dist_iface_id_orig); + + recvScheduler.serializeSection(cp, "recvScheduler"); + if (this == master) { + sync->serializeSection(cp, "Sync"); + } +} + +void +DistIface::unserialize(CheckpointIn &cp) +{ + unsigned rank_orig, dist_iface_id_orig; + UNSERIALIZE_SCALAR(rank_orig); + UNSERIALIZE_SCALAR(dist_iface_id_orig); + + panic_if(rank != rank_orig, "Rank mismatch at resume (rank=%d, orig=%d)", + rank, rank_orig); + panic_if(distIfaceId != dist_iface_id_orig, "Dist iface ID mismatch " + "at resume (distIfaceId=%d, orig=%d)", distIfaceId, + dist_iface_id_orig); + + recvScheduler.unserializeSection(cp, "recvScheduler"); + if (this == master) { + sync->unserializeSection(cp, "Sync"); + } +} + +void +DistIface::init(const Event *done_event, Tick link_delay) +{ + // Init hook for the underlaying message transport to setup/finalize + // communication channels + initTransport(); + + // Spawn a new receiver thread that will process messages + // coming in from peer gem5 processes. + // The receive thread will also schedule a (receive) doneEvent + // for each incoming data packet. + spawnRecvThread(done_event, link_delay); + + + // Adjust the periodic sync start and interval. Different DistIface + // might have different requirements. The singleton sync object + // will select the minimum values for both params. + assert(sync != nullptr); + sync->init(syncStart, syncRepeat); + + // Initialize the seed for random generator to avoid the same sequence + // in all gem5 peer processes + assert(master != nullptr); + if (this == master) + random_mt.init(5489 * (rank+1) + 257); +} + +void +DistIface::startup() +{ + DPRINTF(DistEthernet, "DistIface::startup() started\n"); + if (this == master) + syncEvent->start(); + DPRINTF(DistEthernet, "DistIface::startup() done\n"); +} + +bool +DistIface::readyToCkpt(Tick delay, Tick period) +{ + bool ret = true; + DPRINTF(DistEthernet, "DistIface::readyToCkpt() called, delay:%lu " + "period:%lu\n", delay, period); + if (master) { + if (delay == 0) { + inform("m5 checkpoint called with zero delay => triggering collaborative " + "checkpoint\n"); + sync->requestCkpt(ReqType::collective); + } else { + inform("m5 checkpoint called with non-zero delay => triggering immediate " + "checkpoint (at the next sync)\n"); + sync->requestCkpt(ReqType::immediate); + } + if (period != 0) + inform("Non-zero period for m5_ckpt is ignored in " + "distributed gem5 runs\n"); + ret = false; + } + return ret; +} + +bool +DistIface::readyToExit(Tick delay) +{ + bool ret = true; + DPRINTF(DistEthernet, "DistIface::readyToExit() called, delay:%lu\n", + delay); + if (master) { + if (delay == 0) { + inform("m5 exit called with zero delay => triggering collaborative " + "exit\n"); + sync->requestExit(ReqType::collective); + } else { + inform("m5 exit called with non-zero delay => triggering immediate " + "exit (at the next sync)\n"); + sync->requestExit(ReqType::immediate); + } + ret = false; + } + return ret; +} + +uint64_t +DistIface::rankParam() +{ + uint64_t val; + if (master) { + val = master->rank; + } else { + warn("Dist-rank parameter is queried in single gem5 simulation."); + val = 0; + } + return val; +} + +uint64_t +DistIface::sizeParam() +{ + uint64_t val; + if (master) { + val = master->size; + } else { + warn("Dist-size parameter is queried in single gem5 simulation."); + val = 1; + } + return val; +} diff --git a/src/dev/net/dist_iface.hh b/src/dev/net/dist_iface.hh new file mode 100644 index 000000000..e69211fb8 --- /dev/null +++ b/src/dev/net/dist_iface.hh @@ -0,0 +1,595 @@ +/* + * Copyright (c) 2015 ARM Limited + * All rights reserved + * + * The license below extends only to copyright in the software and shall + * not be construed as granting a license to any other intellectual + * property including but not limited to intellectual property relating + * to a hardware implementation of the functionality of the software + * licensed hereunder. You may use the software subject to the license + * terms below provided that you ensure that this notice is replicated + * unmodified and in its entirety in all distributions of the software, + * modified or unmodified, in source code or in binary form. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer; + * redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution; + * neither the name of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * Authors: Gabor Dozsa + */ + +/* @file + * The interface class for dist gem5 simulations. + * + * dist-gem5 is an extension to gem5 to enable parallel simulation of a + * distributed system (e.g. simulation of a pool of machines + * connected by Ethernet links). A dist gem5 run consists of seperate gem5 + * processes running in parallel. Each gem5 process executes + * the simulation of a component of the simulated distributed system. + * (An example component can be a dist-core board with an Ethernet NIC.) + * The DistIface class below provides services to transfer data and + * control messages among the gem5 processes. The main such services are + * as follows. + * + * 1. Send a data packet coming from a simulated Ethernet link. The packet + * will be transferred to (all) the target(s) gem5 processes. The send + * operation is always performed by the simulation thread, i.e. the gem5 + * thread that is processing the event queue associated with the simulated + * Ethernet link. + * + * 2. Spawn a receiver thread to process messages coming in from the + * from other gem5 processes. Each simulated Ethernet link has its own + * associated receiver thread. The receiver thread saves the incoming packet + * and schedule an appropriate receive event in the event queue. + * + * 3. Schedule a global barrier event periodically to keep the gem5 + * processes in sync. + * Periodic barrier event to keep peer gem5 processes in sync. The basic idea + * is that no gem5 process can go ahead further than the simulated link + * transmission delay to ensure that a corresponding receive event can always + * be scheduled for any message coming in from a peer gem5 process. + * + * + * + * This interface is an abstract class. It can work with various low level + * send/receive service implementations (e.g. TCP/IP, MPI,...). A TCP + * stream socket version is implemented in src/dev/net/tcp_iface.[hh,cc]. + */ +#ifndef __DEV_DIST_IFACE_HH__ +#define __DEV_DIST_IFACE_HH__ + +#include +#include +#include +#include +#include + +#include "dev/net/dist_packet.hh" +#include "dev/net/etherpkt.hh" +#include "sim/core.hh" +#include "sim/drain.hh" +#include "sim/global_event.hh" +#include "sim/serialize.hh" + +class EventManager; + +/** + * The interface class to talk to peer gem5 processes. + */ +class DistIface : public Drainable, public Serializable +{ + public: + typedef DistHeaderPkt::Header Header; + + protected: + typedef DistHeaderPkt::MsgType MsgType; + typedef DistHeaderPkt::ReqType ReqType; + + private: + class SyncEvent; + /** @class Sync + * This class implements global sync operations among gem5 peer processes. + * + * @note This class is used as a singleton object (shared by all DistIface + * objects). + */ + class Sync : public Serializable + { + protected: + /** + * The lock to protect access to the Sync object. + */ + std::mutex lock; + /** + * Condition variable for the simulation thread to wait on + * until all receiver threads completes the current global + * synchronisation. + */ + std::condition_variable cv; + /** + * Number of receiver threads that not yet completed the current global + * synchronisation. + */ + unsigned waitNum; + /** + * Flag is set if exit is permitted upon sync completion + */ + bool doExit; + /** + * Flag is set if taking a ckpt is permitted upon sync completion + */ + bool doCkpt; + /** + * The repeat value for the next periodic sync + */ + Tick nextRepeat; + /** + * Tick for the very first periodic sync + */ + Tick firstAt; + /** + * Tick for the next periodic sync (if the event is not scheduled yet) + */ + Tick nextAt; + + friend class SyncEvent; + + public: + /** + * Initialize periodic sync params. + * + * @param start Start tick for dist synchronisation + * @param repeat Frequency of dist synchronisation + * + */ + void init(Tick start, Tick repeat); + /** + * Core method to perform a full dist sync. + */ + virtual void run(bool same_tick) = 0; + /** + * Callback when the receiver thread gets a sync ack message. + */ + virtual void progress(Tick send_tick, + Tick next_repeat, + ReqType do_ckpt, + ReqType do_exit) = 0; + + virtual void requestCkpt(ReqType req) = 0; + virtual void requestExit(ReqType req) = 0; + + void drainComplete(); + + virtual void serialize(CheckpointOut &cp) const override = 0; + virtual void unserialize(CheckpointIn &cp) override = 0; + }; + + class SyncNode: public Sync + { + private: + /** + * Exit requested + */ + ReqType needExit; + /** + * Ckpt requested + */ + ReqType needCkpt; + + public: + + SyncNode(); + ~SyncNode() {} + void run(bool same_tick) override; + void progress(Tick max_req_tick, + Tick next_repeat, + ReqType do_ckpt, + ReqType do_exit) override; + + void requestCkpt(ReqType req) override; + void requestExit(ReqType req) override; + + void serialize(CheckpointOut &cp) const override; + void unserialize(CheckpointIn &cp) override; + }; + + class SyncSwitch: public Sync + { + private: + /** + * Counter for recording exit requests + */ + unsigned numExitReq; + /** + * Counter for recording ckpt requests + */ + unsigned numCkptReq; + /** + * Number of connected simulated nodes + */ + unsigned numNodes; + + public: + SyncSwitch(int num_nodes); + ~SyncSwitch() {} + + void run(bool same_tick) override; + void progress(Tick max_req_tick, + Tick next_repeat, + ReqType do_ckpt, + ReqType do_exit) override; + + void requestCkpt(ReqType) override { + panic("Switch requested checkpoint"); + } + void requestExit(ReqType) override { + panic("Switch requested exit"); + } + + void serialize(CheckpointOut &cp) const override; + void unserialize(CheckpointIn &cp) override; + }; + + /** + * The global event to schedule periodic dist sync. It is used as a + * singleton object. + * + * The periodic synchronisation works as follows. + * 1. A SyncEvent is scheduled as a global event when startup() is + * called. + * 2. The process() method of the SyncEvent initiates a new barrier + * for each simulated Ethernet link. + * 3. Simulation thread(s) then waits until all receiver threads + * complete the ongoing barrier. The global sync event is done. + */ + class SyncEvent : public GlobalSyncEvent + { + private: + /** + * Flag to set when the system is draining + */ + bool _draining; + public: + /** + * Only the firstly instantiated DistIface object will + * call this constructor. + */ + SyncEvent() : GlobalSyncEvent(Sim_Exit_Pri, 0), _draining(false) {} + + ~SyncEvent() {} + /** + * Schedule the first periodic sync event. + */ + void start(); + /** + * This is a global event so process() will only be called by + * exactly one simulation thread. (See further comments in the .cc + * file.) + */ + void process() override; + + bool draining() const { return _draining; } + void draining(bool fl) { _draining = fl; } + }; + /** + * Class to encapsulate information about data packets received. + + * @note The main purpose of the class to take care of scheduling receive + * done events for the simulated network link and store incoming packets + * until they can be received by the simulated network link. + */ + class RecvScheduler : public Serializable + { + private: + /** + * Received packet descriptor. This information is used by the receive + * thread to schedule receive events and by the simulation thread to + * process those events. + */ + struct Desc : public Serializable + { + EthPacketPtr packet; + Tick sendTick; + Tick sendDelay; + + Desc() : sendTick(0), sendDelay(0) {} + Desc(EthPacketPtr p, Tick s, Tick d) : + packet(p), sendTick(s), sendDelay(d) {} + Desc(const Desc &d) : + packet(d.packet), sendTick(d.sendTick), sendDelay(d.sendDelay) {} + + void serialize(CheckpointOut &cp) const override; + void unserialize(CheckpointIn &cp) override; + }; + /** + * The queue to store the receive descriptors. + */ + std::queue descQueue; + /** + * The tick when the most recent receive event was processed. + * + * @note This information is necessary to simulate possible receiver + * link contention when calculating the receive tick for the next + * incoming data packet (see the calcReceiveTick() method) + */ + Tick prevRecvTick; + /** + * The receive done event for the simulated Ethernet link. + * + * @note This object is constructed by the simulated network link. We + * schedule this object for each incoming data packet. + */ + Event *recvDone; + /** + * The link delay in ticks for the simulated Ethernet link. + * + * @note This value is used for calculating the receive ticks for + * incoming data packets. + */ + Tick linkDelay; + /** + * The event manager associated with the simulated Ethernet link. + * + * @note It is used to access the event queue for scheduling receive + * done events for the link. + */ + EventManager *eventManager; + /** + * Calculate the tick to schedule the next receive done event. + * + * @param send_tick The tick the packet was sent. + * @param send_delay The simulated delay at the sender side. + * @param prev_recv_tick Tick when the last receive event was + * processed. + * + * @note This method tries to take into account possible receiver link + * contention and adjust receive tick for the incoming packets + * accordingly. + */ + Tick calcReceiveTick(Tick send_tick, + Tick send_delay, + Tick prev_recv_tick); + + /** + * Flag to set if receive ticks for pending packets need to be + * recalculated due to changed link latencies at a resume + */ + bool ckptRestore; + + public: + /** + * Scheduler for the incoming data packets. + * + * @param em The event manager associated with the simulated Ethernet + * link. + */ + RecvScheduler(EventManager *em) : + prevRecvTick(0), recvDone(nullptr), linkDelay(0), + eventManager(em), ckptRestore(false) {} + + /** + * Initialize network link parameters. + * + * @note This method is called from the receiver thread (see + * recvThreadFunc()). + */ + void init(Event *recv_done, Tick link_delay); + /** + * Fetch the next packet that is to be received by the simulated network + * link. + * + * @note This method is called from the process() method of the receive + * done event associated with the network link. + */ + EthPacketPtr popPacket(); + /** + * Push a newly arrived packet into the desc queue. + */ + void pushPacket(EthPacketPtr new_packet, + Tick send_tick, + Tick send_delay); + + void serialize(CheckpointOut &cp) const override; + void unserialize(CheckpointIn &cp) override; + /** + * Adjust receive ticks for pending packets when restoring from a + * checkpoint + * + * @note Link speed and delay parameters may change at resume. + */ + void resumeRecvTicks(); + }; + /** + * Tick to schedule the first dist sync event. + * This is just as optimization : we do not need any dist sync + * event until the simulated NIC is brought up by the OS. + */ + Tick syncStart; + /** + * Frequency of dist sync events in ticks. + */ + Tick syncRepeat; + /** + * Receiver thread pointer. + * Each DistIface object must have exactly one receiver thread. + */ + std::thread *recvThread; + /** + * Meta information about data packets received. + */ + RecvScheduler recvScheduler; + + protected: + /** + * The rank of this process among the gem5 peers. + */ + unsigned rank; + /** + * The number of gem5 processes comprising this dist simulation. + */ + unsigned size; + /** + * Number of DistIface objects (i.e. dist links in this gem5 process) + */ + static unsigned distIfaceNum; + /** + * Unique id for the dist link + */ + unsigned distIfaceId; + + bool isMaster; + + private: + /** + * Number of receiver threads (in this gem5 process) + */ + static unsigned recvThreadsNum; + /** + * The singleton Sync object to perform dist synchronisation. + */ + static Sync *sync; + /** + * The singleton SyncEvent object to schedule periodic dist sync. + */ + static SyncEvent *syncEvent; + /** + * The very first DistIface object created becomes the master. We need + * a master to co-ordinate the global synchronisation. + */ + static DistIface *master; + + private: + /** + * Send out a data packet to the remote end. + * @param header Meta info about the packet (which needs to be transferred + * to the destination alongside the packet). + * @param packet Pointer to the packet to send. + */ + virtual void sendPacket(const Header &header, const EthPacketPtr &packet) = 0; + /** + * Send out a control command to the remote end. + * @param header Meta info describing the command (e.g. sync request) + */ + virtual void sendCmd(const Header &header) = 0; + /** + * Receive a header (i.e. meta info describing a data packet or a control command) + * from the remote end. + * @param header The meta info structure to store the incoming header. + */ + virtual bool recvHeader(Header &header) = 0; + /** + * Receive a packet from the remote end. + * @param header Meta info about the incoming packet (obtanied by a previous + * call to the recvHedaer() method). + * @param Pointer to packet received. + */ + virtual void recvPacket(const Header &header, EthPacketPtr &packet) = 0; + /** + * Init hook for the underlaying transport + */ + virtual void initTransport() = 0; + /** + * spawn the receiver thread. + * @param recv_done The receive done event associated with the simulated + * Ethernet link. + * @param link_delay The link delay for the simulated Ethernet link. + */ + void spawnRecvThread(const Event *recv_done, Tick link_delay); + /** + * The function executed by a receiver thread. + */ + void recvThreadFunc(Event *recv_done, Tick link_delay); + + public: + + /** + * ctor + * @param dist_rank Rank of this gem5 process within the dist run + * @param sync_start Start tick for dist synchronisation + * @param sync_repeat Frequency for dist synchronisation + * @param em The event manager associated with the simulated Ethernet link + */ + DistIface(unsigned dist_rank, + unsigned dist_size, + Tick sync_start, + Tick sync_repeat, + EventManager *em, + bool is_switch, + int num_nodes); + + virtual ~DistIface(); + /** + * Send out an Ethernet packet. + * @param pkt The Ethernet packet to send. + * @param send_delay The delay in ticks for the send completion event. + */ + void packetOut(EthPacketPtr pkt, Tick send_delay); + /** + * Fetch the packet scheduled to be received next by the simulated + * network link. + * + * @note This method is called within the process() method of the link + * receive done event. It also schedules the next receive event if the + * receive queue is not empty. + */ + EthPacketPtr packetIn() { return recvScheduler.popPacket(); } + + DrainState drain() override; + void drainResume() override; + void init(const Event *e, Tick link_delay); + void startup(); + + void serialize(CheckpointOut &cp) const override; + void unserialize(CheckpointIn &cp) override; + /** + * Initiate the exit from the simulation. + * @param delay Delay param from the m5 exit command. If Delay is zero + * then a collaborative exit is requested (i.e. all nodes have to call + * this method before the distributed simulation can exit). If Delay is + * not zero then exit is requested asap (and it will happen at the next + * sync tick). + * @return False if we are in distributed mode (i.e. exit can happen only + * at sync), True otherwise. + */ + static bool readyToExit(Tick delay); + /** + * Initiate taking a checkpoint + * @param delay Delay param from the m5 checkpoint command. If Delay is + * zero then a collaborative checkpoint is requested (i.e. all nodes have + * to call this method before the checkpoint can be taken). If Delay is + * not zero then a checkpoint is requested asap (and it will happen at the + * next sync tick). + * @return False if we are in dist mode (i.e. exit can happen only at + * sync), True otherwise. + */ + static bool readyToCkpt(Tick delay, Tick period); + /** + * Getter for the dist rank param. + */ + static uint64_t rankParam(); + /** + * Getter for the dist size param. + */ + static uint64_t sizeParam(); + }; + +#endif diff --git a/src/dev/net/multi_packet.hh b/src/dev/net/dist_packet.hh similarity index 57% rename from src/dev/net/multi_packet.hh rename to src/dev/net/dist_packet.hh index 3d8e85dfa..4c079c44a 100644 --- a/src/dev/net/multi_packet.hh +++ b/src/dev/net/dist_packet.hh @@ -38,93 +38,69 @@ */ /* @file - * Header packet class for multi gem5 runs. + * Header packet class for dist-gem5 runs. * - * For a high level description about multi gem5 see comments in - * header file multi_iface.hh. + * For a high level description about dist-gem5 see comments in + * header file dist_iface.hh. * - * The MultiHeaderPkt class defines the format of message headers - * sent among gem5 processes during a multi gem5 simulation. A header packet + * The DistHeaderPkt class defines the format of message headers + * sent among gem5 processes during a dist gem5 simulation. A header packet * can either carry the description of data packet (i.e. a simulated Ethernet * packet) or a synchronisation related control command. In case of * data packet description, the corresponding data packet always follows * the header packet back-to-back. */ -#ifndef __DEV_NET_MULTI_PACKET_HH__ -#define __DEV_NET_MULTI_PACKET_HH__ +#ifndef __DEV_DIST_PACKET_HH__ +#define __DEV_DIST_PACKET_HH__ #include #include "base/types.hh" -class MultiHeaderPkt +class DistHeaderPkt { private: - MultiHeaderPkt() {} - ~MultiHeaderPkt() {} + DistHeaderPkt() {} + ~DistHeaderPkt() {} public: + enum class ReqType { immediate, collective, pending, none }; /** - * Simply type to help with calculating space requirements for - * the corresponding header field. - */ - typedef uint8_t AddressType[6]; - - /** - * The msg type defines what informarion a multi header packet carries. + * The msg type defines what information a dist header packet carries. */ enum class MsgType { dataDescriptor, - cmdPeriodicSyncReq, - cmdPeriodicSyncAck, - cmdCkptSyncReq, - cmdCkptSyncAck, - cmdAtomicSyncReq, - cmdAtomicSyncAck, + cmdSyncReq, + cmdSyncAck, unknown }; struct Header { /** - * The msg type field is valid for all header packets. In case of - * a synchronisation control command this is the only valid field. + * The msg type field is valid for all header packets. + * + * @note senderRank is used with data packets while collFlags are used + * by sync ack messages to trigger collective ckpt or exit events. */ MsgType msgType; Tick sendTick; - Tick sendDelay; - /** - * Actual length of the simulated Ethernet packet. - */ - unsigned dataPacketLength; - /** - * Source MAC address. - */ - AddressType srcAddress; - /** - * Destination MAC address. - */ - AddressType dstAddress; + union { + Tick sendDelay; + Tick syncRepeat; + }; + union { + /** + * Actual length of the simulated Ethernet packet. + */ + unsigned dataPacketLength; + struct { + ReqType needCkpt; + ReqType needExit; + }; + }; }; - - static unsigned maxAddressLength(); - - /** - * Static functions for manipulating and comparing MAC addresses. - */ - static void clearAddress(AddressType &addr); - static bool isAddressEqual(const AddressType &addr1, - const AddressType &addr2); - static bool isAddressLess(const AddressType &addr1, - const AddressType &addr2); - - static void copyAddress(AddressType &dest, - const AddressType &src); - - static bool isUnicastAddress(const AddressType &addr); - static bool isMulticastAddress(const AddressType &addr); - static bool isBroadcastAddress(const AddressType &addr); }; -#endif // __DEV_NET_MULTI_PACKET_HH__ +#endif diff --git a/src/dev/net/etherpkt.cc b/src/dev/net/etherpkt.cc index a16f572c5..f06af3306 100644 --- a/src/dev/net/etherpkt.cc +++ b/src/dev/net/etherpkt.cc @@ -53,19 +53,3 @@ EthPacketData::unserialize(const string &base, CheckpointIn &cp) arrayParamIn(cp, base + ".data", data, length); } -void -EthPacketData::packAddress(uint8_t *src_addr, - uint8_t *dst_addr, - unsigned &nbytes) -{ - Net::EthHdr *hdr = (Net::EthHdr *)data; - assert(hdr->src().size() == hdr->dst().size()); - if (nbytes < hdr->src().size()) - panic("EthPacketData::packAddress() Buffer overflow"); - - memcpy(dst_addr, hdr->dst().bytes(), hdr->dst().size()); - memcpy(src_addr, hdr->src().bytes(), hdr->src().size()); - - nbytes = hdr->src().size(); -} - diff --git a/src/dev/net/etherpkt.hh b/src/dev/net/etherpkt.hh index 4119578c3..457563293 100644 --- a/src/dev/net/etherpkt.hh +++ b/src/dev/net/etherpkt.hh @@ -71,18 +71,6 @@ class EthPacketData ~EthPacketData() { if (data) delete [] data; } public: - /** - * This function pulls out the MAC source and destination addresses from - * the packet data and stores them in the caller specified buffers. - * - * @param src_addr The buffer to store the source MAC address. - * @param dst_addr The buffer to store the destination MAC address. - * @param length This is an inout parameter. The caller stores in this - * the size of the address buffers. On return, this will contain the - * actual address size stored in the buffers. (We assume that source - * address size is equal to that of the destination address.) - */ - void packAddress(uint8_t *src_addr, uint8_t *dst_addr, unsigned &length); void serialize(const std::string &base, CheckpointOut &cp) const; void unserialize(const std::string &base, CheckpointIn &cp); diff --git a/src/dev/net/multi_iface.cc b/src/dev/net/multi_iface.cc deleted file mode 100644 index 15f69f2ac..000000000 --- a/src/dev/net/multi_iface.cc +++ /dev/null @@ -1,622 +0,0 @@ -/* - * Copyright (c) 2015 ARM Limited - * All rights reserved - * - * The license below extends only to copyright in the software and shall - * not be construed as granting a license to any other intellectual - * property including but not limited to intellectual property relating - * to a hardware implementation of the functionality of the software - * licensed hereunder. You may use the software subject to the license - * terms below provided that you ensure that this notice is replicated - * unmodified and in its entirety in all distributions of the software, - * modified or unmodified, in source code or in binary form. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer; - * redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution; - * neither the name of the copyright holders nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * Authors: Gabor Dozsa - */ - -/* @file - * The interface class for multi gem5 simulations. - */ - -#include "dev/net/multi_iface.hh" - -#include -#include - -#include "base/random.hh" -#include "base/trace.hh" -#include "debug/MultiEthernet.hh" -#include "debug/MultiEthernetPkt.hh" -#include "dev/net/etherpkt.hh" -#include "sim/sim_exit.hh" -#include "sim/sim_object.hh" - - -MultiIface::Sync *MultiIface::sync = nullptr; -MultiIface::SyncEvent *MultiIface::syncEvent = nullptr; -unsigned MultiIface::recvThreadsNum = 0; -MultiIface *MultiIface::master = nullptr; - -bool -MultiIface::Sync::run(SyncTrigger t, Tick sync_tick) -{ - std::unique_lock sync_lock(lock); - - trigger = t; - if (trigger != SyncTrigger::periodic) { - DPRINTF(MultiEthernet,"MultiIface::Sync::run() trigger:%d\n", - (unsigned)trigger); - } - - switch (state) { - case SyncState::asyncCkpt: - switch (trigger) { - case SyncTrigger::ckpt: - assert(MultiIface::syncEvent->interrupted == false); - state = SyncState::busy; - break; - case SyncTrigger::periodic: - if (waitNum == 0) { - // So all recv threads got an async checkpoint request already - // and a simExit is scheduled at the end of the current tick - // (i.e. it is a periodic sync scheduled at the same tick as - // the simExit). - state = SyncState::idle; - DPRINTF(MultiEthernet,"MultiIface::Sync::run() interrupted " - "due to async ckpt scheduled\n"); - return false; - } else { - // we still need to wait for some receiver thread to get the - // aysnc ckpt request. We are going to proceed as 'interrupted' - // periodic sync. - state = SyncState::interrupted; - DPRINTF(MultiEthernet,"MultiIface::Sync::run() interrupted " - "due to ckpt request is coming in\n"); - } - break; - case SyncTrigger::atomic: - assert(trigger != SyncTrigger::atomic); - } - break; - case SyncState::idle: - state = SyncState::busy; - break; - // Only one sync can be active at any time - case SyncState::interrupted: - case SyncState::busy: - assert(state != SyncState::interrupted); - assert(state != SyncState::busy); - break; - } - // Kick-off the sync unless we are in the middle of an interrupted - // periodic sync - if (state != SyncState::interrupted) { - assert(waitNum == 0); - waitNum = MultiIface::recvThreadsNum; - // initiate the global synchronisation - assert(MultiIface::master != nullptr); - MultiIface::master->syncRaw(triggerToMsg[(unsigned)trigger], sync_tick); - } - // now wait until all receiver threads complete the synchronisation - auto lf = [this]{ return waitNum == 0; }; - cv.wait(sync_lock, lf); - - // we are done - assert(state == SyncState::busy || state == SyncState::interrupted); - bool ret = (state != SyncState::interrupted); - state = SyncState::idle; - return ret; -} - -void -MultiIface::Sync::progress(MsgType msg) -{ - std::unique_lock sync_lock(lock); - - switch (msg) { - case MsgType::cmdAtomicSyncAck: - assert(state == SyncState::busy && trigger == SyncTrigger::atomic); - break; - case MsgType::cmdPeriodicSyncAck: - assert(state == SyncState::busy && trigger == SyncTrigger::periodic); - break; - case MsgType::cmdCkptSyncAck: - assert(state == SyncState::busy && trigger == SyncTrigger::ckpt); - break; - case MsgType::cmdCkptSyncReq: - switch (state) { - case SyncState::busy: - if (trigger == SyncTrigger::ckpt) { - // We are already in a checkpoint sync but got another ckpt - // sync request. This may happen if two (or more) peer gem5 - // processes try to start a ckpt nearly at the same time. - // Incrementing waitNum here (before decrementing it below) - // effectively results in ignoring this new ckpt sync request. - waitNum++; - break; - } - assert (waitNum == recvThreadsNum); - state = SyncState::interrupted; - // we need to fall over here to handle "recvThreadsNum == 1" case - case SyncState::interrupted: - assert(trigger == SyncTrigger::periodic); - assert(waitNum >= 1); - if (waitNum == 1) { - exitSimLoop("checkpoint"); - } - break; - case SyncState::idle: - // There is no on-going sync so we got an async ckpt request. If we - // are the only receiver thread then we need to schedule the - // checkpoint. Otherwise, only change the state to 'asyncCkpt' and - // let the last receiver thread to schedule the checkpoint at the - // 'asyncCkpt' case. - // Note that a periodic or resume sync may start later and that can - // trigger a state change to 'interrupted' (so the checkpoint may - // get scheduled at 'interrupted' case finally). - assert(waitNum == 0); - state = SyncState::asyncCkpt; - waitNum = MultiIface::recvThreadsNum; - // we need to fall over here to handle "recvThreadsNum == 1" case - case SyncState::asyncCkpt: - assert(waitNum >= 1); - if (waitNum == 1) - exitSimLoop("checkpoint"); - break; - default: - panic("Unexpected state for checkpoint request message"); - break; - } - break; - default: - panic("Unknown msg type"); - break; - } - waitNum--; - assert(state != SyncState::idle); - // Notify the simultaion thread if there is an on-going sync. - if (state != SyncState::asyncCkpt) { - sync_lock.unlock(); - cv.notify_one(); - } -} - -void MultiIface::SyncEvent::start(Tick start, Tick interval) -{ - assert(!scheduled()); - if (interval == 0) - panic("Multi synchronisation period must be greater than zero"); - repeat = interval; - schedule(start); -} - -void -MultiIface::SyncEvent::adjust(Tick start_tick, Tick repeat_tick) -{ - // The new multi interface may require earlier start of the - // synchronisation. - assert(scheduled() == true); - if (start_tick < when()) - reschedule(start_tick); - // The new multi interface may require more frequent synchronisation. - if (repeat == 0) - panic("Multi synchronisation period must be greater than zero"); - if (repeat < repeat_tick) - repeat = repeat_tick; -} - -void -MultiIface::SyncEvent::process() -{ - /* - * Note that this is a global event so this process method will be called - * by only exactly one thread. - */ - // if we are draining the system then we must not start a periodic sync (as - // it is not sure that all peer gem5 will reach this tick before taking - // the checkpoint). - if (isDraining == true) { - assert(interrupted == false); - interrupted = true; - DPRINTF(MultiEthernet,"MultiIface::SyncEvent::process() interrupted " - "due to draining\n"); - return; - } - if (interrupted == false) - scheduledAt = curTick(); - /* - * We hold the eventq lock at this point but the receiver thread may - * need the lock to schedule new recv events while waiting for the - * multi sync to complete. - * Note that the other simulation threads also release their eventq - * locks while waiting for us due to the global event semantics. - */ - curEventQueue()->unlock(); - // we do a global sync here - interrupted = !MultiIface::sync->run(SyncTrigger::periodic, scheduledAt); - // Global sync completed or got interrupted. - // we are expected to exit with the eventq lock held - curEventQueue()->lock(); - // schedule the next global sync event if this one completed. Otherwise - // (i.e. this one was interrupted by a checkpoint request), we will - // reschedule this one after the draining is complete. - if (!interrupted) - schedule(scheduledAt + repeat); -} - -void MultiIface::SyncEvent::resume() -{ - Tick sync_tick; - assert(!scheduled()); - if (interrupted) { - assert(curTick() >= scheduledAt); - // We have to complete the interrupted periodic sync asap. - // Note that this sync might be interrupted now again with a checkpoint - // request from a peer gem5... - sync_tick = curTick(); - schedule(sync_tick); - } else { - // So we completed the last periodic sync, let's find out the tick for - // next one - assert(curTick() > scheduledAt); - sync_tick = scheduledAt + repeat; - if (sync_tick < curTick()) - panic("Cannot resume periodic synchronisation"); - schedule(sync_tick); - } - DPRINTF(MultiEthernet, - "MultiIface::SyncEvent periodic sync resumed at %lld " - "(curTick:%lld)\n", sync_tick, curTick()); -} - -void MultiIface::SyncEvent::serialize(const std::string &base, - CheckpointOut &cp) const -{ - // Save the periodic multi sync schedule information - paramOut(cp, base + ".periodicSyncRepeat", repeat); - paramOut(cp, base + ".periodicSyncInterrupted", interrupted); - paramOut(cp, base + ".periodicSyncAt", scheduledAt); -} - -void MultiIface::SyncEvent::unserialize(const std::string &base, - CheckpointIn &cp) -{ - paramIn(cp, base + ".periodicSyncRepeat", repeat); - paramIn(cp, base + ".periodicSyncInterrupted", interrupted); - paramIn(cp, base + ".periodicSyncAt", scheduledAt); -} - -MultiIface::MultiIface(unsigned multi_rank, - Tick sync_start, - Tick sync_repeat, - EventManager *em) : - syncStart(sync_start), syncRepeat(sync_repeat), - recvThread(nullptr), eventManager(em), recvDone(nullptr), - scheduledRecvPacket(nullptr), linkDelay(0), rank(multi_rank) -{ - DPRINTF(MultiEthernet, "MultiIface() ctor rank:%d\n",multi_rank); - if (master == nullptr) { - assert(sync == nullptr); - assert(syncEvent == nullptr); - sync = new Sync(); - syncEvent = new SyncEvent(); - master = this; - } -} - -MultiIface::~MultiIface() -{ - assert(recvThread); - delete recvThread; - if (this == master) { - assert(syncEvent); - delete syncEvent; - assert(sync); - delete sync; - } -} - -void -MultiIface::packetOut(EthPacketPtr pkt, Tick send_delay) -{ - MultiHeaderPkt::Header header_pkt; - unsigned address_length = MultiHeaderPkt::maxAddressLength(); - - // Prepare a multi header packet for the Ethernet packet we want to - // send out. - header_pkt.msgType = MsgType::dataDescriptor; - header_pkt.sendTick = curTick(); - header_pkt.sendDelay = send_delay; - - // Store also the source and destination addresses. - pkt->packAddress(header_pkt.srcAddress, header_pkt.dstAddress, - address_length); - - header_pkt.dataPacketLength = pkt->size(); - - // Send out the multi hedare packet followed by the Ethernet packet. - sendRaw(&header_pkt, sizeof(header_pkt), header_pkt.dstAddress); - sendRaw(pkt->data, pkt->size(), header_pkt.dstAddress); - DPRINTF(MultiEthernetPkt, - "MultiIface::sendDataPacket() done size:%d send_delay:%llu " - "src:0x%02x%02x%02x%02x%02x%02x " - "dst:0x%02x%02x%02x%02x%02x%02x\n", - pkt->size(), send_delay, - header_pkt.srcAddress[0], header_pkt.srcAddress[1], - header_pkt.srcAddress[2], header_pkt.srcAddress[3], - header_pkt.srcAddress[4], header_pkt.srcAddress[5], - header_pkt.dstAddress[0], header_pkt.dstAddress[1], - header_pkt.dstAddress[2], header_pkt.dstAddress[3], - header_pkt.dstAddress[4], header_pkt.dstAddress[5]); -} - -bool -MultiIface::recvHeader(MultiHeaderPkt::Header &header_pkt) -{ - // Blocking receive of an incoming multi header packet. - return recvRaw((void *)&header_pkt, sizeof(header_pkt)); -} - -void -MultiIface::recvData(const MultiHeaderPkt::Header &header_pkt) -{ - // We are here beacuse a header packet has been received implying - // that an Ethernet (data) packet is coming in next. - assert(header_pkt.msgType == MsgType::dataDescriptor); - // Allocate storage for the incoming Ethernet packet. - EthPacketPtr new_packet(new EthPacketData(header_pkt.dataPacketLength)); - // Now execute the blocking receive and store the incoming data directly - // in the new EthPacketData object. - if (! recvRaw((void *)(new_packet->data), header_pkt.dataPacketLength)) - panic("Missing data packet"); - - new_packet->length = header_pkt.dataPacketLength; - // Grab the event queue lock to schedule a new receive event for the - // data packet. - curEventQueue()->lock(); - // Compute the receive tick. It includes the send delay and the - // simulated link delay. - Tick recv_tick = header_pkt.sendTick + header_pkt.sendDelay + linkDelay; - DPRINTF(MultiEthernetPkt, "MultiIface::recvThread() packet receive, " - "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n", - header_pkt.sendTick, header_pkt.sendDelay, linkDelay, recv_tick); - - if (recv_tick <= curTick()) { - panic("Simulators out of sync - missed packet receive by %llu ticks", - curTick() - recv_tick); - } - // Now we are about to schedule a recvDone event for the new data packet. - // We use the same recvDone object for all incoming data packets. If - // that is already scheduled - i.e. a receive event for a previous - // data packet is already pending - then we have to check whether the - // receive tick for the new packet is earlier than that of the currently - // pending event. Packets may arrive out-of-order with respect to - // simulated receive time. If that is the case, we need to re-schedule the - // recvDone event for the new packet. Otherwise, we save the packet - // pointer and the recv tick for the new packet in the recvQueue. See - // the implementation of the packetIn() method for comments on how this - // information is retrieved from the recvQueue by the simulation thread. - if (!recvDone->scheduled()) { - assert(recvQueue.size() == 0); - assert(scheduledRecvPacket == nullptr); - scheduledRecvPacket = new_packet; - eventManager->schedule(recvDone, recv_tick); - } else if (recvDone->when() > recv_tick) { - recvQueue.emplace(scheduledRecvPacket, recvDone->when()); - eventManager->reschedule(recvDone, recv_tick); - scheduledRecvPacket = new_packet; - } else { - recvQueue.emplace(new_packet, recv_tick); - } - curEventQueue()->unlock(); -} - -void -MultiIface::recvThreadFunc() -{ - EthPacketPtr new_packet; - MultiHeaderPkt::Header header; - - // The new receiver thread shares the event queue with the simulation - // thread (associated with the simulated Ethernet link). - curEventQueue(eventManager->eventQueue()); - // Main loop to wait for and process any incoming message. - for (;;) { - // recvHeader() blocks until the next multi header packet comes in. - if (!recvHeader(header)) { - // We lost connection to the peer gem5 processes most likely - // because one of them called m5 exit. So we stop here. - exit_message("info", 0, "Message server closed connection, " - "simulation is exiting"); - } - // We got a valid multi header packet, let's process it - if (header.msgType == MsgType::dataDescriptor) { - recvData(header); - } else { - // everything else must be synchronisation related command - sync->progress(header.msgType); - } - } -} - -EthPacketPtr -MultiIface::packetIn() -{ - // We are called within the process() method of the recvDone event. We - // return the packet that triggered the current receive event. - // If there is further packets in the recvQueue, we also have to schedule - // the recvEvent for the next packet with the smallest receive tick. - // The priority queue container ensures that smallest receive tick is - // always on the top of the queue. - assert(scheduledRecvPacket != nullptr); - EthPacketPtr next_packet = scheduledRecvPacket; - - if (! recvQueue.empty()) { - eventManager->schedule(recvDone, recvQueue.top().second); - scheduledRecvPacket = recvQueue.top().first; - recvQueue.pop(); - } else { - scheduledRecvPacket = nullptr; - } - - return next_packet; -} - -void -MultiIface::spawnRecvThread(Event *recv_done, Tick link_delay) -{ - assert(recvThread == nullptr); - // all receive thread must be spawned before simulation starts - assert(eventManager->eventQueue()->getCurTick() == 0); - - recvDone = recv_done; - linkDelay = link_delay; - - recvThread = new std::thread(&MultiIface::recvThreadFunc, this); - - recvThreadsNum++; -} - -DrainState -MultiIface::drain() -{ - DPRINTF(MultiEthernet,"MultiIFace::drain() called\n"); - - // This can be called multiple times in the same drain cycle. - if (master == this) { - syncEvent->isDraining = true; - } - - return DrainState::Drained; -} - -void MultiIface::drainDone() { - if (master == this) { - assert(syncEvent->isDraining == true); - syncEvent->isDraining = false; - // We need to resume the interrupted periodic sync here now that the - // draining is done. If the last periodic sync completed before the - // checkpoint then the next one is already scheduled. - if (syncEvent->interrupted) - syncEvent->resume(); - } -} - -void MultiIface::serialize(const std::string &base, CheckpointOut &cp) const -{ - // Drain the multi interface before the checkpoint is taken. We cannot call - // this as part of the normal drain cycle because this multi sync has to be - // called exactly once after the system is fully drained. - // Note that every peer will take a checkpoint but they may take it at - // different ticks. - // This sync request may interrupt an on-going periodic sync in some peers. - sync->run(SyncTrigger::ckpt, curTick()); - - // Save the periodic multi sync status - syncEvent->serialize(base, cp); - - unsigned n_rx_packets = recvQueue.size(); - if (scheduledRecvPacket != nullptr) - n_rx_packets++; - - paramOut(cp, base + ".nRxPackets", n_rx_packets); - - if (n_rx_packets > 0) { - assert(recvDone->scheduled()); - scheduledRecvPacket->serialize(base + ".rxPacket[0]", cp); - } - - for (unsigned i=1; i < n_rx_packets; i++) { - const RecvInfo recv_info = recvQueue.impl().at(i-1); - recv_info.first->serialize(base + csprintf(".rxPacket[%d]", i), cp); - Tick rx_tick = recv_info.second; - paramOut(cp, base + csprintf(".rxTick[%d]", i), rx_tick); - } -} - -void MultiIface::unserialize(const std::string &base, CheckpointIn &cp) -{ - assert(recvQueue.size() == 0); - assert(scheduledRecvPacket == nullptr); - assert(recvDone->scheduled() == false); - - // restore periodic sync info - syncEvent->unserialize(base, cp); - - unsigned n_rx_packets; - paramIn(cp, base + ".nRxPackets", n_rx_packets); - - if (n_rx_packets > 0) { - scheduledRecvPacket = std::make_shared(16384); - scheduledRecvPacket->unserialize(base + ".rxPacket[0]", cp); - // Note: receive event will be scheduled when the link is unserialized - } - - for (unsigned i=1; i < n_rx_packets; i++) { - EthPacketPtr rx_packet = std::make_shared(16384); - rx_packet->unserialize(base + csprintf(".rxPacket[%d]", i), cp); - Tick rx_tick = 0; - paramIn(cp, base + csprintf(".rxTick[%d]", i), rx_tick); - assert(rx_tick > 0); - recvQueue.emplace(rx_packet,rx_tick); - } -} - -void MultiIface::initRandom() -{ - // Initialize the seed for random generator to avoid the same sequence - // in all gem5 peer processes - assert(master != nullptr); - if (this == master) - random_mt.init(5489 * (rank+1) + 257); -} - -void MultiIface::startPeriodicSync() -{ - DPRINTF(MultiEthernet, "MultiIface:::initPeriodicSync started\n"); - // Do a global sync here to ensure that peer gem5 processes are around - // (actually this may not be needed...) - sync->run(SyncTrigger::atomic, curTick()); - - // Start the periodic sync if it is a fresh simulation from scratch - if (curTick() == 0) { - if (this == master) { - syncEvent->start(syncStart, syncRepeat); - inform("Multi synchronisation activated: start at %lld, " - "repeat at every %lld ticks.\n", - syncStart, syncRepeat); - } else { - // In case another multiIface object requires different schedule - // for periodic sync than the master does. - syncEvent->adjust(syncStart, syncRepeat); - } - } else { - // Schedule the next periodic sync if resuming from a checkpoint - if (this == master) - syncEvent->resume(); - } - DPRINTF(MultiEthernet, "MultiIface::initPeriodicSync done\n"); -} diff --git a/src/dev/net/multi_iface.hh b/src/dev/net/multi_iface.hh deleted file mode 100644 index f8ce2abf7..000000000 --- a/src/dev/net/multi_iface.hh +++ /dev/null @@ -1,492 +0,0 @@ -/* - * Copyright (c) 2015 ARM Limited - * All rights reserved - * - * The license below extends only to copyright in the software and shall - * not be construed as granting a license to any other intellectual - * property including but not limited to intellectual property relating - * to a hardware implementation of the functionality of the software - * licensed hereunder. You may use the software subject to the license - * terms below provided that you ensure that this notice is replicated - * unmodified and in its entirety in all distributions of the software, - * modified or unmodified, in source code or in binary form. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer; - * redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution; - * neither the name of the copyright holders nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * Authors: Gabor Dozsa - */ - -/* @file - * The interface class for multi gem5 simulations. - * - * Multi gem5 is an extension to gem5 to enable parallel simulation of a - * distributed system (e.g. simulation of a pool of machines - * connected by Ethernet links). A multi gem5 run consists of seperate gem5 - * processes running in parallel. Each gem5 process executes - * the simulation of a component of the simulated distributed system. - * (An example component can be a multi-core board with an Ethernet NIC.) - * The MultiIface class below provides services to transfer data and - * control messages among the gem5 processes. The main such services are - * as follows. - * - * 1. Send a data packet coming from a simulated Ethernet link. The packet - * will be transferred to (all) the target(s) gem5 processes. The send - * operation is always performed by the simulation thread, i.e. the gem5 - * thread that is processing the event queue associated with the simulated - * Ethernet link. - * - * 2. Spawn a receiver thread to process messages coming in from the - * from other gem5 processes. Each simulated Ethernet link has its own - * associated receiver thread. The receiver thread saves the incoming packet - * and schedule an appropriate receive event in the event queue. - * - * 3. Schedule a global barrier event periodically to keep the gem5 - * processes in sync. - * Periodic barrier event to keep peer gem5 processes in sync. The basic idea - * is that no gem5 process can go ahead further than the simulated link - * transmission delay to ensure that a corresponding receive event can always - * be scheduled for any message coming in from a peer gem5 process. - * - * - * - * This interface is an abstract class (sendRaw() and recvRaw() - * methods are pure virtual). It can work with various low level - * send/receive service implementations (e.g. TCP/IP, MPI,...). A TCP - * stream socket version is implemented in dev/src/tcp_iface.[hh,cc]. - */ -#ifndef __DEV_NET_MULTI_IFACE_HH__ -#define __DEV_NET_MULTI_IFACE_HH__ - -#include -#include -#include -#include -#include - -#include "dev/net/etherpkt.hh" -#include "dev/net/multi_packet.hh" -#include "sim/core.hh" -#include "sim/drain.hh" -#include "sim/global_event.hh" - -class EventManager; - -/** - * The interface class to talk to peer gem5 processes. - */ -class MultiIface : public Drainable -{ - public: - /*! - * The possible reasons a multi sync among gem5 peers is needed for. - */ - enum - class SyncTrigger { - periodic, /*!< Regular periodic sync. This can be interrupted by a - checkpoint sync request */ - ckpt, /*!< sync before taking a checkpoint */ - atomic /*!< sync that cannot be interrupted (e.g. sync at startup) */ - }; - - private: - typedef MultiHeaderPkt::MsgType MsgType; - - /** Sync State-Machine - \dot - digraph Sync { - node [shape=box, fontsize=10]; - idle -> busy - [ label="new trigger\n by run()" fontsize=8 ]; - busy -> busy - [ label="new message by progress():\n(msg == SyncAck &&\nwaitNum > 1) || \n(msg==CkptSyncReq &&\ntrigger == ckpt)" fontsize=8 ]; - busy -> idle - [ label="new message by progress():\n(msg == SyncAck &&\nwaitNum == 1)" fontsize=8 ]; - busy -> interrupted - [ label="new message by progress():\n(msg == CkptSyncReq &&\ntrigger == periodic)" fontsize=8 ]; - idle -> asyncCkpt - [ label="new message by progress():\nmsg == CkptSyncReq" fontsize=8 ]; - asyncCkpt -> asyncCkpt - [ label="new message by progress():\nmsg == CkptSyncReq" fontsize=8 ]; - asyncCkpt -> busy - [ label="new trigger by run():\ntrigger == ckpt" fontsize=8 ]; - asyncCkpt -> idle - [ label="new trigger by run():\n(trigger == periodic &&\nwaitNum == 0) " fontsize=8 ]; - asyncCkpt -> interrupted - [ label="new trigger by run():\n(trigger == periodic &&\nwaitNum > 0) " fontsize=8 ]; - interrupted -> interrupted - [ label="new message by progress():\n(msg == CkptSyncReq &&\nwaitNum > 1)" fontsize=8 ]; - interrupted -> idle - [ label="new message by progress():\n(msg == CkptSyncReq &&\nwaitNum == 1)" fontsize=8 ]; - } - \enddot - */ - /** @class Sync - * This class implements global sync operations among gem5 peer processes. - * - * @note This class is used as a singleton object (shared by all MultiIface - * objects). - */ - class Sync - { - private: - /*! - * Internal state of the sync singleton object. - */ - enum class SyncState { - busy, /*!< There is an on-going sync. */ - interrupted, /*!< An on-going periodic sync was interrupted. */ - asyncCkpt, /*!< A checkpoint (sim_exit) is already scheduled */ - idle /*!< There is no active sync. */ - }; - /** - * The lock to protect access to the MultiSync object. - */ - std::mutex lock; - /** - * Condition variable for the simulation thread to wait on - * until all receiver threads completes the current global - * synchronisation. - */ - std::condition_variable cv; - /** - * Number of receiver threads that not yet completed the current global - * synchronisation. - */ - unsigned waitNum; - /** - * The trigger for the most recent sync. - */ - SyncTrigger trigger; - /** - * Map sync triggers to request messages. - */ - std::array triggerToMsg = {{ - MsgType::cmdPeriodicSyncReq, - MsgType::cmdCkptSyncReq, - MsgType::cmdAtomicSyncReq - }}; - - /** - * Current sync state. - */ - SyncState state; - - public: - /** - * Core method to perform a full multi sync. - * - * @param t Sync trigger. - * @param sync_tick The tick the sync was expected to happen at. - * @return true if the sync completed, false if it was interrupted. - * - * @note In case of an interrupted periodic sync, sync_tick can be less - * than curTick() when we resume (i.e. re-run) it - */ - bool run(SyncTrigger t, Tick sync_tick); - /** - * Callback when the receiver thread gets a sync message. - */ - void progress(MsgType m); - - Sync() : waitNum(0), state(SyncState::idle) {} - ~Sync() {} - }; - - - /** - * The global event to schedule peridic multi sync. It is used as a - * singleton object. - * - * The periodic synchronisation works as follows. - * 1. A MultisyncEvent is scheduled as a global event when startup() is - * called. - * 2. The progress() method of the MultisyncEvent initiates a new barrier - * for each simulated Ethernet links. - * 3. Simulation thread(s) then waits until all receiver threads - * completes the ongoing barrier. The global sync event is done. - */ - class SyncEvent : public GlobalSyncEvent - { - public: - /** - * Flag to indicate that the most recent periodic sync was interrupted - * (by a checkpoint request). - */ - bool interrupted; - /** - * The tick when the most recent periodic synchronisation was scheduled - * at. - */ - Tick scheduledAt; - /** - * Flag to indicate an on-going drain cycle. - */ - bool isDraining; - - public: - /** - * Only the firstly instanstiated MultiIface object will - * call this constructor. - */ - SyncEvent() : GlobalSyncEvent(Default_Pri, 0), interrupted(false), - scheduledAt(0), isDraining(false) {} - - ~SyncEvent() { assert (scheduled() == false); } - /** - * Schedule the first periodic sync event. - * - * @param start Start tick for multi synchronisation - * @param repeat Frequency of multi synchronisation - * - */ - void start(Tick start, Tick repeat); - /** - * Reschedule (if necessary) the periodic sync event. - * - * @param start Start tick for multi synchronisation - * @param repeat Frequency of multi synchronisation - * - * @note Useful if we have multiple MultiIface objects with - * different 'start' and 'repeat' values for global sync. - */ - void adjust(Tick start, Tick repeat); - /** - * This is a global event so process() will be called by each - * simulation threads. (See further comments in the .cc file.) - */ - void process() override; - /** - * Schedule periodic sync when resuming from a checkpoint. - */ - void resume(); - - void serialize(const std::string &base, CheckpointOut &cp) const; - void unserialize(const std::string &base, CheckpointIn &cp); - }; - - /** - * The receive thread needs to store the packet pointer and the computed - * receive tick for each incoming data packet. This information is used - * by the simulation thread when it processes the corresponding receive - * event. (See more comments at the implemetation of the recvThreadFunc() - * and RecvPacketIn() methods.) - */ - typedef std::pair RecvInfo; - - /** - * Comparison predicate for RecvInfo, needed by the recvQueue. - */ - struct RecvInfoCompare { - bool operator()(const RecvInfo &lhs, const RecvInfo &rhs) - { - return lhs.second > rhs.second; - } - }; - - /** - * Customized priority queue used to store incoming data packets info by - * the receiver thread. We need to expose the underlying container to - * enable iterator access for serializing. - */ - class RecvQueue : public std::priority_queue, - RecvInfoCompare> - { - public: - std::vector &impl() { return c; } - const std::vector &impl() const { return c; } - }; - - /* - * The priority queue to store RecvInfo items ordered by receive ticks. - */ - RecvQueue recvQueue; - /** - * The singleton Sync object to perform multi synchronisation. - */ - static Sync *sync; - /** - * The singleton SyncEvent object to schedule periodic multi sync. - */ - static SyncEvent *syncEvent; - /** - * Tick to schedule the first multi sync event. - * This is just as optimization : we do not need any multi sync - * event until the simulated NIC is brought up by the OS. - */ - Tick syncStart; - /** - * Frequency of multi sync events in ticks. - */ - Tick syncRepeat; - /** - * Receiver thread pointer. - * Each MultiIface object must have exactly one receiver thread. - */ - std::thread *recvThread; - /** - * The event manager associated with the MultiIface object. - */ - EventManager *eventManager; - - /** - * The receive done event for the simulated Ethernet link. - * It is scheduled by the receiver thread for each incoming data - * packet. - */ - Event *recvDone; - - /** - * The packet that belongs to the currently scheduled recvDone event. - */ - EthPacketPtr scheduledRecvPacket; - - /** - * The link delay in ticks for the simulated Ethernet link. - */ - Tick linkDelay; - - /** - * The rank of this process among the gem5 peers. - */ - unsigned rank; - /** - * Total number of receiver threads (in this gem5 process). - * During the simulation it should be constant and equal to the - * number of MultiIface objects (i.e. simulated Ethernet - * links). - */ - static unsigned recvThreadsNum; - /** - * The very first MultiIface object created becomes the master. We need - * a master to co-ordinate the global synchronisation. - */ - static MultiIface *master; - - protected: - /** - * Low level generic send routine. - * @param buf buffer that holds the data to send out - * @param length number of bytes to send - * @param dest_addr address of the target (simulated NIC). This may be - * used by a subclass for optimization (e.g. optimize broadcast) - */ - virtual void sendRaw(void *buf, - unsigned length, - const MultiHeaderPkt::AddressType dest_addr) = 0; - /** - * Low level generic receive routine. - * @param buf the buffer to store the incoming message - * @param length buffer size (in bytes) - */ - virtual bool recvRaw(void *buf, unsigned length) = 0; - /** - * Low level request for synchronisation among gem5 processes. Only one - * MultiIface object needs to call this (in each gem5 process) to trigger - * a multi sync. - * - * @param sync_req Sync request command. - * @param sync_tick The tick when sync is expected to happen in the sender. - */ - virtual void syncRaw(MsgType sync_req, Tick sync_tick) = 0; - /** - * The function executed by a receiver thread. - */ - void recvThreadFunc(); - /** - * Receive a multi header packet. Called by the receiver thread. - * @param header the structure to store the incoming header packet. - * @return false if any error occured during the receive, true otherwise - * - * A header packet can carry a control command (e.g. 'barrier leave') or - * information about a data packet that is following the header packet - * back to back. - */ - bool recvHeader(MultiHeaderPkt::Header &header); - /** - * Receive a data packet. Called by the receiver thread. - * @param data_header The packet descriptor for the expected incoming data - * packet. - */ - void recvData(const MultiHeaderPkt::Header &data_header); - - public: - - /** - * ctor - * @param multi_rank Rank of this gem5 process within the multi run - * @param sync_start Start tick for multi synchronisation - * @param sync_repeat Frequency for multi synchronisation - * @param em The event manager associated with the simulated Ethernet link - */ - MultiIface(unsigned multi_rank, - Tick sync_start, - Tick sync_repeat, - EventManager *em); - - virtual ~MultiIface(); - /** - * Send out an Ethernet packet. - * @param pkt The Ethernet packet to send. - * @param send_delay The delay in ticks for the send completion event. - */ - void packetOut(EthPacketPtr pkt, Tick send_delay); - /** - * Fetch the next packet from the receive queue. - */ - EthPacketPtr packetIn(); - - /** - * spawn the receiver thread. - * @param recv_done The receive done event associated with the simulated - * Ethernet link. - * @param link_delay The link delay for the simulated Ethernet link. - */ - void spawnRecvThread(Event *recv_done, - Tick link_delay); - /** - * Initialize the random number generator with a different seed in each - * peer gem5 process. - */ - void initRandom(); - - DrainState drain() override; - - /** - * Callback when draining is complete. - */ - void drainDone(); - - /** - * Initialize the periodic synchronisation among peer gem5 processes. - */ - void startPeriodicSync(); - - void serialize(const std::string &base, CheckpointOut &cp) const; - void unserialize(const std::string &base, CheckpointIn &cp); - -}; - - -#endif // __DEV_NET_MULTI_IFACE_HH__ diff --git a/src/dev/net/tcp_iface.cc b/src/dev/net/tcp_iface.cc index 035ec8fd0..38fc7aef2 100644 --- a/src/dev/net/tcp_iface.cc +++ b/src/dev/net/tcp_iface.cc @@ -35,25 +35,35 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * Authors: Gabor Dozsa + * Mohammad Alian */ /* @file - * TCP stream socket based interface class implementation for multi gem5 runs. + * TCP stream socket based interface class implementation for dist-gem5 runs. */ #include "dev/net/tcp_iface.hh" #include #include +#include #include #include #include #include #include +#include #include "base/types.hh" -#include "debug/MultiEthernet.hh" +#include "debug/DistEthernet.hh" +#include "debug/DistEthernetCmd.hh" +#include "sim/sim_exit.hh" + +#if defined(__FreeBSD__) +#include + +#endif // MSG_NOSIGNAL does not exists on OS X #if defined(__APPLE__) || defined(__MACH__) @@ -64,42 +74,181 @@ using namespace std; +std::vector > TCPIface::nodes; vector TCPIface::sockRegistry; +int TCPIface::fdStatic = -1; +bool TCPIface::anyListening = false; TCPIface::TCPIface(string server_name, unsigned server_port, - unsigned multi_rank, Tick sync_start, Tick sync_repeat, - EventManager *em) : - MultiIface(multi_rank, sync_start, sync_repeat, em) + unsigned dist_rank, unsigned dist_size, + Tick sync_start, Tick sync_repeat, + EventManager *em, bool is_switch, int num_nodes) : + DistIface(dist_rank, dist_size, sync_start, sync_repeat, em, + is_switch, num_nodes), serverName(server_name), + serverPort(server_port), isSwitch(is_switch), listening(false) { - struct addrinfo addr_hint, *addr_results; + if (is_switch && isMaster) { + while (!listen(serverPort)) { + DPRINTF(DistEthernet, "TCPIface(listen): Can't bind port %d\n", + serverPort); + serverPort++; + } + inform("tcp_iface listening on port %d", serverPort); + // Now accept the first connection requests from each compute node and + // store the node info. The compute nodes will then wait for ack + // messages. Ack messages will be sent by initTransport() in the + // appropriate order to make sure that every compute node is always + // connected to the same switch port. + NodeInfo ni; + for (int i = 0; i < size; i++) { + accept(); + DPRINTF(DistEthernet, "First connection, waiting for link info\n"); + if (!recvTCP(sock, &ni, sizeof(ni))) + panic("Failed to receive link info"); + nodes.push_back(make_pair(ni, sock)); + } + } +} + +bool +TCPIface::listen(int port) +{ + if (listening) + panic("Socket already listening!"); + + struct sockaddr_in sockaddr; int ret; - string port_str = to_string(server_port); + fdStatic = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + panic_if(fdStatic < 0, "socket() failed: %s", strerror(errno)); - sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); - panic_if(sock < 0, "socket() failed: %s", strerror(errno)); + sockaddr.sin_family = PF_INET; + sockaddr.sin_addr.s_addr = INADDR_ANY; + sockaddr.sin_port = htons(port); + // finally clear sin_zero + memset(&sockaddr.sin_zero, 0, sizeof(sockaddr.sin_zero)); + ret = ::bind(fdStatic, (struct sockaddr *)&sockaddr, sizeof (sockaddr)); - bzero(&addr_hint, sizeof(addr_hint)); - addr_hint.ai_family = AF_INET; - addr_hint.ai_socktype = SOCK_STREAM; - addr_hint.ai_protocol = IPPROTO_TCP; + if (ret != 0) { + if (ret == -1 && errno != EADDRINUSE) + panic("ListenSocket(listen): bind() failed!"); + return false; + } - ret = getaddrinfo(server_name.c_str(), port_str.c_str(), - &addr_hint, &addr_results); - panic_if(ret < 0, "getaddrinf() failed: %s", strerror(errno)); + if (::listen(fdStatic, 24) == -1) { + if (errno != EADDRINUSE) + panic("ListenSocket(listen): listen() failed!"); - DPRINTF(MultiEthernet, "Connecting to %s:%u\n", - server_name.c_str(), port_str.c_str()); + return false; + } - ret = ::connect(sock, (struct sockaddr *)(addr_results->ai_addr), - addr_results->ai_addrlen); - panic_if(ret < 0, "connect() failed: %s", strerror(errno)); + listening = true; + anyListening = true; + return true; +} - freeaddrinfo(addr_results); - // add our socket to the static registry +void +TCPIface::establishConnection() +{ + static unsigned cur_rank = 0; + static unsigned cur_id = 0; + NodeInfo ni; + + if (isSwitch) { + if (cur_id == 0) { // first connection accepted in the ctor already + auto const &iface0 = + find_if(nodes.begin(), nodes.end(), + [](const pair &cn) -> bool { + return cn.first.rank == cur_rank; + }); + assert(iface0 != nodes.end()); + assert(iface0->first.distIfaceId == 0); + sock = iface0->second; + ni = iface0->first; + } else { // additional connections from the same compute node + accept(); + DPRINTF(DistEthernet, "Next connection, waiting for link info\n"); + if (!recvTCP(sock, &ni, sizeof(ni))) + panic("Failed to receive link info"); + assert(ni.rank == cur_rank); + assert(ni.distIfaceId == cur_id); + } + inform("Link okay (iface:%d -> (node:%d, iface:%d))", + distIfaceId, ni.rank, ni.distIfaceId); + if (ni.distIfaceId < ni.distIfaceNum - 1) { + cur_id++; + } else { + cur_rank++; + cur_id = 0; + } + // send ack + ni.distIfaceId = distIfaceId; + ni.distIfaceNum = distIfaceNum; + sendTCP(sock, &ni, sizeof(ni)); + } else { // this is not a switch + connect(); + // send link info + ni.rank = rank; + ni.distIfaceId = distIfaceId; + ni.distIfaceNum = distIfaceNum; + sendTCP(sock, &ni, sizeof(ni)); + DPRINTF(DistEthernet, "Connected, waiting for ack (distIfaceId:%d\n", + distIfaceId); + if (!recvTCP(sock, &ni, sizeof(ni))) + panic("Failed to receive ack"); + assert(ni.rank == rank); + inform("Link okay (iface:%d -> switch iface:%d)", distIfaceId, + ni.distIfaceId); + } sockRegistry.push_back(sock); - // let the server know who we are - sendTCP(sock, &multi_rank, sizeof(multi_rank)); +} + +void +TCPIface::accept() +{ + struct sockaddr_in sockaddr; + socklen_t slen = sizeof (sockaddr); + sock = ::accept(fdStatic, (struct sockaddr *)&sockaddr, &slen); + if (sock != -1) { + int i = 1; + if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&i, + sizeof(i)) < 0) + warn("ListenSocket(accept): setsockopt() TCP_NODELAY failed!"); + } +} + +void +TCPIface::connect() +{ + struct addrinfo addr_hint, *addr_results; + int ret; + + string port_str = to_string(serverPort); + + sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + panic_if(sock < 0, "socket() failed: %s", strerror(errno)); + + int fl = 1; + if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&fl, sizeof(fl)) < 0) + warn("ConnectSocket(connect): setsockopt() TCP_NODELAY failed!"); + + bzero(&addr_hint, sizeof(addr_hint)); + addr_hint.ai_family = AF_INET; + addr_hint.ai_socktype = SOCK_STREAM; + addr_hint.ai_protocol = IPPROTO_TCP; + + ret = getaddrinfo(serverName.c_str(), port_str.c_str(), + &addr_hint, &addr_results); + panic_if(ret < 0, "getaddrinf() failed: %s", strerror(errno)); + + DPRINTF(DistEthernet, "Connecting to %s:%s\n", + serverName.c_str(), port_str.c_str()); + + ret = ::connect(sock, (struct sockaddr *)(addr_results->ai_addr), + addr_results->ai_addrlen); + panic_if(ret < 0, "connect() failed: %s", strerror(errno)); + + freeaddrinfo(addr_results); } TCPIface::~TCPIface() @@ -111,12 +260,20 @@ TCPIface::~TCPIface() } void -TCPIface::sendTCP(int sock, void *buf, unsigned length) +TCPIface::sendTCP(int sock, const void *buf, unsigned length) { ssize_t ret; ret = ::send(sock, buf, length, MSG_NOSIGNAL); - panic_if(ret < 0, "send() failed: %s", strerror(errno)); + if (ret < 0) { + if (errno == ECONNRESET || errno == EPIPE) { + inform("send(): %s", strerror(errno)); + exit_message("info", 0, "Message server closed connection, " + "simulation is exiting"); + } else { + panic("send() failed: %s", strerror(errno)); + } + } panic_if(ret != length, "send() failed"); } @@ -140,19 +297,47 @@ TCPIface::recvTCP(int sock, void *buf, unsigned length) } void -TCPIface::syncRaw(MultiHeaderPkt::MsgType sync_req, Tick sync_tick) +TCPIface::sendPacket(const Header &header, const EthPacketPtr &packet) +{ + sendTCP(sock, &header, sizeof(header)); + sendTCP(sock, packet->data, packet->length); +} + +void +TCPIface::sendCmd(const Header &header) { - /* - * Barrier is simply implemented by point-to-point messages to the server - * for now. This method is called by only one TCPIface object. - * The server will send back an 'ack' message when it gets the - * sync request from all clients. - */ - MultiHeaderPkt::Header header_pkt; - header_pkt.msgType = sync_req; - header_pkt.sendTick = sync_tick; - - for (auto s : sockRegistry) - sendTCP(s, (void *)&header_pkt, sizeof(header_pkt)); + DPRINTF(DistEthernetCmd, "TCPIface::sendCmd() type: %d\n", + static_cast(header.msgType)); + // Global commands (i.e. sync request) are always sent by the master + // DistIface. The transfer method is simply implemented as point-to-point + // messages for now + for (auto s: sockRegistry) + sendTCP(s, (void*)&header, sizeof(header)); } +bool +TCPIface::recvHeader(Header &header) +{ + bool ret = recvTCP(sock, &header, sizeof(header)); + DPRINTF(DistEthernetCmd, "TCPIface::recvHeader() type: %d ret: %d\n", + static_cast(header.msgType), ret); + return ret; +} + +void +TCPIface::recvPacket(const Header &header, EthPacketPtr &packet) +{ + packet = make_shared(header.dataPacketLength); + bool ret = recvTCP(sock, packet->data, header.dataPacketLength); + panic_if(!ret, "Error while reading socket"); + packet->length = header.dataPacketLength; +} + +void +TCPIface::initTransport() +{ + // We cannot setup the conections in the constructor because the number + // of dist interfaces (per process) is unknown until the (simobject) init + // phase. That information is necessary for global connection ordering. + establishConnection(); +} diff --git a/src/dev/net/tcp_iface.hh b/src/dev/net/tcp_iface.hh index 2eb2c1c07..97ae9cde0 100644 --- a/src/dev/net/tcp_iface.hh +++ b/src/dev/net/tcp_iface.hh @@ -35,17 +35,17 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * Authors: Gabor Dozsa + * Mohammad Alian */ /* @file - * TCP stream socket based interface class for multi gem5 runs. + * TCP stream socket based interface class for dist-gem5 runs. * - * For a high level description about multi gem5 see comments in - * header file multi_iface.hh. + * For a high level description about dist-gem5 see comments in + * header file dist_iface.hh. * - * The TCP subclass of MultiIface uses a separate server process - * (see tcp_server.[hh,cc] under directory gem5/util/multi). Each gem5 - * process connects to the server via a stream socket. The server process + * Each gem5 process connects to the server (another gem5 process which + * simulates a switch box) via a stream socket. The server process * transfers messages and co-ordinates the synchronisation among the gem5 * peers. */ @@ -55,11 +55,11 @@ #include -#include "dev/net/multi_iface.hh" +#include "dev/net/dist_iface.hh" class EventManager; -class TCPIface : public MultiIface +class TCPIface : public DistIface { private: /** @@ -67,8 +67,28 @@ class TCPIface : public MultiIface */ int sock; + std::string serverName; + int serverPort; + + bool isSwitch; + + bool listening; + static bool anyListening; + static int fdStatic; + + /** + * Compute node info and storage for the very first connection from each + * node (used by the switch) + */ + struct NodeInfo + { + unsigned rank; + unsigned distIfaceId; + unsigned distIfaceNum; + }; + static std::vector > nodes; /** - * Registry for all sockets to the server opened by this gem5 process. + * Storage for all opened sockets */ static std::vector sockRegistry; @@ -82,7 +102,7 @@ class TCPIface : public MultiIface * @param length Size of the message in bytes. */ void - sendTCP(int sock, void *buf, unsigned length); + sendTCP(int sock, const void *buf, unsigned length); /** * Receive the next incoming message through a TCP stream socket. @@ -92,24 +112,26 @@ class TCPIface : public MultiIface * @param length Exact size of the expected message in bytes. */ bool recvTCP(int sock, void *buf, unsigned length); - + bool listen(int port); + void accept(); + void connect(); + int getfdStatic() const { return fdStatic; } + bool islistening() const { return listening; } + bool anyislistening() const { return anyListening; } + void establishConnection(); protected: - virtual void - sendRaw(void *buf, unsigned length, - const MultiHeaderPkt::AddressType dest_addr=nullptr) override - { - sendTCP(sock, buf, length); - } + void sendPacket(const Header &header, + const EthPacketPtr &packet) override; - virtual bool recvRaw(void *buf, unsigned length) override - { - return recvTCP(sock, buf, length); - } + void sendCmd(const Header &header) override; + + bool recvHeader(Header &header) override; + + void recvPacket(const Header &header, EthPacketPtr &packet) override; - virtual void syncRaw(MultiHeaderPkt::MsgType sync_req, - Tick sync_tick) override; + void initTransport() override; public: /** @@ -118,14 +140,15 @@ class TCPIface : public MultiIface * server process. * @param server_port The port number the server listening for new * connections. - * @param sync_start The tick for the first multi synchronisation. - * @param sync_repeat The frequency of multi synchronisation. + * @param sync_start The tick for the first dist synchronisation. + * @param sync_repeat The frequency of dist synchronisation. * @param em The EventManager object associated with the simulated * Ethernet link. */ TCPIface(std::string server_name, unsigned server_port, - unsigned multi_rank, Tick sync_start, Tick sync_repeat, - EventManager *em); + unsigned dist_rank, unsigned dist_size, + Tick sync_start, Tick sync_repeat, EventManager *em, + bool is_switch, int num_nodes); ~TCPIface() override; }; diff --git a/src/sim/global_event.hh b/src/sim/global_event.hh index 88981b52a..1aab4a233 100644 --- a/src/sim/global_event.hh +++ b/src/sim/global_event.hh @@ -219,7 +219,7 @@ class GlobalSyncEvent : public BaseGlobalEventTemplate }; GlobalSyncEvent(Priority p, Flags f) - : Base(p, f) + : Base(p, f), repeat(0) { } GlobalSyncEvent(Tick when, Tick _repeat, Priority p, Flags f) diff --git a/src/dev/net/multi_packet.cc b/src/sim/initparam_keys.hh similarity index 64% rename from src/dev/net/multi_packet.cc rename to src/sim/initparam_keys.hh index 85f76b0c4..3ea68ddfe 100644 --- a/src/dev/net/multi_packet.cc +++ b/src/sim/initparam_keys.hh @@ -38,63 +38,33 @@ */ /* @file - * MultiHeaderPkt class to encapsulate multi-gem5 header packets - * + * Magic key definitions for the InitParam pseudo inst */ +#ifndef ___SIM_INITPARAM_KEYS_HH__ +#define ___SIM_INITPARAM_KEYS_HH__ -#include "dev/net/multi_packet.hh" - -#include -#include - -#include "base/inet.hh" - -unsigned -MultiHeaderPkt::maxAddressLength() -{ - return sizeof(AddressType); -} - -void -MultiHeaderPkt::clearAddress(AddressType &addr) -{ - std::memset(addr, 0, sizeof(addr)); -} - -bool -MultiHeaderPkt::isAddressEqual(const AddressType &addr1, - const AddressType &addr2) -{ - return (std::memcmp(addr1, addr2, sizeof(addr1)) == 0); -} - -bool -MultiHeaderPkt::isAddressLess(const AddressType &addr1, - const AddressType &addr2) -{ - return (std::memcmp(addr1, addr2, sizeof(addr1)) < 0); -} - -void -MultiHeaderPkt::copyAddress(AddressType &dest, const AddressType &src) -{ - std::memcpy(dest, src, sizeof(dest)); -} - -bool -MultiHeaderPkt::isBroadcastAddress(const AddressType &addr) -{ - return ((Net::EthAddr *)&addr)->broadcast(); -} - -bool -MultiHeaderPkt::isMulticastAddress(const AddressType &addr) +namespace PseudoInst { +/** + * Unique keys to retrieve various params by the initParam pseudo inst. + * + * @note Each key must be shorter than 16 characters (because we use + * two 64-bit registers two pass in the key to the initparam function) + */ +struct InitParamKey { - return ((Net::EthAddr *)&addr)->multicast(); -} + /** + * The default key (empty string) + */ + static constexpr const char *DEFAULT = ""; + /** + * Unique key for "rank" param (distributed gem5 runs) + */ + static constexpr const char *DIST_RANK = "dist-rank"; + /** + * Unique key for "size" param (distributed gem5 runs) + */ + static constexpr const char *DIST_SIZE = "dist-size"; +}; +} // namespace PseudoInst -bool -MultiHeaderPkt::isUnicastAddress(const AddressType &addr) -{ - return ((Net::EthAddr *)&addr)->unicast(); -} +#endif diff --git a/src/sim/pseudo_inst.cc b/src/sim/pseudo_inst.cc index 0f7de0c3a..67c47ce77 100644 --- a/src/sim/pseudo_inst.cc +++ b/src/sim/pseudo_inst.cc @@ -63,8 +63,10 @@ #include "debug/PseudoInst.hh" #include "debug/Quiesce.hh" #include "debug/WorkItems.hh" +#include "dev/net/dist_iface.hh" #include "params/BaseCPU.hh" #include "sim/full_system.hh" +#include "sim/initparam_keys.hh" #include "sim/process.hh" #include "sim/pseudo_inst.hh" #include "sim/serialize.hh" @@ -357,8 +359,10 @@ void m5exit(ThreadContext *tc, Tick delay) { DPRINTF(PseudoInst, "PseudoInst::m5exit(%i)\n", delay); - Tick when = curTick() + delay * SimClock::Int::ns; - exitSimLoop("m5_exit instruction encountered", 0, when, 0, true); + if (DistIface::readyToExit(delay)) { + Tick when = curTick() + delay * SimClock::Int::ns; + exitSimLoop("m5_exit instruction encountered", 0, when, 0, true); + } } void @@ -471,10 +475,14 @@ initParam(ThreadContext *tc, uint64_t key_str1, uint64_t key_str2) // Compare the key parameter with the known values to select the return // value uint64_t val; - if (strlen(key_str) == 0) { + if (strcmp(key_str, InitParamKey::DEFAULT) == 0) { val = tc->getCpuPtr()->system->init_param; + } else if (strcmp(key_str, InitParamKey::DIST_RANK) == 0) { + val = DistIface::rankParam(); + } else if (strcmp(key_str, InitParamKey::DIST_SIZE) == 0) { + val = DistIface::sizeParam(); } else { - panic("Unknown key for initparam pseudo instruction"); + panic("Unknown key for initparam pseudo instruction:\"%s\"", key_str); } return val; } @@ -529,10 +537,11 @@ m5checkpoint(ThreadContext *tc, Tick delay, Tick period) if (!tc->getCpuPtr()->params()->do_checkpoint_insts) return; - Tick when = curTick() + delay * SimClock::Int::ns; - Tick repeat = period * SimClock::Int::ns; - - exitSimLoop("checkpoint", 0, when, repeat); + if (DistIface::readyToCkpt(delay, period)) { + Tick when = curTick() + delay * SimClock::Int::ns; + Tick repeat = period * SimClock::Int::ns; + exitSimLoop("checkpoint", 0, when, repeat); + } } uint64_t diff --git a/util/multi/Makefile b/util/multi/Makefile deleted file mode 100644 index a58dd3307..000000000 --- a/util/multi/Makefile +++ /dev/null @@ -1,63 +0,0 @@ -# -# Copyright (c) 2015 ARM Limited -# All rights reserved -# -# The license below extends only to copyright in the software and shall -# not be construed as granting a license to any other intellectual -# property including but not limited to intellectual property relating -# to a hardware implementation of the functionality of the software -# licensed hereunder. You may use the software subject to the license -# terms below provided that you ensure that this notice is replicated -# unmodified and in its entirety in all distributions of the software, -# modified or unmodified, in source code or in binary form. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer; -# redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution; -# neither the name of the copyright holders nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -# -# Authors: Gabor Dozsa - -CXX= g++ - -DEBUG= -DDEBUG - -M5_ARCH?= ARM -M5_DIR?= ../.. - -vpath % $(M5_DIR)/build/$(M5_ARCH)/dev - -INCDIRS= -I. -I$(M5_DIR)/build/$(M5_ARCH) -I$(M5_DIR)/ext -CCFLAGS= -g -Wall -O3 $(DEBUG) -std=c++11 -MMD $(INCDIRS) - -default: tcp_server - -clean: - @rm -f tcp_server *.o *.d *~ - -tcp_server: tcp_server.o multi_packet.o - $(CXX) $(LFLAGS) -o $@ $^ - -%.o: %.cc - @echo '$(CXX) $(CCFLAGS) -c $(notdir $<) -o $@' - @$(CXX) $(CCFLAGS) -c $< -o $@ - --include *.d diff --git a/util/multi/tcp_server.cc b/util/multi/tcp_server.cc deleted file mode 100644 index 1ec4303d3..000000000 --- a/util/multi/tcp_server.cc +++ /dev/null @@ -1,463 +0,0 @@ -/* - * Copyright (c) 2015 ARM Limited - * All rights reserved - * - * The license below extends only to copyright in the software and shall - * not be construed as granting a license to any other intellectual - * property including but not limited to intellectual property relating - * to a hardware implementation of the functionality of the software - * licensed hereunder. You may use the software subject to the license - * terms below provided that you ensure that this notice is replicated - * unmodified and in its entirety in all distributions of the software, - * modified or unmodified, in source code or in binary form. - * - * Copyright (c) 2008 The Regents of The University of Michigan - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer; - * redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution; - * neither the name of the copyright holders nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * Authors: Gabor Dozsa - */ - - -/* @file - * Message server implementation using TCP stream sockets for parallel gem5 - * runs. - */ -#include -#include -#include -#include - -#include -#include - -#include "tcp_server.hh" - -using namespace std; - -// Some basic macros for information and error reporting. -#define PRINTF(...) fprintf(stderr, __VA_ARGS__) - -#ifdef DEBUG -static bool debugSetup = true; -static bool debugPeriodic = false; -static bool debugSync = true; -static bool debugPkt = false; -#define DPRINTF(v,...) if (v) PRINTF(__VA_ARGS__) -#else -#define DPRINTF(v,...) -#endif - -#define inform(...) do { PRINTF("info: "); \ - PRINTF(__VA_ARGS__); } while(0) - -#define panic(...) do { PRINTF("panic: "); \ - PRINTF(__VA_ARGS__); \ - PRINTF("\n[%s:%s], line %d\n", \ - __FUNCTION__, __FILE__, __LINE__); \ - exit(-1); } while(0) - -TCPServer *TCPServer::instance = nullptr; - -TCPServer::Channel::Channel() : fd(-1), isAlive(false), state(SyncState::idle) -{ - MultiHeaderPkt::clearAddress(address); -} - -unsigned -TCPServer::Channel::recvRaw(void *buf, unsigned size) const -{ - ssize_t n; - // This is a blocking receive. - n = recv(fd, buf, size, MSG_WAITALL); - - if (n < 0) - panic("read() failed:%s", strerror(errno)); - else if (n > 0 && n < size) - // the recv() call should wait for the full message - panic("read() failed"); - - return n; -} - -void -TCPServer::Channel::sendRaw(const void *buf, unsigned size) const -{ - ssize_t n; - n = send(fd, buf, size, MSG_NOSIGNAL); - if (n < 0) - panic("write() failed:%s", strerror(errno)); - else if (n != size) - panic("write() failed"); -} - -void TCPServer::Channel::updateAddress(const AddressType &new_address) -{ - // check if the known address has changed (e.g. the client reconfigured - // its Ethernet NIC) - if (MultiHeaderPkt::isAddressEqual(address, new_address)) - return; - - // So we have to update the address. Note that we always - // store the same address as key in the map but the ordering - // may change so we need to erase and re-insert it again. - auto info = TCPServer::instance->addressMap.find(&address); - if (info != TCPServer::instance->addressMap.end()) { - TCPServer::instance->addressMap.erase(info); - } - - MultiHeaderPkt::copyAddress(address, new_address); - TCPServer::instance->addressMap[&address] = this; -} - -void -TCPServer::Channel::headerPktIn() -{ - ssize_t n; - Header hdr_pkt; - - n = recvRaw(&hdr_pkt, sizeof(hdr_pkt)); - - if (n == 0) { - // EOF - nothing to do here, we will handle this as a POLLRDHUP event - // in the main loop. - return; - } - - if (hdr_pkt.msgType == MsgType::dataDescriptor) { - updateAddress(hdr_pkt.srcAddress); - TCPServer::instance->xferData(hdr_pkt, *this); - } else { - processCmd(hdr_pkt.msgType, hdr_pkt.sendTick); - } -} - -void TCPServer::Channel::processCmd(MsgType cmd, Tick send_tick) -{ - switch (cmd) { - case MsgType::cmdAtomicSyncReq: - DPRINTF(debugSync,"Atomic sync request (rank:%d)\n",rank); - assert(state == SyncState::idle); - state = SyncState::atomic; - TCPServer::instance->syncTryComplete(SyncState::atomic, - MsgType::cmdAtomicSyncAck); - break; - case MsgType::cmdPeriodicSyncReq: - DPRINTF(debugPeriodic,"PERIODIC sync request (at %ld)\n",send_tick); - // sanity check - if (TCPServer::instance->periodicSyncTick() == 0) { - TCPServer::instance->periodicSyncTick(send_tick); - } else if ( TCPServer::instance->periodicSyncTick() != send_tick) { - panic("Out of order periodic sync request - rank:%d " - "(send_tick:%ld ongoing:%ld)", rank, send_tick, - TCPServer::instance->periodicSyncTick()); - } - switch (state) { - case SyncState::idle: - state = SyncState::periodic; - TCPServer::instance->syncTryComplete(SyncState::periodic, - MsgType::cmdPeriodicSyncAck); - break; - case SyncState::asyncCkpt: - // An async ckpt request has already been sent to this client and - // that will interrupt this periodic sync. We can simply drop this - // message. - break; - default: - panic("Unexpected state for periodic sync request (rank:%d)", - rank); - break; - } - break; - case MsgType::cmdCkptSyncReq: - DPRINTF(debugSync, "CKPT sync request (rank:%d)\n",rank); - switch (state) { - case SyncState::idle: - TCPServer::instance->ckptPropagate(*this); - // we fall through here to complete #clients==1 case - case SyncState::asyncCkpt: - state = SyncState::ckpt; - TCPServer::instance->syncTryComplete(SyncState::ckpt, - MsgType::cmdCkptSyncAck); - break; - default: - panic("Unexpected state for ckpt sync request (rank:%d)", rank); - break; - } - break; - default: - panic("Unexpected header packet (rank:%d)",rank); - break; - } -} - -TCPServer::TCPServer(unsigned clients_num, - unsigned listen_port, - int timeout_in_sec) -{ - assert(instance == nullptr); - construct(clients_num, listen_port, timeout_in_sec); - instance = this; -} - -TCPServer::~TCPServer() -{ - for (auto &c : clientsPollFd) - close(c.fd); -} - -void -TCPServer::construct(unsigned clients_num, unsigned port, int timeout_in_sec) -{ - int listen_sock, new_sock, ret; - unsigned client_len; - struct sockaddr_in server_addr, client_addr; - struct pollfd new_pollfd; - Channel new_channel; - - DPRINTF(debugSetup, "Start listening on port %u ...\n", port); - - listen_sock = socket(AF_INET, SOCK_STREAM, 0); - if (listen_sock < 0) - panic("socket() failed:%s", strerror(errno)); - - bzero(&server_addr, sizeof(server_addr)); - server_addr.sin_family = AF_INET; - server_addr.sin_addr.s_addr = INADDR_ANY; - server_addr.sin_port = htons(port); - if (bind(listen_sock, (struct sockaddr *) &server_addr, - sizeof(server_addr)) < 0) - panic("bind() failed:%s", strerror(errno)); - listen(listen_sock, 10); - - clientsPollFd.reserve(clients_num); - clientsChannel.reserve(clients_num); - - new_pollfd.events = POLLIN | POLLRDHUP; - new_pollfd.revents = 0; - while (clientsPollFd.size() < clients_num) { - new_pollfd.fd = listen_sock; - ret = poll(&new_pollfd, 1, timeout_in_sec*1000); - if (ret == 0) - panic("Timeout while waiting for clients to connect"); - assert(ret == 1 && new_pollfd.revents == POLLIN); - client_len = sizeof(client_addr); - new_sock = accept(listen_sock, - (struct sockaddr *) &client_addr, - &client_len); - if (new_sock < 0) - panic("accept() failed:%s", strerror(errno)); - new_pollfd.fd = new_sock; - new_pollfd.revents = 0; - clientsPollFd.push_back(new_pollfd); - new_channel.fd = new_sock; - new_channel.isAlive = true; - new_channel.recvRaw(&new_channel.rank, sizeof(new_channel.rank)); - clientsChannel.push_back(new_channel); - - DPRINTF(debugSetup, "New client connection addr:%u port:%hu rank:%d\n", - client_addr.sin_addr.s_addr, client_addr.sin_port, - new_channel.rank); - } - ret = close(listen_sock); - assert(ret == 0); - - DPRINTF(debugSetup, "Setup complete\n"); -} - -void -TCPServer::run() -{ - int nfd; - unsigned num_active_clients = clientsPollFd.size(); - - DPRINTF(debugSetup, "Entering run() loop\n"); - while (num_active_clients == clientsPollFd.size()) { - nfd = poll(&clientsPollFd[0], clientsPollFd.size(), -1); - if (nfd == -1) - panic("poll() failed:%s", strerror(errno)); - - for (unsigned i = 0, n = 0; - i < clientsPollFd.size() && (signed)n < nfd; - i++) { - struct pollfd &pfd = clientsPollFd[i]; - if (pfd.revents) { - if (pfd.revents & POLLERR) - panic("poll() returned POLLERR"); - if (pfd.revents & POLLIN) { - clientsChannel[i].headerPktIn(); - } - if (pfd.revents & POLLRDHUP) { - // One gem5 process exited or aborted. Either way, we - // assume the full simulation should stop now (either - // because m5 exit was called or a serious error - // occurred.) So we quit the run loop here and close all - // sockets to notify the remaining peer gem5 processes. - pfd.events = 0; - clientsChannel[i].isAlive = false; - num_active_clients--; - DPRINTF(debugSetup, "POLLRDHUP event"); - } - n++; - if ((signed)n == nfd) - break; - } - } - } - DPRINTF(debugSetup, "Exiting run() loop\n"); -} - -void -TCPServer::xferData(const Header &hdr_pkt, const Channel &src) -{ - unsigned n; - assert(hdr_pkt.dataPacketLength <= sizeof(packetBuffer)); - n = src.recvRaw(packetBuffer, hdr_pkt.dataPacketLength); - - if (n == 0) - panic("recvRaw() failed"); - DPRINTF(debugPkt, "Incoming data packet (from rank %d) " - "src:0x%02x%02x%02x%02x%02x%02x " - "dst:0x%02x%02x%02x%02x%02x%02x\n", - src.rank, - hdr_pkt.srcAddress[0], - hdr_pkt.srcAddress[1], - hdr_pkt.srcAddress[2], - hdr_pkt.srcAddress[3], - hdr_pkt.srcAddress[4], - hdr_pkt.srcAddress[5], - hdr_pkt.dstAddress[0], - hdr_pkt.dstAddress[1], - hdr_pkt.dstAddress[2], - hdr_pkt.dstAddress[3], - hdr_pkt.dstAddress[4], - hdr_pkt.dstAddress[5]); - // Now try to figure out the destination client(s). - auto dst_info = addressMap.find(&hdr_pkt.dstAddress); - - // First handle the multicast/broadcast or unknonw destination case. These - // all trigger a broadcast of the packet to all clients. - if (MultiHeaderPkt::isUnicastAddress(hdr_pkt.dstAddress) == false || - dst_info == addressMap.end()) { - unsigned n = 0; - for (auto const &c: clientsChannel) { - if (c.isAlive && &c!=&src) { - c.sendRaw(&hdr_pkt, sizeof(hdr_pkt)); - c.sendRaw(packetBuffer, hdr_pkt.dataPacketLength); - n++; - } - } - if (n == 0) { - inform("Broadcast/multicast packet dropped\n"); - } - } else { - // It is a unicast address with a known destination - Channel *dst = dst_info->second; - if (dst->isAlive) { - dst->sendRaw(&hdr_pkt, sizeof(hdr_pkt)); - dst->sendRaw(packetBuffer, hdr_pkt.dataPacketLength); - DPRINTF(debugPkt, "Unicast packet sent (to rank %d)\n",dst->rank); - } else { - inform("Unicast packet dropped (destination exited)\n"); - } - } -} - -void -TCPServer::syncTryComplete(SyncState st, MsgType ack) -{ - // Check if the barrieris complete. If so then notify all the clients. - for (auto &c : clientsChannel) { - if (c.isAlive && (c.state != st)) { - // sync not complete yet, stop here - return; - } - } - // Sync complete, send out the acks - MultiHeaderPkt::Header hdr_pkt; - hdr_pkt.msgType = ack; - for (auto &c : clientsChannel) { - if (c.isAlive) { - c.sendRaw(&hdr_pkt, sizeof(hdr_pkt)); - c.state = SyncState::idle; - } - } - // Reset periodic send tick - _periodicSyncTick = 0; - DPRINTF(st == SyncState::periodic ? debugPeriodic : debugSync, - "Sync COMPLETE\n"); -} - -void -TCPServer::ckptPropagate(Channel &ch) -{ - // Channel ch got a ckpt request that needs to be propagated to the other - // clients - MultiHeaderPkt::Header hdr_pkt; - hdr_pkt.msgType = MsgType::cmdCkptSyncReq; - for (auto &c : clientsChannel) { - if (c.isAlive && (&c != &ch)) { - switch (c.state) { - case SyncState::idle: - case SyncState::periodic: - c.sendRaw(&hdr_pkt, sizeof(hdr_pkt)); - c.state = SyncState::asyncCkpt; - break; - default: - panic("Unexpected state for ckpt sync request propagation " - "(rank:%d)\n",c.rank); - break; - } - } - } -} - -int main(int argc, char *argv[]) -{ - TCPServer *server; - int clients_num = -1, listen_port = -1; - int first_arg = 1, timeout_in_sec = 60; - - if (argc > 1 && string(argv[1]).compare("-debug") == 0) { - timeout_in_sec = -1; - first_arg++; - argc--; - } - - if (argc != 3) - panic("We need two command line args (number of clients and tcp listen" - " port"); - - clients_num = atoi(argv[first_arg]); - listen_port = atoi(argv[first_arg + 1]); - - server = new TCPServer(clients_num, listen_port, timeout_in_sec); - - server->run(); - - delete server; - - return 0; -} diff --git a/util/multi/tcp_server.hh b/util/multi/tcp_server.hh deleted file mode 100644 index c5f2a9ce8..000000000 --- a/util/multi/tcp_server.hh +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Copyright (c) 2015 ARM Limited - * All rights reserved - * - * The license below extends only to copyright in the software and shall - * not be construed as granting a license to any other intellectual - * property including but not limited to intellectual property relating - * to a hardware implementation of the functionality of the software - * licensed hereunder. You may use the software subject to the license - * terms below provided that you ensure that this notice is replicated - * unmodified and in its entirety in all distributions of the software, - * modified or unmodified, in source code or in binary form. - * - * Copyright (c) 2008 The Regents of The University of Michigan - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer; - * redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution; - * neither the name of the copyright holders nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * Authors: Gabor Dozsa - */ - -/* @file - * Message server using TCP stream sockets for parallel gem5 runs. - * - * For a high level description about multi gem5 see comments in - * header files src/dev/multi_iface.hh and src/dev/tcp_iface.hh. - * - * This file implements the central message server process for multi gem5. - * The server is responsible the following tasks. - * 1. Establishing a TCP socket connection for each gem5 process (clients). - * - * 2. Process data messages coming in from clients. The server checks - * the MAC addresses in the header message and transfers the message - * to the target(s) client(s). - * - * 3. Processing synchronisation related control messages. Synchronisation - * is performed as follows. The server waits for a 'barrier enter' message - * from all the clients. When the last such control message arrives, the - * server sends out a 'barrier leave' control message to all the clients. - * - * 4. Triggers complete termination in case a client exits. A client may - * exit either by calling 'm5 exit' pseudo instruction or due to a fatal - * error. In either case, we assume that the entire multi simulation needs to - * terminate. The server triggers full termination by tearing down the - * open TCP sockets. - * - * The TCPServer class is instantiated as a singleton object. - * - * The server can be built independently from the rest of gem5 (and it is - * architecture agnostic). See the Makefile in the same directory. - * - */ - -#include - -#include -#include - -#include "dev/etherpkt.hh" -#include "dev/multi_packet.hh" - -/** - * The maximum length of an Ethernet packet (allowing Jumbo frames). - */ -#define MAX_ETH_PACKET_LENGTH 9014 - -class TCPServer -{ - public: - typedef MultiHeaderPkt::AddressType AddressType; - typedef MultiHeaderPkt::Header Header; - typedef MultiHeaderPkt::MsgType MsgType; - - private: - - enum - class SyncState { periodic, ckpt, asyncCkpt, atomic, idle }; - /** - * The Channel class encapsulates all the information about a client - * and its current status. - */ - class Channel - { - private: - /** - * The MAC address of the client. - */ - AddressType address; - - /** - * Update the client MAC address. It is called every time a new data - * packet is to come in. - */ - void updateAddress(const AddressType &new_addr); - /** - * Process an incoming command message. - */ - void processCmd(MultiHeaderPkt::MsgType cmd, Tick send_tick); - - - public: - /** - * TCP stream socket. - */ - int fd; - /** - * Is client connected? - */ - bool isAlive; - /** - * Current state of the channel wrt. multi synchronisation. - */ - SyncState state; - /** - * Multi rank of the client - */ - unsigned rank; - - public: - Channel(); - ~Channel () {} - - - /** - * Receive and process the next incoming header packet. - */ - void headerPktIn(); - /** - * Send raw data to the connected client. - * - * @param data The data to send. - * @param size Size of the data (in bytes). - */ - void sendRaw(const void *data, unsigned size) const; - /** - * Receive raw data from the connected client. - * - * @param buf The buffer to store the incoming data into. - * @param size Size of data to receive (in bytes). - * @return In case of success, it returns size. Zero is returned - * if the socket is already closed by the client. - */ - unsigned recvRaw(void *buf, unsigned size) const; - }; - - /** - * The array of socket descriptors needed by the poll() system call. - */ - std::vector clientsPollFd; - /** - * Array holding all clients info. - */ - std::vector clientsChannel; - - - /** - * We use a map to select the target client based on the destination - * MAC address. - */ - struct AddressCompare - { - bool operator()(const AddressType *a1, const AddressType *a2) - { - return MultiHeaderPkt::isAddressLess(*a1, *a2); - } - }; - std::map addressMap; - - /** - * As we dealt with only one message at a time, we can allocate and re-use - * a single packet buffer (to hold any incoming data packet). - */ - uint8_t packetBuffer[MAX_ETH_PACKET_LENGTH]; - /** - * Send tick of the current periodic sync. It is used for sanity check. - */ - Tick _periodicSyncTick; - /** - * The singleton server object. - */ - static TCPServer *instance; - - /** - * Set up the socket connections to all the clients. - * - * @param listen_port The port we are listening on for new client - * connection requests. - * @param nclients The number of clients to connect to. - * @param timeout Timeout in sec to complete the setup phase - * (i.e. all gem5 establish socket connections) - */ - void construct(unsigned listen_port, unsigned nclients, int timeout); - /** - * Transfer the header and the follow up data packet to the target(s) - * clients. - * - * @param hdr The header message structure. - * @param ch The source channel for the message. - */ - void xferData(const Header &hdr, const Channel &ch); - /** - * Check if the current round of a synchronisation is completed and notify - * the clients if it is so. - * - * @param st The state all channels should have if sync is complete. - * @param ack The type of ack message to send out if the sync is compete. - */ - void syncTryComplete(SyncState st, MultiHeaderPkt::MsgType ack); - /** - * Broadcast a request for checkpoint sync. - * - * @param ch The source channel of the checkpoint sync request. - */ - void ckptPropagate(Channel &ch); - /** - * Setter for current periodic send tick. - */ - void periodicSyncTick(Tick t) { _periodicSyncTick = t; } - /** - * Getter for current periodic send tick. - */ - Tick periodicSyncTick() { return _periodicSyncTick; } - - public: - - TCPServer(unsigned clients_num, unsigned listen_port, int timeout_in_sec); - ~TCPServer(); - - /** - * The main server loop that waits for and processes incoming messages. - */ - void run(); -}; -- 2.30.2