// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// +build darwin dragonfly freebsd linux nacl netbsd openbsd solaris windows

package net

#include "runtime.h"
#include "defs.h"
#include "arch.h"
#include "malloc.h"

// Map gccgo field names to gc field names.
// Eface aka __go_empty_interface.
#define type __type_descriptor
#define data __object

// Integrated network poller (platform-independent part).
// A particular implementation (epoll/kqueue) must define the following functions:
// void runtime_netpollinit(void);				// to initialize the poller
// int32 runtime_netpollopen(uintptr fd, PollDesc *pd);	// to arm edge-triggered notifications
//								// and associate fd with pd.
// An implementation must call the following function to denote that the pd is ready.
// void runtime_netpollready(G **gpp, PollDesc *pd, int32 mode);

// PollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
// goroutines respectively. The semaphore can be in the following states:
// READY - io readiness notification is pending;
//         a goroutine consumes the notification by changing the state to nil.
// WAIT - a goroutine prepares to park on the semaphore, but is not yet parked;
//        the goroutine commits to park by changing the state to G pointer,
//        or, alternatively, concurrent io notification changes the state to READY,
//        or, alternatively, concurrent timeout/close changes the state to nil.
// G pointer - the goroutine is blocked on the semaphore;
//             io notification or timeout/close changes the state to READY or nil respectively
//             and unparks the goroutine.
// nil - none of the above.
#define READY ((G*)1)
#define WAIT  ((G*)2)

enum
{
	PollBlockSize	= 4*1024,
};

struct PollDesc
{
	PollDesc* link;	// in pollcache, protected by pollcache.Lock

	// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
	// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
	// pollReset, pollWait, pollWaitCanceled and runtime_netpollready (IO readiness notification)
	// proceed w/o taking the lock. So closing, rg, rd, wg and wd are manipulated
	// in a lock-free way by all operations.
	Lock;		// protects the following fields
	uintptr	fd;
	bool	closing;
	uintptr	seq;	// protects from stale timers and ready notifications
	G*	rg;	// READY, WAIT, G waiting for read or nil
	Timer	rt;	// read deadline timer (set if rt.fv != nil)
	int64	rd;	// read deadline
	G*	wg;	// READY, WAIT, G waiting for write or nil
	Timer	wt;	// write deadline timer
	int64	wd;	// write deadline
	void*	user;	// user settable cookie
};

static struct
{
	Lock;
	PollDesc*	first;
	// PollDesc objects must be type-stable,
	// because we can get ready notification from epoll/kqueue
	// after the descriptor is closed/reused.
	// Stale notifications are detected using seq variable,
	// seq is incremented when deadlines are changed or descriptor is reused.
} pollcache;
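// Illustrative state walk (editorial commentary, not part of the original
// protocol description): a reader that blocks and is later woken by an io
// notification moves rg through
//	nil -> WAIT	(netpollblock CASes in WAIT before parking)
//	WAIT -> G	(blockcommit installs the G as the goroutine parks)
//	G -> READY	(runtime_netpollready via netpollunblock; the G is unparked)
//	READY -> nil	(the reader consumes the notification and retries IO)
// A concurrent timeout/close instead moves WAIT or G to nil, and the woken
// goroutine observes the error via checkerr.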
static bool	netpollblock(PollDesc*, int32, bool);
static G*	netpollunblock(PollDesc*, int32, bool);
static void	deadline(Eface, uintptr);
static void	readDeadline(Eface, uintptr);
static void	writeDeadline(Eface, uintptr);
static PollDesc*	allocPollDesc(void);
static intgo	checkerr(PollDesc *pd, int32 mode);

static FuncVal deadlineFn	= {(void(*)(void))deadline};
static FuncVal readDeadlineFn	= {(void(*)(void))readDeadline};
static FuncVal writeDeadlineFn	= {(void(*)(void))writeDeadline};

// runtimeNano returns the current value of the runtime clock in nanoseconds.
func runtimeNano() (ns int64) {
	ns = runtime_nanotime();
}

func runtime_pollServerInit() {
	runtime_netpollinit();
}

func runtime_pollOpen(fd uintptr) (pd *PollDesc, errno int) {
	pd = allocPollDesc();
	runtime_lock(pd);
	if(pd->wg != nil && pd->wg != READY)
		runtime_throw("runtime_pollOpen: blocked write on free descriptor");
	if(pd->rg != nil && pd->rg != READY)
		runtime_throw("runtime_pollOpen: blocked read on free descriptor");
	pd->fd = fd;
	pd->closing = false;
	pd->seq++;
	pd->rg = nil;
	pd->rd = 0;
	pd->wg = nil;
	pd->wd = 0;
	runtime_unlock(pd);

	errno = runtime_netpollopen(fd, pd);
}

func runtime_pollClose(pd *PollDesc) {
	if(!pd->closing)
		runtime_throw("runtime_pollClose: close w/o unblock");
	if(pd->wg != nil && pd->wg != READY)
		runtime_throw("runtime_pollClose: blocked write on closing descriptor");
	if(pd->rg != nil && pd->rg != READY)
		runtime_throw("runtime_pollClose: blocked read on closing descriptor");
	runtime_netpollclose(pd->fd);
	runtime_lock(&pollcache);
	pd->link = pollcache.first;
	pollcache.first = pd;
	runtime_unlock(&pollcache);
}

func runtime_pollReset(pd *PollDesc, mode int) (err int) {
	err = checkerr(pd, mode);
	if(err)
		goto ret;
	if(mode == 'r')
		pd->rg = nil;
	else if(mode == 'w')
		pd->wg = nil;
ret:
}

func runtime_pollWait(pd *PollDesc, mode int) (err int) {
	err = checkerr(pd, mode);
	if(err == 0) {
		// As for now only Solaris uses level-triggered IO.
		if(Solaris)
			runtime_netpollarm(pd, mode);
		while(!netpollblock(pd, mode, false)) {
			err = checkerr(pd, mode);
			if(err != 0)
				break;
			// Can happen if timeout has fired and unblocked us,
			// but before we had a chance to run, timeout has been reset.
			// Pretend it has not happened and retry.
		}
	}
}

func runtime_pollWaitCanceled(pd *PollDesc, mode int) {
	// This function is used only on windows after a failed attempt to cancel
	// a pending async IO operation. Wait for ioready, ignore closing or timeouts.
	while(!netpollblock(pd, mode, true))
		;
}
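// Illustrative usage sketch (an assumption about the caller, not code from
// this file): package net is expected to drive the reset/wait pair around a
// non-blocking syscall, roughly:
//	if(runtime_pollReset(pd, 'r') != 0)
//		return ...;			// errClosing or errTimeout
//	for(;;) {
//		n = read(fd, buf, nbuf);
//		if(n >= 0 || errno != EAGAIN)
//			break;			// data, EOF or hard error
//		if(runtime_pollWait(pd, 'r') != 0)
//			break;			// errClosing or errTimeout
//	}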
func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int) {
	G *rg, *wg;

	runtime_lock(pd);
	if(pd->closing) {
		runtime_unlock(pd);
		return;
	}
	pd->seq++;  // invalidate current timers
	// Reset current timers.
	if(pd->rt.fv) {
		runtime_deltimer(&pd->rt);
		pd->rt.fv = nil;
	}
	if(pd->wt.fv) {
		runtime_deltimer(&pd->wt);
		pd->wt.fv = nil;
	}
	// Setup new timers.
	if(d != 0 && d <= runtime_nanotime())
		d = -1;
	if(mode == 'r' || mode == 'r'+'w')
		pd->rd = d;
	if(mode == 'w' || mode == 'r'+'w')
		pd->wd = d;
	if(pd->rd > 0 && pd->rd == pd->wd) {
		pd->rt.fv = &deadlineFn;
		pd->rt.when = pd->rd;
		// Copy current seq into the timer arg.
		// Timer func will check the seq against current descriptor seq,
		// if they differ the descriptor was reused or timers were reset.
		pd->rt.arg.type = nil;  // should be *pollDesc type descriptor.
		pd->rt.arg.data = pd;
		pd->rt.seq = pd->seq;
		runtime_addtimer(&pd->rt);
	} else {
		if(pd->rd > 0) {
			pd->rt.fv = &readDeadlineFn;
			pd->rt.when = pd->rd;
			pd->rt.arg.type = nil;  // should be *pollDesc type descriptor.
			pd->rt.arg.data = pd;
			pd->rt.seq = pd->seq;
			runtime_addtimer(&pd->rt);
		}
		if(pd->wd > 0) {
			pd->wt.fv = &writeDeadlineFn;
			pd->wt.when = pd->wd;
			pd->wt.arg.type = nil;  // should be *pollDesc type descriptor.
			pd->wt.arg.data = pd;
			pd->wt.seq = pd->seq;
			runtime_addtimer(&pd->wt);
		}
	}
	// If we set the new deadline in the past, unblock currently pending IO if any.
	rg = nil;
	runtime_atomicstorep(&wg, nil);  // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock
	if(pd->rd < 0)
		rg = netpollunblock(pd, 'r', false);
	if(pd->wd < 0)
		wg = netpollunblock(pd, 'w', false);
	runtime_unlock(pd);
	if(rg)
		runtime_ready(rg);
	if(wg)
		runtime_ready(wg);
}

func runtime_pollUnblock(pd *PollDesc) {
	G *rg, *wg;

	runtime_lock(pd);
	if(pd->closing)
		runtime_throw("runtime_pollUnblock: already closing");
	pd->closing = true;
	pd->seq++;
	runtime_atomicstorep(&rg, nil);  // full memory barrier between store to closing and read of rg/wg in netpollunblock
	rg = netpollunblock(pd, 'r', false);
	wg = netpollunblock(pd, 'w', false);
	if(pd->rt.fv) {
		runtime_deltimer(&pd->rt);
		pd->rt.fv = nil;
	}
	if(pd->wt.fv) {
		runtime_deltimer(&pd->wt);
		pd->wt.fv = nil;
	}
	runtime_unlock(pd);
	if(rg)
		runtime_ready(rg);
	if(wg)
		runtime_ready(wg);
}

uintptr
runtime_netpollfd(PollDesc *pd)
{
	return pd->fd;
}

void**
runtime_netpolluser(PollDesc *pd)
{
	return &pd->user;
}

bool
runtime_netpollclosing(PollDesc *pd)
{
	return pd->closing;
}

void
runtime_netpolllock(PollDesc *pd)
{
	runtime_lock(pd);
}

void
runtime_netpollunlock(PollDesc *pd)
{
	runtime_unlock(pd);
}

// make pd ready; newly runnable goroutines (if any) are enqueued into the gpp list
void
runtime_netpollready(G **gpp, PollDesc *pd, int32 mode)
{
	G *rg, *wg;

	rg = wg = nil;
	if(mode == 'r' || mode == 'r'+'w')
		rg = netpollunblock(pd, 'r', true);
	if(mode == 'w' || mode == 'r'+'w')
		wg = netpollunblock(pd, 'w', true);
	if(rg) {
		rg->schedlink = (uintptr)*gpp;
		*gpp = rg;
	}
	if(wg) {
		wg->schedlink = (uintptr)*gpp;
		*gpp = wg;
	}
}

static intgo
checkerr(PollDesc *pd, int32 mode)
{
	if(pd->closing)
		return 1;  // errClosing
	if((mode == 'r' && pd->rd < 0) || (mode == 'w' && pd->wd < 0))
		return 2;  // errTimeout
	return 0;
}

static bool
blockcommit(G *gp, G **gpp)
{
	return runtime_casp(gpp, WAIT, gp);
}

// returns true if IO is ready, or false if timed out or closed
// waitio - wait only for completed IO, ignore errors
static bool
netpollblock(PollDesc *pd, int32 mode, bool waitio)
{
	G **gpp, *old;

	gpp = &pd->rg;
	if(mode == 'w')
		gpp = &pd->wg;

	// set the gpp semaphore to WAIT
	for(;;) {
		old = *gpp;
		if(old == READY) {
			*gpp = nil;
			return true;
		}
		if(old != nil)
			runtime_throw("netpollblock: double wait");
		if(runtime_casp(gpp, nil, WAIT))
			break;
	}

	// need to recheck error states after setting gpp to WAIT
	// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
	// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
	if(waitio || checkerr(pd, mode) == 0)
		runtime_park((bool(*)(G*, void*))blockcommit, gpp, "IO wait");

	// be careful to not lose concurrent READY notification
	old = runtime_xchgp(gpp, nil);
	if(old > WAIT)
		runtime_throw("netpollblock: corrupted state");
	return old == READY;
}
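// Illustrative interleaving (editorial commentary, not original code): the
// recheck after the WAIT CAS pairs with the "store, barrier, load" sequence
// in runtime_pollUnblock and deadlineimpl. If the closer runs between the
// CAS and the recheck:
//	blocker:  CAS(gpp, nil, WAIT)
//	closer:   closing = true; barrier; netpollunblock sees WAIT, sets it to nil
//	blocker:  checkerr observes closing and skips the park
// so a goroutine can never be left parked on a closed or timed-out descriptor.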
static G*
netpollunblock(PollDesc *pd, int32 mode, bool ioready)
{
	G **gpp, *old, *new;

	gpp = &pd->rg;
	if(mode == 'w')
		gpp = &pd->wg;

	for(;;) {
		old = *gpp;
		if(old == READY)
			return nil;
		if(old == nil && !ioready) {
			// Only set READY for ioready. runtime_pollWait
			// will check for timeout/cancel before waiting.
			return nil;
		}
		new = nil;
		if(ioready)
			new = READY;
		if(runtime_casp(gpp, old, new))
			break;
	}
	if(old > WAIT)
		return old;  // must be G*
	return nil;
}

static void
deadlineimpl(Eface arg, uintptr seq, bool read, bool write)
{
	PollDesc *pd;
	G *rg, *wg;

	pd = (PollDesc*)arg.data;
	rg = wg = nil;
	runtime_lock(pd);
	// Seq arg is seq when the timer was set.
	// If it's stale, ignore the timer event.
	if(seq != pd->seq) {
		// The descriptor was reused or timers were reset.
		runtime_unlock(pd);
		return;
	}
	if(read) {
		if(pd->rd <= 0 || pd->rt.fv == nil)
			runtime_throw("deadlineimpl: inconsistent read deadline");
		pd->rd = -1;
		runtime_atomicstorep(&pd->rt.fv, nil);  // full memory barrier between store to rd and load of rg in netpollunblock
		rg = netpollunblock(pd, 'r', false);
	}
	if(write) {
		if(pd->wd <= 0 || (pd->wt.fv == nil && !read))
			runtime_throw("deadlineimpl: inconsistent write deadline");
		pd->wd = -1;
		runtime_atomicstorep(&pd->wt.fv, nil);  // full memory barrier between store to wd and load of wg in netpollunblock
		wg = netpollunblock(pd, 'w', false);
	}
	runtime_unlock(pd);
	if(rg)
		runtime_ready(rg);
	if(wg)
		runtime_ready(wg);
}

static void
deadline(Eface arg, uintptr seq)
{
	deadlineimpl(arg, seq, true, true);
}

static void
readDeadline(Eface arg, uintptr seq)
{
	deadlineimpl(arg, seq, true, false);
}

static void
writeDeadline(Eface arg, uintptr seq)
{
	deadlineimpl(arg, seq, false, true);
}

static PollDesc*
allocPollDesc(void)
{
	PollDesc *pd;
	uint32 i, n;

	runtime_lock(&pollcache);
	if(pollcache.first == nil) {
		n = PollBlockSize/sizeof(*pd);
		if(n == 0)
			n = 1;
		// Must be in non-GC memory because can be referenced
		// only from epoll/kqueue internals.
		pd = runtime_persistentalloc(n*sizeof(*pd), 0, &mstats.other_sys);
		for(i = 0; i < n; i++) {
			pd[i].link = pollcache.first;
			pollcache.first = &pd[i];
		}
	}
	pd = pollcache.first;
	pollcache.first = pd->link;
	runtime_unlock(&pollcache);
	return pd;
}
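// Illustrative consumer sketch (an assumption about the scheduler, not code
// from this file): a platform poller collects ready goroutines through
// runtime_netpollready and the scheduler then drains the schedlink list,
// roughly:
//	gp = nil;
//	runtime_netpollready(&gp, pd, mode);
//	while(gp) {
//		next = (G*)gp->schedlink;
//		runtime_ready(gp);
//		gp = next;
//	}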