From 2193ad7fbf3e917a0ef5a2b48e13f84da1be44f1 Mon Sep 17 00:00:00 2001 From: Ian Lance Taylor Date: Mon, 9 Jan 2017 19:37:19 +0000 Subject: [PATCH] runtime: copy more of scheduler from Go 1.7 runtime This started by moving procresize from C to Go so that we can pass the right type to the memory allocator when allocating a p, which forced the gomaxprocs variable to move from C to Go, and everything else followed from that. Reviewed-on: https://go-review.googlesource.com/34916 From-SVN: r244236 --- gcc/go/gofrontend/MERGE | 2 +- libgo/go/runtime/debug.go | 20 +- libgo/go/runtime/export_test.go | 62 +- libgo/go/runtime/lock_futex.go | 21 +- libgo/go/runtime/lock_sema.go | 13 +- libgo/go/runtime/proc.go | 1744 ++++++++++++++++++++++++++++++- libgo/go/runtime/proc_test.go | 10 +- libgo/go/runtime/runtime2.go | 36 +- libgo/go/runtime/stubs.go | 88 +- libgo/go/runtime/trace.go | 75 +- libgo/runtime/heapdump.c | 8 +- libgo/runtime/malloc.goc | 3 +- libgo/runtime/malloc.h | 21 +- libgo/runtime/mgc0.c | 24 +- libgo/runtime/proc.c | 1400 ++----------------------- libgo/runtime/runtime.h | 12 +- 16 files changed, 1997 insertions(+), 1542 deletions(-) diff --git a/gcc/go/gofrontend/MERGE b/gcc/go/gofrontend/MERGE index d42c54072a8..2bbbc0f3292 100644 --- a/gcc/go/gofrontend/MERGE +++ b/gcc/go/gofrontend/MERGE @@ -1,4 +1,4 @@ -eef0fb3b092dc22d9830cac15a536760da5d033a +189ea81cc758e000325fd6cca7882c252d33f8f0 The first line of this file holds the git revision number of the last merge done from the gofrontend repository. diff --git a/libgo/go/runtime/debug.go b/libgo/go/runtime/debug.go index 43f6e1e00f6..55937ff8f05 100644 --- a/libgo/go/runtime/debug.go +++ b/libgo/go/runtime/debug.go @@ -14,7 +14,25 @@ import ( // change the current setting. // The number of logical CPUs on the local machine can be queried with NumCPU. // This call will go away when the scheduler improves. -func GOMAXPROCS(n int) int +func GOMAXPROCS(n int) int { + if n > _MaxGomaxprocs { + n = _MaxGomaxprocs + } + lock(&sched.lock) + ret := int(gomaxprocs) + unlock(&sched.lock) + if n <= 0 || n == ret { + return ret + } + + stopTheWorld("GOMAXPROCS") + + // newprocs will be processed by startTheWorld + newprocs = int32(n) + + startTheWorld() + return ret +} // NumCPU returns the number of logical CPUs usable by the current process. 
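// Illustrative sketch (not part of the patch): how the GOMAXPROCS
// implementation added above behaves from user code. A non-positive n only
// queries the current setting; a positive n stops the world, records the
// request in newprocs, and startTheWorld applies it through procresize.
package main

import (
	"fmt"
	"runtime"
)

func main() {
	prev := runtime.GOMAXPROCS(0) // query only; no world stop
	fmt.Println("current GOMAXPROCS:", prev)

	old := runtime.GOMAXPROCS(runtime.NumCPU()) // set; returns the previous value
	fmt.Println("previous setting:", old)
}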
// diff --git a/libgo/go/runtime/export_test.go b/libgo/go/runtime/export_test.go index b8b129d815d..36e6256992e 100644 --- a/libgo/go/runtime/export_test.go +++ b/libgo/go/runtime/export_test.go @@ -7,6 +7,7 @@ package runtime import ( + "runtime/internal/atomic" "unsafe" ) @@ -47,39 +48,6 @@ func GCMask(x interface{}) (ret []byte) { return nil } -//func testSchedLocalQueue() -//func testSchedLocalQueueSteal() -// -//func RunSchedLocalQueueTest() { -// testSchedLocalQueue() -//} -// -//func RunSchedLocalQueueStealTest() { -// testSchedLocalQueueSteal() -//} - -//var StringHash = stringHash -//var BytesHash = bytesHash -//var Int32Hash = int32Hash -//var Int64Hash = int64Hash -//var EfaceHash = efaceHash -//var IfaceHash = ifaceHash -//var MemclrBytes = memclrBytes - -var HashLoad = &hashLoad - -// entry point for testing -//func GostringW(w []uint16) (s string) { -// s = gostringw(&w[0]) -// return -//} - -//var Gostringnocopy = gostringnocopy -//var Maxstring = &maxstring - -//type Uintreg uintreg - -/* func RunSchedLocalQueueTest() { _p_ := new(p) gs := make([]g, len(_p_.runq)) @@ -177,14 +145,26 @@ func RunSchedLocalQueueEmptyTest(iters int) { } } -var StringHash = stringHash -var BytesHash = bytesHash -var Int32Hash = int32Hash -var Int64Hash = int64Hash -var EfaceHash = efaceHash -var IfaceHash = ifaceHash -var MemclrBytes = memclrBytes -*/ +//var StringHash = stringHash +//var BytesHash = bytesHash +//var Int32Hash = int32Hash +//var Int64Hash = int64Hash +//var EfaceHash = efaceHash +//var IfaceHash = ifaceHash +//var MemclrBytes = memclrBytes + +var HashLoad = &hashLoad + +// entry point for testing +//func GostringW(w []uint16) (s string) { +// s = gostringw(&w[0]) +// return +//} + +//var Gostringnocopy = gostringnocopy +//var Maxstring = &maxstring + +//type Uintreg uintreg var Open = open var Close = closefd diff --git a/libgo/go/runtime/lock_futex.go b/libgo/go/runtime/lock_futex.go index 1ad79111108..4d914b25fad 100644 --- a/libgo/go/runtime/lock_futex.go +++ b/libgo/go/runtime/lock_futex.go @@ -149,13 +149,9 @@ func notewakeup(n *note) { func notesleep(n *note) { gp := getg() - - // Currently OK to sleep in non-g0 for gccgo. It happens in - // stoptheworld because we have not implemented preemption. - // if gp != gp.m.g0 { - // throw("notesleep not on g0") - // } - + if gp != gp.m.g0 { + throw("notesleep not on g0") + } for atomic.Load(key32(&n.key)) == 0 { gp.m.blocked = true futexsleep(key32(&n.key), 0, -1) @@ -202,10 +198,13 @@ func notetsleep_internal(n *note, ns int64) bool { } func notetsleep(n *note, ns int64) bool { - gp := getg() - if gp != gp.m.g0 && gp.m.preemptoff != "" { - throw("notetsleep not on g0") - } + // Currently OK to sleep in non-g0 for gccgo. It happens in + // stoptheworld because our version of systemstack does not + // change to g0. + // gp := getg() + // if gp != gp.m.g0 && gp.m.preemptoff != "" { + // throw("notetsleep not on g0") + // } return notetsleep_internal(n, ns) } diff --git a/libgo/go/runtime/lock_sema.go b/libgo/go/runtime/lock_sema.go index eaf938a9bb3..5c70a747dab 100644 --- a/libgo/go/runtime/lock_sema.go +++ b/libgo/go/runtime/lock_sema.go @@ -162,13 +162,9 @@ func notewakeup(n *note) { func notesleep(n *note) { gp := getg() - - // Currently OK to sleep in non-g0 for gccgo. It happens in - // stoptheworld because we have not implemented preemption. 
- // if gp != gp.m.g0 { - // throw("notesleep not on g0") - // } - + if gp != gp.m.g0 { + throw("notesleep not on g0") + } semacreate(gp.m) if !atomic.Casuintptr(&n.key, 0, uintptr(unsafe.Pointer(gp.m))) { // Must be locked (got wakeup). @@ -257,7 +253,8 @@ func notetsleep(n *note, ns int64) bool { gp := getg() // Currently OK to sleep in non-g0 for gccgo. It happens in - // stoptheworld because we have not implemented preemption. + // stoptheworld because our version of systemstack does not + // change to g0. // if gp != gp.m.g0 && gp.m.preemptoff != "" { // throw("notetsleep not on g0") // } diff --git a/libgo/go/runtime/proc.go b/libgo/go/runtime/proc.go index 78cc6ee7d8a..659b17d1907 100644 --- a/libgo/go/runtime/proc.go +++ b/libgo/go/runtime/proc.go @@ -11,15 +11,45 @@ import ( // Functions temporarily called by C code. //go:linkname newextram runtime.newextram +//go:linkname acquirep runtime.acquirep +//go:linkname releasep runtime.releasep +//go:linkname incidlelocked runtime.incidlelocked //go:linkname checkdead runtime.checkdead +//go:linkname sysmon runtime.sysmon //go:linkname schedtrace runtime.schedtrace //go:linkname allgadd runtime.allgadd +//go:linkname ready runtime.ready +//go:linkname gcprocs runtime.gcprocs +//go:linkname needaddgcproc runtime.needaddgcproc +//go:linkname stopm runtime.stopm +//go:linkname handoffp runtime.handoffp +//go:linkname wakep runtime.wakep +//go:linkname stoplockedm runtime.stoplockedm +//go:linkname schedule runtime.schedule +//go:linkname execute runtime.execute +//go:linkname procresize runtime.procresize +//go:linkname helpgc runtime.helpgc +//go:linkname stopTheWorldWithSema runtime.stopTheWorldWithSema +//go:linkname startTheWorldWithSema runtime.startTheWorldWithSema +//go:linkname mput runtime.mput +//go:linkname mget runtime.mget +//go:linkname globrunqput runtime.globrunqput +//go:linkname pidleget runtime.pidleget +//go:linkname runqempty runtime.runqempty +//go:linkname runqput runtime.runqput // Functions temporarily in C that have not yet been ported. func allocm(*p, bool, *unsafe.Pointer, *uintptr) *m func malg(bool, bool, *unsafe.Pointer, *uintptr) *g +func startm(*p, bool) +func newm(unsafe.Pointer, *p) +func gchelper() +func getfingwait() bool +func getfingwake() bool +func wakefing() *g // C functions for ucontext management. +func gogo(*g) func setGContext() func makeGContext(*g, unsafe.Pointer, uintptr) func getTraceback(me, gp *g) @@ -30,6 +60,12 @@ func getTraceback(me, gp *g) // it is closed, meaning cgocallbackg can reliably receive from it. var main_init_done chan bool +func goready(gp *g, traceskip int) { + systemstack(func() { + ready(gp, traceskip, true) + }) +} + var ( allgs []*g allglock mutex @@ -56,6 +92,117 @@ func allgadd(gp *g) { unlock(&allglock) } +func dumpgstatus(gp *g) { + _g_ := getg() + print("runtime: gp: gp=", gp, ", goid=", gp.goid, ", gp->atomicstatus=", readgstatus(gp), "\n") + print("runtime: g: g=", _g_, ", goid=", _g_.goid, ", g->atomicstatus=", readgstatus(_g_), "\n") +} + +// Mark gp ready to run. +func ready(gp *g, traceskip int, next bool) { + if trace.enabled { + traceGoUnpark(gp, traceskip) + } + + status := readgstatus(gp) + + // Mark runnable. 
+ _g_ := getg() + _g_.m.locks++ // disable preemption because it can be holding p in a local var + if status&^_Gscan != _Gwaiting { + dumpgstatus(gp) + throw("bad g->status in ready") + } + + // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq + casgstatus(gp, _Gwaiting, _Grunnable) + runqput(_g_.m.p.ptr(), gp, next) + if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 { // TODO: fast atomic + wakep() + } + _g_.m.locks-- +} + +func gcprocs() int32 { + // Figure out how many CPUs to use during GC. + // Limited by gomaxprocs, number of actual CPUs, and MaxGcproc. + lock(&sched.lock) + n := gomaxprocs + if n > ncpu { + n = ncpu + } + if n > _MaxGcproc { + n = _MaxGcproc + } + if n > sched.nmidle+1 { // one M is currently running + n = sched.nmidle + 1 + } + unlock(&sched.lock) + return n +} + +func needaddgcproc() bool { + lock(&sched.lock) + n := gomaxprocs + if n > ncpu { + n = ncpu + } + if n > _MaxGcproc { + n = _MaxGcproc + } + n -= sched.nmidle + 1 // one M is currently running + unlock(&sched.lock) + return n > 0 +} + +func helpgc(nproc int32) { + _g_ := getg() + lock(&sched.lock) + pos := 0 + for n := int32(1); n < nproc; n++ { // one M is currently running + if allp[pos].mcache == _g_.m.mcache { + pos++ + } + mp := mget() + if mp == nil { + throw("gcprocs inconsistency") + } + mp.helpgc = n + mp.p.set(allp[pos]) + mp.mcache = allp[pos].mcache + pos++ + notewakeup(&mp.park) + } + unlock(&sched.lock) +} + +// freezeStopWait is a large value that freezetheworld sets +// sched.stopwait to in order to request that all Gs permanently stop. +const freezeStopWait = 0x7fffffff + +// Similar to stopTheWorld but best-effort and can be called several times. +// There is no reverse operation, used during crashing. +// This function must not lock any mutexes. +func freezetheworld() { + // stopwait and preemption requests can be lost + // due to races with concurrently executing threads, + // so try several times + for i := 0; i < 5; i++ { + // this should tell the scheduler to not start any new goroutines + sched.stopwait = freezeStopWait + atomic.Store(&sched.gcwaiting, 1) + // this should stop running goroutines + if !preemptall() { + break // no running goroutines + } + usleep(1000) + } + // to be sure + usleep(1000) + preemptall() + usleep(1000) +} + // All reads and writes of g's status go through readgstatus, casgstatus // castogscanstatus, casfrom_Gscanstatus. //go:nosplit @@ -123,6 +270,217 @@ func casgstatus(gp *g, oldval, newval uint32) { } } +// stopTheWorld stops all P's from executing goroutines, interrupting +// all goroutines at GC safe points and records reason as the reason +// for the stop. On return, only the current goroutine's P is running. +// stopTheWorld must not be called from a system stack and the caller +// must not hold worldsema. The caller must call startTheWorld when +// other P's should resume execution. +// +// stopTheWorld is safe for multiple goroutines to call at the +// same time. Each will execute its own stop, and the stops will +// be serialized. +// +// This is also used by routines that do stack dumps. If the system is +// in panic or being exited, this may not reliably stop all +// goroutines. +func stopTheWorld(reason string) { + semacquire(&worldsema, false) + getg().m.preemptoff = reason + systemstack(stopTheWorldWithSema) +} + +// startTheWorld undoes the effects of stopTheWorld. 
+func startTheWorld() { + systemstack(startTheWorldWithSema) + // worldsema must be held over startTheWorldWithSema to ensure + // gomaxprocs cannot change while worldsema is held. + semrelease(&worldsema) + getg().m.preemptoff = "" +} + +// Holding worldsema grants an M the right to try to stop the world +// and prevents gomaxprocs from changing concurrently. +var worldsema uint32 = 1 + +// stopTheWorldWithSema is the core implementation of stopTheWorld. +// The caller is responsible for acquiring worldsema and disabling +// preemption first and then should stopTheWorldWithSema on the system +// stack: +// +// semacquire(&worldsema, false) +// m.preemptoff = "reason" +// systemstack(stopTheWorldWithSema) +// +// When finished, the caller must either call startTheWorld or undo +// these three operations separately: +// +// m.preemptoff = "" +// systemstack(startTheWorldWithSema) +// semrelease(&worldsema) +// +// It is allowed to acquire worldsema once and then execute multiple +// startTheWorldWithSema/stopTheWorldWithSema pairs. +// Other P's are able to execute between successive calls to +// startTheWorldWithSema and stopTheWorldWithSema. +// Holding worldsema causes any other goroutines invoking +// stopTheWorld to block. +func stopTheWorldWithSema() { + _g_ := getg() + + // If we hold a lock, then we won't be able to stop another M + // that is blocked trying to acquire the lock. + if _g_.m.locks > 0 { + throw("stopTheWorld: holding locks") + } + + lock(&sched.lock) + sched.stopwait = gomaxprocs + atomic.Store(&sched.gcwaiting, 1) + preemptall() + // stop current P + _g_.m.p.ptr().status = _Pgcstop // Pgcstop is only diagnostic. + sched.stopwait-- + // try to retake all P's in Psyscall status + for i := 0; i < int(gomaxprocs); i++ { + p := allp[i] + s := p.status + if s == _Psyscall && atomic.Cas(&p.status, s, _Pgcstop) { + if trace.enabled { + traceGoSysBlock(p) + traceProcStop(p) + } + p.syscalltick++ + sched.stopwait-- + } + } + // stop idle P's + for { + p := pidleget() + if p == nil { + break + } + p.status = _Pgcstop + sched.stopwait-- + } + wait := sched.stopwait > 0 + unlock(&sched.lock) + + // wait for remaining P's to stop voluntarily + if wait { + for { + // wait for 100us, then try to re-preempt in case of any races + if notetsleep(&sched.stopnote, 100*1000) { + noteclear(&sched.stopnote) + break + } + preemptall() + } + } + if sched.stopwait != 0 { + throw("stopTheWorld: not stopped") + } + for i := 0; i < int(gomaxprocs); i++ { + p := allp[i] + if p.status != _Pgcstop { + throw("stopTheWorld: not stopped") + } + } +} + +func mhelpgc() { + _g_ := getg() + _g_.m.helpgc = -1 +} + +func startTheWorldWithSema() { + _g_ := getg() + + _g_.m.locks++ // disable preemption because it can be holding p in a local var + gp := netpoll(false) // non-blocking + injectglist(gp) + add := needaddgcproc() + lock(&sched.lock) + + procs := gomaxprocs + if newprocs != 0 { + procs = newprocs + newprocs = 0 + } + p1 := procresize(procs) + sched.gcwaiting = 0 + if sched.sysmonwait != 0 { + sched.sysmonwait = 0 + notewakeup(&sched.sysmonnote) + } + unlock(&sched.lock) + + for p1 != nil { + p := p1 + p1 = p1.link.ptr() + if p.m != 0 { + mp := p.m.ptr() + p.m = 0 + if mp.nextp != 0 { + throw("startTheWorld: inconsistent mp->nextp") + } + mp.nextp.set(p) + notewakeup(&mp.park) + } else { + // Start M to run P. Do not start another M below. + newm(nil, p) + add = false + } + } + + // Wakeup an additional proc in case we have excessive runnable goroutines + // in local queues or in the global queue. 
If we don't, the proc will park itself. + // If we have lots of excessive work, resetspinning will unpark additional procs as necessary. + if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 { + wakep() + } + + if add { + // If GC could have used another helper proc, start one now, + // in the hope that it will be available next time. + // It would have been even better to start it before the collection, + // but doing so requires allocating memory, so it's tricky to + // coordinate. This lazy approach works out in practice: + // we don't mind if the first couple gc rounds don't have quite + // the maximum number of procs. + newm(unsafe.Pointer(funcPC(mhelpgc)), nil) + } + _g_.m.locks-- +} + +// runSafePointFn runs the safe point function, if any, for this P. +// This should be called like +// +// if getg().m.p.runSafePointFn != 0 { +// runSafePointFn() +// } +// +// runSafePointFn must be checked on any transition in to _Pidle or +// _Psyscall to avoid a race where forEachP sees that the P is running +// just before the P goes into _Pidle/_Psyscall and neither forEachP +// nor the P run the safe-point function. +func runSafePointFn() { + p := getg().m.p.ptr() + // Resolve the race between forEachP running the safe-point + // function on this P's behalf and this P running the + // safe-point function directly. + if !atomic.Cas(&p.runSafePointFn, 1, 0) { + return + } + sched.safePointFn(p) + lock(&sched.lock) + sched.safePointWait-- + if sched.safePointWait == 0 { + notewakeup(&sched.safePointNote) + } + unlock(&sched.lock) +} + // needm is called when a cgo callback happens on a // thread without an m (a thread not created by Go). // In this case, needm is expected to find an m to use @@ -245,9 +603,6 @@ func oneNewExtraM() { mp.lockedg = gp gp.lockedm = mp gp.goid = int64(atomic.Xadd64(&sched.goidgen, 1)) - if raceenabled { - gp.racectx = racegostart(funcPC(newextram)) - } // put on allg for garbage collector allgadd(gp) @@ -365,6 +720,744 @@ func unlockextra(mp *m) { atomic.Storeuintptr(&extram, uintptr(unsafe.Pointer(mp))) } +// Stops execution of the current m until new work is available. +// Returns with acquired P. +func stopm() { + _g_ := getg() + + if _g_.m.locks != 0 { + throw("stopm holding locks") + } + if _g_.m.p != 0 { + throw("stopm holding p") + } + if _g_.m.spinning { + throw("stopm spinning") + } + +retry: + lock(&sched.lock) + mput(_g_.m) + unlock(&sched.lock) + notesleep(&_g_.m.park) + noteclear(&_g_.m.park) + if _g_.m.helpgc != 0 { + gchelper() + _g_.m.helpgc = 0 + _g_.m.mcache = nil + _g_.m.p = 0 + goto retry + } + acquirep(_g_.m.nextp.ptr()) + _g_.m.nextp = 0 +} + +// Hands off P from syscall or locked M. +// Always runs without a P, so write barriers are not allowed. +//go:nowritebarrier +func handoffp(_p_ *p) { + // handoffp must start an M in any situation where + // findrunnable would return a G to run on _p_. 
+ + // if it has local work, start it straight away + if !runqempty(_p_) || sched.runqsize != 0 { + startm(_p_, false) + return + } + // if it has GC work, start it straight away + if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) { + startm(_p_, false) + return + } + // no local work, check that there are no spinning/idle M's, + // otherwise our help is not required + if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic + startm(_p_, true) + return + } + lock(&sched.lock) + if sched.gcwaiting != 0 { + _p_.status = _Pgcstop + sched.stopwait-- + if sched.stopwait == 0 { + notewakeup(&sched.stopnote) + } + unlock(&sched.lock) + return + } + if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) { + sched.safePointFn(_p_) + sched.safePointWait-- + if sched.safePointWait == 0 { + notewakeup(&sched.safePointNote) + } + } + if sched.runqsize != 0 { + unlock(&sched.lock) + startm(_p_, false) + return + } + // If this is the last running P and nobody is polling network, + // need to wakeup another M to poll network. + if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 { + unlock(&sched.lock) + startm(_p_, false) + return + } + pidleput(_p_) + unlock(&sched.lock) +} + +// Tries to add one more P to execute G's. +// Called when a G is made runnable (newproc, ready). +func wakep() { + // be conservative about spinning threads + if !atomic.Cas(&sched.nmspinning, 0, 1) { + return + } + startm(nil, true) +} + +// Stops execution of the current m that is locked to a g until the g is runnable again. +// Returns with acquired P. +func stoplockedm() { + _g_ := getg() + + if _g_.m.lockedg == nil || _g_.m.lockedg.lockedm != _g_.m { + throw("stoplockedm: inconsistent locking") + } + if _g_.m.p != 0 { + // Schedule another M to run this p. + _p_ := releasep() + handoffp(_p_) + } + incidlelocked(1) + // Wait until another thread schedules lockedg again. + notesleep(&_g_.m.park) + noteclear(&_g_.m.park) + status := readgstatus(_g_.m.lockedg) + if status&^_Gscan != _Grunnable { + print("runtime:stoplockedm: g is not Grunnable or Gscanrunnable\n") + dumpgstatus(_g_) + throw("stoplockedm: not runnable") + } + acquirep(_g_.m.nextp.ptr()) + _g_.m.nextp = 0 +} + +// Schedules the locked m to run the locked gp. +// May run during STW, so write barriers are not allowed. +//go:nowritebarrier +func startlockedm(gp *g) { + _g_ := getg() + + mp := gp.lockedm + if mp == _g_.m { + throw("startlockedm: locked to me") + } + if mp.nextp != 0 { + throw("startlockedm: m has p") + } + // directly handoff current P to the locked m + incidlelocked(-1) + _p_ := releasep() + mp.nextp.set(_p_) + notewakeup(&mp.park) + stopm() +} + +// Stops the current m for stopTheWorld. +// Returns when the world is restarted. +func gcstopm() { + _g_ := getg() + + if sched.gcwaiting == 0 { + throw("gcstopm: not waiting for gc") + } + if _g_.m.spinning { + _g_.m.spinning = false + // OK to just drop nmspinning here, + // startTheWorld will unpark threads as necessary. + if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { + throw("gcstopm: negative nmspinning") + } + } + _p_ := releasep() + lock(&sched.lock) + _p_.status = _Pgcstop + sched.stopwait-- + if sched.stopwait == 0 { + notewakeup(&sched.stopnote) + } + unlock(&sched.lock) + stopm() +} + +// Schedules gp to run on the current M. +// If inheritTime is true, gp inherits the remaining time in the +// current time slice. Otherwise, it starts a new time slice. +// Never returns. 
+func execute(gp *g, inheritTime bool) { + _g_ := getg() + + casgstatus(gp, _Grunnable, _Grunning) + gp.waitsince = 0 + gp.preempt = false + if !inheritTime { + _g_.m.p.ptr().schedtick++ + } + _g_.m.curg = gp + gp.m = _g_.m + + // Check whether the profiler needs to be turned on or off. + hz := sched.profilehz + if _g_.m.profilehz != hz { + resetcpuprofiler(hz) + } + + if trace.enabled { + // GoSysExit has to happen when we have a P, but before GoStart. + // So we emit it here. + if gp.syscallsp != 0 && gp.sysblocktraced { + traceGoSysExit(gp.sysexitticks) + } + traceGoStart() + } + + gogo(gp) +} + +// Finds a runnable goroutine to execute. +// Tries to steal from other P's, get g from global queue, poll network. +func findrunnable() (gp *g, inheritTime bool) { + _g_ := getg() + + // The conditions here and in handoffp must agree: if + // findrunnable would return a G to run, handoffp must start + // an M. + +top: + _p_ := _g_.m.p.ptr() + if sched.gcwaiting != 0 { + gcstopm() + goto top + } + if _p_.runSafePointFn != 0 { + runSafePointFn() + } + if getfingwait() && getfingwake() { + if gp := wakefing(); gp != nil { + ready(gp, 0, true) + } + } + + // local runq + if gp, inheritTime := runqget(_p_); gp != nil { + return gp, inheritTime + } + + // global runq + if sched.runqsize != 0 { + lock(&sched.lock) + gp := globrunqget(_p_, 0) + unlock(&sched.lock) + if gp != nil { + return gp, false + } + } + + // Poll network. + // This netpoll is only an optimization before we resort to stealing. + // We can safely skip it if there a thread blocked in netpoll already. + // If there is any kind of logical race with that blocked thread + // (e.g. it has already returned from netpoll, but does not set lastpoll yet), + // this thread will do blocking netpoll below anyway. + if netpollinited() && sched.lastpoll != 0 { + if gp := netpoll(false); gp != nil { // non-blocking + // netpoll returns list of goroutines linked by schedlink. + injectglist(gp.schedlink.ptr()) + casgstatus(gp, _Gwaiting, _Grunnable) + if trace.enabled { + traceGoUnpark(gp, 0) + } + return gp, false + } + } + + // Steal work from other P's. + procs := uint32(gomaxprocs) + if atomic.Load(&sched.npidle) == procs-1 { + // Either GOMAXPROCS=1 or everybody, except for us, is idle already. + // New work can appear from returning syscall/cgocall, network or timers. + // Neither of that submits to local run queues, so no point in stealing. + goto stop + } + // If number of spinning M's >= number of busy P's, block. + // This is necessary to prevent excessive CPU consumption + // when GOMAXPROCS>>1 but the program parallelism is low. + if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) { // TODO: fast atomic + goto stop + } + if !_g_.m.spinning { + _g_.m.spinning = true + atomic.Xadd(&sched.nmspinning, 1) + } + for i := 0; i < 4; i++ { + for enum := stealOrder.start(fastrand1()); !enum.done(); enum.next() { + if sched.gcwaiting != 0 { + goto top + } + stealRunNextG := i > 2 // first look for ready queues with more than 1 g + if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil { + return gp, false + } + } + } + +stop: + + // We have nothing to do. If we're in the GC mark phase, can + // safely scan and blacken objects, and have work to do, run + // idle-time marking rather than give up the P. 
+ if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) { + _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode + gp := _p_.gcBgMarkWorker.ptr() + casgstatus(gp, _Gwaiting, _Grunnable) + if trace.enabled { + traceGoUnpark(gp, 0) + } + return gp, false + } + + // return P and block + lock(&sched.lock) + if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 { + unlock(&sched.lock) + goto top + } + if sched.runqsize != 0 { + gp := globrunqget(_p_, 0) + unlock(&sched.lock) + return gp, false + } + if releasep() != _p_ { + throw("findrunnable: wrong p") + } + pidleput(_p_) + unlock(&sched.lock) + + // Delicate dance: thread transitions from spinning to non-spinning state, + // potentially concurrently with submission of new goroutines. We must + // drop nmspinning first and then check all per-P queues again (with + // #StoreLoad memory barrier in between). If we do it the other way around, + // another thread can submit a goroutine after we've checked all run queues + // but before we drop nmspinning; as the result nobody will unpark a thread + // to run the goroutine. + // If we discover new work below, we need to restore m.spinning as a signal + // for resetspinning to unpark a new worker thread (because there can be more + // than one starving goroutine). However, if after discovering new work + // we also observe no idle Ps, it is OK to just park the current thread: + // the system is fully loaded so no spinning threads are required. + // Also see "Worker thread parking/unparking" comment at the top of the file. + wasSpinning := _g_.m.spinning + if _g_.m.spinning { + _g_.m.spinning = false + if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { + throw("findrunnable: negative nmspinning") + } + } + + // check all runqueues once again + for i := 0; i < int(gomaxprocs); i++ { + _p_ := allp[i] + if _p_ != nil && !runqempty(_p_) { + lock(&sched.lock) + _p_ = pidleget() + unlock(&sched.lock) + if _p_ != nil { + acquirep(_p_) + if wasSpinning { + _g_.m.spinning = true + atomic.Xadd(&sched.nmspinning, 1) + } + goto top + } + break + } + } + + // poll network + if netpollinited() && atomic.Xchg64(&sched.lastpoll, 0) != 0 { + if _g_.m.p != 0 { + throw("findrunnable: netpoll with p") + } + if _g_.m.spinning { + throw("findrunnable: netpoll with spinning") + } + gp := netpoll(true) // block until new work is available + atomic.Store64(&sched.lastpoll, uint64(nanotime())) + if gp != nil { + lock(&sched.lock) + _p_ = pidleget() + unlock(&sched.lock) + if _p_ != nil { + acquirep(_p_) + injectglist(gp.schedlink.ptr()) + casgstatus(gp, _Gwaiting, _Grunnable) + if trace.enabled { + traceGoUnpark(gp, 0) + } + return gp, false + } + injectglist(gp) + } + } + stopm() + goto top +} + +func resetspinning() { + _g_ := getg() + if !_g_.m.spinning { + throw("resetspinning: not a spinning m") + } + _g_.m.spinning = false + nmspinning := atomic.Xadd(&sched.nmspinning, -1) + if int32(nmspinning) < 0 { + throw("findrunnable: negative nmspinning") + } + // M wakeup policy is deliberately somewhat conservative, so check if we + // need to wakeup another P here. See "Worker thread parking/unparking" + // comment at the top of the file for details. + if nmspinning == 0 && atomic.Load(&sched.npidle) > 0 { + wakep() + } +} + +// Injects the list of runnable G's into the scheduler. +// Can run concurrently with GC. 
+func injectglist(glist *g) { + if glist == nil { + return + } + if trace.enabled { + for gp := glist; gp != nil; gp = gp.schedlink.ptr() { + traceGoUnpark(gp, 0) + } + } + lock(&sched.lock) + var n int + for n = 0; glist != nil; n++ { + gp := glist + glist = gp.schedlink.ptr() + casgstatus(gp, _Gwaiting, _Grunnable) + globrunqput(gp) + } + unlock(&sched.lock) + for ; n != 0 && sched.npidle != 0; n-- { + startm(nil, false) + } +} + +// One round of scheduler: find a runnable goroutine and execute it. +// Never returns. +func schedule() { + _g_ := getg() + + if _g_.m.locks != 0 { + throw("schedule: holding locks") + } + + if _g_.m.lockedg != nil { + stoplockedm() + execute(_g_.m.lockedg, false) // Never returns. + } + +top: + if sched.gcwaiting != 0 { + gcstopm() + goto top + } + if _g_.m.p.ptr().runSafePointFn != 0 { + runSafePointFn() + } + + var gp *g + var inheritTime bool + if trace.enabled || trace.shutdown { + gp = traceReader() + if gp != nil { + casgstatus(gp, _Gwaiting, _Grunnable) + traceGoUnpark(gp, 0) + } + } + if gp == nil && gcBlackenEnabled != 0 { + gp = gcController.findRunnableGCWorker(_g_.m.p.ptr()) + } + if gp == nil { + // Check the global runnable queue once in a while to ensure fairness. + // Otherwise two goroutines can completely occupy the local runqueue + // by constantly respawning each other. + if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { + lock(&sched.lock) + gp = globrunqget(_g_.m.p.ptr(), 1) + unlock(&sched.lock) + } + } + if gp == nil { + gp, inheritTime = runqget(_g_.m.p.ptr()) + if gp != nil && _g_.m.spinning { + throw("schedule: spinning with local work") + } + + // Because gccgo does not implement preemption as a stack check, + // we need to check for preemption here for fairness. + // Otherwise goroutines on the local queue may starve + // goroutines on the global queue. + // Since we preempt by storing the goroutine on the global + // queue, this is the only place we need to check preempt. + if gp != nil && gp.preempt { + gp.preempt = false + lock(&sched.lock) + globrunqput(gp) + unlock(&sched.lock) + goto top + } + } + if gp == nil { + gp, inheritTime = findrunnable() // blocks until work is available + } + + // This thread is going to run a goroutine and is not spinning anymore, + // so if it was marked as spinning we need to reset it now and potentially + // start a new spinning M. + if _g_.m.spinning { + resetspinning() + } + + if gp.lockedm != nil { + // Hands off own p to the locked m, + // then blocks waiting for a new p. + startlockedm(gp) + goto top + } + + execute(gp, inheritTime) +} + +// Purge all cached G's from gfree list to the global list. +func gfpurge(_p_ *p) { + lock(&sched.gflock) + for _p_.gfreecnt != 0 { + _p_.gfreecnt-- + gp := _p_.gfree + _p_.gfree = gp.schedlink.ptr() + gp.schedlink.set(sched.gfree) + sched.gfree = gp + sched.ngfree++ + } + unlock(&sched.gflock) +} + +// Change number of processors. The world is stopped, sched is locked. +// gcworkbufs are not being modified by either the GC or +// the write barrier code. +// Returns list of Ps with local work, they need to be scheduled by the caller. 
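// Illustrative sketch (not part of the patch): the fairness rule in
// schedule() above. Every 61st tick of a P's schedtick the global run queue
// is consulted before the local one, so two goroutines that keep respawning
// each other on one P cannot starve goroutines queued globally.
// checkGlobalFirst is a hypothetical helper, not a runtime function.
package main

import "fmt"

func checkGlobalFirst(schedtick uint32, globalRunqSize int32) bool {
	return schedtick%61 == 0 && globalRunqSize > 0
}

func main() {
	for tick := uint32(60); tick <= 62; tick++ {
		fmt.Printf("schedtick=%d -> consult global queue first: %v\n",
			tick, checkGlobalFirst(tick, 3))
	}
}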
+func procresize(nprocs int32) *p { + old := gomaxprocs + if old < 0 || old > _MaxGomaxprocs || nprocs <= 0 || nprocs > _MaxGomaxprocs { + throw("procresize: invalid arg") + } + if trace.enabled { + traceGomaxprocs(nprocs) + } + + // update statistics + now := nanotime() + if sched.procresizetime != 0 { + sched.totaltime += int64(old) * (now - sched.procresizetime) + } + sched.procresizetime = now + + // initialize new P's + for i := int32(0); i < nprocs; i++ { + pp := allp[i] + if pp == nil { + pp = new(p) + pp.id = i + pp.status = _Pgcstop + pp.sudogcache = pp.sudogbuf[:0] + pp.deferpool = pp.deferpoolbuf[:0] + atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp)) + } + if pp.mcache == nil { + if old == 0 && i == 0 { + if getg().m.mcache == nil { + throw("missing mcache?") + } + pp.mcache = getg().m.mcache // bootstrap + } else { + pp.mcache = allocmcache() + } + } + } + + // free unused P's + for i := nprocs; i < old; i++ { + p := allp[i] + if trace.enabled { + if p == getg().m.p.ptr() { + // moving to p[0], pretend that we were descheduled + // and then scheduled again to keep the trace sane. + traceGoSched() + traceProcStop(p) + } + } + // move all runnable goroutines to the global queue + for p.runqhead != p.runqtail { + // pop from tail of local queue + p.runqtail-- + gp := p.runq[p.runqtail%uint32(len(p.runq))].ptr() + // push onto head of global queue + globrunqputhead(gp) + } + if p.runnext != 0 { + globrunqputhead(p.runnext.ptr()) + p.runnext = 0 + } + // if there's a background worker, make it runnable and put + // it on the global queue so it can clean itself up + if gp := p.gcBgMarkWorker.ptr(); gp != nil { + casgstatus(gp, _Gwaiting, _Grunnable) + if trace.enabled { + traceGoUnpark(gp, 0) + } + globrunqput(gp) + // This assignment doesn't race because the + // world is stopped. + p.gcBgMarkWorker.set(nil) + } + for i := range p.sudogbuf { + p.sudogbuf[i] = nil + } + p.sudogcache = p.sudogbuf[:0] + for i := range p.deferpoolbuf { + p.deferpoolbuf[i] = nil + } + p.deferpool = p.deferpoolbuf[:0] + freemcache(p.mcache) + p.mcache = nil + gfpurge(p) + traceProcFree(p) + p.status = _Pdead + // can't free P itself because it can be referenced by an M in syscall + } + + _g_ := getg() + if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs { + // continue to use the current P + _g_.m.p.ptr().status = _Prunning + } else { + // release the current P and acquire allp[0] + if _g_.m.p != 0 { + _g_.m.p.ptr().m = 0 + } + _g_.m.p = 0 + _g_.m.mcache = nil + p := allp[0] + p.m = 0 + p.status = _Pidle + acquirep(p) + if trace.enabled { + traceGoStart() + } + } + var runnablePs *p + for i := nprocs - 1; i >= 0; i-- { + p := allp[i] + if _g_.m.p.ptr() == p { + continue + } + p.status = _Pidle + if runqempty(p) { + pidleput(p) + } else { + p.m.set(mget()) + p.link.set(runnablePs) + runnablePs = p + } + } + stealOrder.reset(uint32(nprocs)) + var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32 + atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs)) + return runnablePs +} + +// Associate p and the current m. +func acquirep(_p_ *p) { + acquirep1(_p_) + + // have p; write barriers now allowed + _g_ := getg() + _g_.m.mcache = _p_.mcache + + if trace.enabled { + traceProcStart() + } +} + +// May run during STW, so write barriers are not allowed. 
+//go:nowritebarrier +func acquirep1(_p_ *p) { + _g_ := getg() + + if _g_.m.p != 0 || _g_.m.mcache != nil { + throw("acquirep: already in go") + } + if _p_.m != 0 || _p_.status != _Pidle { + id := int32(0) + if _p_.m != 0 { + id = _p_.m.ptr().id + } + print("acquirep: p->m=", _p_.m, "(", id, ") p->status=", _p_.status, "\n") + throw("acquirep: invalid p state") + } + _g_.m.p.set(_p_) + _p_.m.set(_g_.m) + _p_.status = _Prunning +} + +// Disassociate p and the current m. +func releasep() *p { + _g_ := getg() + + if _g_.m.p == 0 || _g_.m.mcache == nil { + throw("releasep: invalid arg") + } + _p_ := _g_.m.p.ptr() + if _p_.m.ptr() != _g_.m || _p_.mcache != _g_.m.mcache || _p_.status != _Prunning { + print("releasep: m=", _g_.m, " m->p=", _g_.m.p.ptr(), " p->m=", _p_.m, " m->mcache=", _g_.m.mcache, " p->mcache=", _p_.mcache, " p->status=", _p_.status, "\n") + throw("releasep: invalid p state") + } + if trace.enabled { + traceProcStop(_g_.m.p.ptr()) + } + _g_.m.p = 0 + _g_.m.mcache = nil + _p_.m = 0 + _p_.status = _Pidle + return _p_ +} + +func incidlelocked(v int32) { + lock(&sched.lock) + sched.nmidlelocked += v + if v > 0 { + checkdead() + } + unlock(&sched.lock) +} + // Check for deadlock situation. // The check is based on number of running M's, if 0 -> deadlock. func checkdead() { @@ -443,6 +1536,236 @@ func checkdead() { throw("all goroutines are asleep - deadlock!") } +// forcegcperiod is the maximum time in nanoseconds between garbage +// collections. If we go this long without a garbage collection, one +// is forced to run. +// +// This is a variable for testing purposes. It normally doesn't change. +var forcegcperiod int64 = 2 * 60 * 1e9 + +// Always runs without a P, so write barriers are not allowed. +// +//go:nowritebarrierrec +func sysmon() { + // If a heap span goes unused for 5 minutes after a garbage collection, + // we hand it back to the operating system. + scavengelimit := int64(5 * 60 * 1e9) + + if debug.scavenge > 0 { + // Scavenge-a-lot for testing. + forcegcperiod = 10 * 1e6 + scavengelimit = 20 * 1e6 + } + + lastscavenge := nanotime() + nscavenge := 0 + + lasttrace := int64(0) + idle := 0 // how many cycles in succession we had not wokeup somebody + delay := uint32(0) + for { + if idle == 0 { // start with 20us sleep... + delay = 20 + } else if idle > 50 { // start doubling the sleep after 1ms... + delay *= 2 + } + if delay > 10*1000 { // up to 10ms + delay = 10 * 1000 + } + usleep(delay) + if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) { // TODO: fast atomic + lock(&sched.lock) + if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) { + atomic.Store(&sched.sysmonwait, 1) + unlock(&sched.lock) + // Make wake-up period small enough + // for the sampling to be correct. 
+ maxsleep := forcegcperiod / 2 + if scavengelimit < forcegcperiod { + maxsleep = scavengelimit / 2 + } + notetsleep(&sched.sysmonnote, maxsleep) + lock(&sched.lock) + atomic.Store(&sched.sysmonwait, 0) + noteclear(&sched.sysmonnote) + idle = 0 + delay = 20 + } + unlock(&sched.lock) + } + // poll network if not polled for more than 10ms + lastpoll := int64(atomic.Load64(&sched.lastpoll)) + now := nanotime() + unixnow := unixnanotime() + if lastpoll != 0 && lastpoll+10*1000*1000 < now { + atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now)) + gp := netpoll(false) // non-blocking - returns list of goroutines + if gp != nil { + // Need to decrement number of idle locked M's + // (pretending that one more is running) before injectglist. + // Otherwise it can lead to the following situation: + // injectglist grabs all P's but before it starts M's to run the P's, + // another M returns from syscall, finishes running its G, + // observes that there is no work to do and no other running M's + // and reports deadlock. + incidlelocked(-1) + injectglist(gp) + incidlelocked(1) + } + } + // retake P's blocked in syscalls + // and preempt long running G's + if retake(now) != 0 { + idle = 0 + } else { + idle++ + } + // check if we need to force a GC + lastgc := int64(atomic.Load64(&memstats.last_gc)) + if gcphase == _GCoff && lastgc != 0 && unixnow-lastgc > forcegcperiod && atomic.Load(&forcegc.idle) != 0 { + lock(&forcegc.lock) + forcegc.idle = 0 + forcegc.g.schedlink = 0 + injectglist(forcegc.g) + unlock(&forcegc.lock) + } + // scavenge heap once in a while + if lastscavenge+scavengelimit/2 < now { + mheap_.scavenge(int32(nscavenge), uint64(now), uint64(scavengelimit)) + lastscavenge = now + nscavenge++ + } + if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now { + lasttrace = now + schedtrace(debug.scheddetail > 0) + } + } +} + +var pdesc [_MaxGomaxprocs]struct { + schedtick uint32 + schedwhen int64 + syscalltick uint32 + syscallwhen int64 +} + +// forcePreemptNS is the time slice given to a G before it is +// preempted. +const forcePreemptNS = 10 * 1000 * 1000 // 10ms + +func retake(now int64) uint32 { + n := 0 + for i := int32(0); i < gomaxprocs; i++ { + _p_ := allp[i] + if _p_ == nil { + continue + } + pd := &pdesc[i] + s := _p_.status + if s == _Psyscall { + // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us). + t := int64(_p_.syscalltick) + if int64(pd.syscalltick) != t { + pd.syscalltick = uint32(t) + pd.syscallwhen = now + continue + } + // On the one hand we don't want to retake Ps if there is no other work to do, + // but on the other hand we want to retake them eventually + // because they can prevent the sysmon thread from deep sleep. + if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now { + continue + } + // Need to decrement number of idle locked M's + // (pretending that one more is running) before the CAS. + // Otherwise the M from which we retake can exit the syscall, + // increment nmidle and report deadlock. + incidlelocked(-1) + if atomic.Cas(&_p_.status, s, _Pidle) { + if trace.enabled { + traceGoSysBlock(_p_) + traceProcStop(_p_) + } + n++ + _p_.syscalltick++ + handoffp(_p_) + } + incidlelocked(1) + } else if s == _Prunning { + // Preempt G if it's running for too long. 
+ t := int64(_p_.schedtick) + if int64(pd.schedtick) != t { + pd.schedtick = uint32(t) + pd.schedwhen = now + continue + } + if pd.schedwhen+forcePreemptNS > now { + continue + } + preemptone(_p_) + } + } + return uint32(n) +} + +// Tell all goroutines that they have been preempted and they should stop. +// This function is purely best-effort. It can fail to inform a goroutine if a +// processor just started running it. +// No locks need to be held. +// Returns true if preemption request was issued to at least one goroutine. +func preemptall() bool { + res := false + for i := int32(0); i < gomaxprocs; i++ { + _p_ := allp[i] + if _p_ == nil || _p_.status != _Prunning { + continue + } + if preemptone(_p_) { + res = true + } + } + return res +} + +// Tell the goroutine running on processor P to stop. +// This function is purely best-effort. It can incorrectly fail to inform the +// goroutine. It can send inform the wrong goroutine. Even if it informs the +// correct goroutine, that goroutine might ignore the request if it is +// simultaneously executing newstack. +// No lock needs to be held. +// Returns true if preemption request was issued. +// The actual preemption will happen at some point in the future +// and will be indicated by the gp->status no longer being +// Grunning +func preemptone(_p_ *p) bool { + mp := _p_.m.ptr() + if mp == nil || mp == getg().m { + return false + } + gp := mp.curg + if gp == nil || gp == mp.g0 { + return false + } + + gp.preempt = true + + // At this point the gc implementation sets gp.stackguard0 to + // a value that causes the goroutine to suspend itself. + // gccgo has no support for this, and it's hard to support. + // The split stack code reads a value from its TCB. + // We have no way to set a value in the TCB of a different thread. + // And, of course, not all systems support split stack anyhow. + // Checking the field in the g is expensive, since it requires + // loading the g from TLS. The best mechanism is likely to be + // setting a global variable and figuring out a way to efficiently + // check that global variable. + // + // For now we check gp.preempt in schedule and mallocgc, + // which is at least better than doing nothing at all. + + return true +} + var starttime int64 func schedtrace(detailed bool) { @@ -451,8 +1774,6 @@ func schedtrace(detailed bool) { starttime = now } - gomaxprocs := int32(GOMAXPROCS(0)) - lock(&sched.lock) print("SCHED ", (now-starttime)/1e6, "ms: gomaxprocs=", gomaxprocs, " idleprocs=", sched.npidle, " threads=", sched.mcount, " spinningthreads=", sched.nmspinning, " idlethreads=", sched.nmidle, " runqueue=", sched.runqsize) if detailed { @@ -531,3 +1852,416 @@ func schedtrace(detailed bool) { unlock(&allglock) unlock(&sched.lock) } + +// Put mp on midle list. +// Sched must be locked. +// May run during STW, so write barriers are not allowed. +//go:nowritebarrier +func mput(mp *m) { + mp.schedlink = sched.midle + sched.midle.set(mp) + sched.nmidle++ + checkdead() +} + +// Try to get an m from midle list. +// Sched must be locked. +// May run during STW, so write barriers are not allowed. +//go:nowritebarrier +func mget() *m { + mp := sched.midle.ptr() + if mp != nil { + sched.midle = mp.schedlink + sched.nmidle-- + } + return mp +} + +// Put gp on the global runnable queue. +// Sched must be locked. +// May run during STW, so write barriers are not allowed. 
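// Illustrative sketch (not part of the patch): the cooperative preemption
// that preemptone relies on in gccgo — the flag it sets is only honored
// where the runtime already checks gp.preempt (schedule and mallocgc).
// Below, a hypothetical user-level worker polls an atomic flag at loop
// boundaries in the same spirit; worker and preempt are stand-in names.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

var preempt int32 // stand-in for gp.preempt

func worker(done chan<- int) {
	n := 0
	for atomic.LoadInt32(&preempt) == 0 { // cooperative check, like schedule/mallocgc
		n++ // simulated work between checks
	}
	done <- n
}

func main() {
	done := make(chan int)
	go worker(done)
	time.Sleep(10 * time.Millisecond)
	atomic.StoreInt32(&preempt, 1) // request a stop; purely best-effort
	fmt.Println("iterations before preemption:", <-done)
}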
+//go:nowritebarrier +func globrunqput(gp *g) { + gp.schedlink = 0 + if sched.runqtail != 0 { + sched.runqtail.ptr().schedlink.set(gp) + } else { + sched.runqhead.set(gp) + } + sched.runqtail.set(gp) + sched.runqsize++ +} + +// Put gp at the head of the global runnable queue. +// Sched must be locked. +// May run during STW, so write barriers are not allowed. +//go:nowritebarrier +func globrunqputhead(gp *g) { + gp.schedlink = sched.runqhead + sched.runqhead.set(gp) + if sched.runqtail == 0 { + sched.runqtail.set(gp) + } + sched.runqsize++ +} + +// Put a batch of runnable goroutines on the global runnable queue. +// Sched must be locked. +func globrunqputbatch(ghead *g, gtail *g, n int32) { + gtail.schedlink = 0 + if sched.runqtail != 0 { + sched.runqtail.ptr().schedlink.set(ghead) + } else { + sched.runqhead.set(ghead) + } + sched.runqtail.set(gtail) + sched.runqsize += n +} + +// Try get a batch of G's from the global runnable queue. +// Sched must be locked. +func globrunqget(_p_ *p, max int32) *g { + if sched.runqsize == 0 { + return nil + } + + n := sched.runqsize/gomaxprocs + 1 + if n > sched.runqsize { + n = sched.runqsize + } + if max > 0 && n > max { + n = max + } + if n > int32(len(_p_.runq))/2 { + n = int32(len(_p_.runq)) / 2 + } + + sched.runqsize -= n + if sched.runqsize == 0 { + sched.runqtail = 0 + } + + gp := sched.runqhead.ptr() + sched.runqhead = gp.schedlink + n-- + for ; n > 0; n-- { + gp1 := sched.runqhead.ptr() + sched.runqhead = gp1.schedlink + runqput(_p_, gp1, false) + } + return gp +} + +// Put p to on _Pidle list. +// Sched must be locked. +// May run during STW, so write barriers are not allowed. +//go:nowritebarrier +func pidleput(_p_ *p) { + if !runqempty(_p_) { + throw("pidleput: P has non-empty run queue") + } + _p_.link = sched.pidle + sched.pidle.set(_p_) + atomic.Xadd(&sched.npidle, 1) // TODO: fast atomic +} + +// Try get a p from _Pidle list. +// Sched must be locked. +// May run during STW, so write barriers are not allowed. +//go:nowritebarrier +func pidleget() *p { + _p_ := sched.pidle.ptr() + if _p_ != nil { + sched.pidle = _p_.link + atomic.Xadd(&sched.npidle, -1) // TODO: fast atomic + } + return _p_ +} + +// runqempty returns true if _p_ has no Gs on its local run queue. +// It never returns true spuriously. +func runqempty(_p_ *p) bool { + // Defend against a race where 1) _p_ has G1 in runqnext but runqhead == runqtail, + // 2) runqput on _p_ kicks G1 to the runq, 3) runqget on _p_ empties runqnext. + // Simply observing that runqhead == runqtail and then observing that runqnext == nil + // does not mean the queue is empty. + for { + head := atomic.Load(&_p_.runqhead) + tail := atomic.Load(&_p_.runqtail) + runnext := atomic.Loaduintptr((*uintptr)(unsafe.Pointer(&_p_.runnext))) + if tail == atomic.Load(&_p_.runqtail) { + return head == tail && runnext == 0 + } + } +} + +// To shake out latent assumptions about scheduling order, +// we introduce some randomness into scheduling decisions +// when running with the race detector. +// The need for this was made obvious by changing the +// (deterministic) scheduling order in Go 1.5 and breaking +// many poorly-written tests. +// With the randomness here, as long as the tests pass +// consistently with -race, they shouldn't have latent scheduling +// assumptions. +const randomizeScheduler = raceenabled + +// runqput tries to put g on the local runnable queue. +// If next if false, runqput adds g to the tail of the runnable queue. +// If next is true, runqput puts g in the _p_.runnext slot. 
+// If the run queue is full, runnext puts g on the global queue. +// Executed only by the owner P. +func runqput(_p_ *p, gp *g, next bool) { + if randomizeScheduler && next && fastrand1()%2 == 0 { + next = false + } + + if next { + retryNext: + oldnext := _p_.runnext + if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) { + goto retryNext + } + if oldnext == 0 { + return + } + // Kick the old runnext out to the regular run queue. + gp = oldnext.ptr() + } + +retry: + h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers + t := _p_.runqtail + if t-h < uint32(len(_p_.runq)) { + _p_.runq[t%uint32(len(_p_.runq))].set(gp) + atomic.Store(&_p_.runqtail, t+1) // store-release, makes the item available for consumption + return + } + if runqputslow(_p_, gp, h, t) { + return + } + // the queue is not full, now the put above must succeed + goto retry +} + +// Put g and a batch of work from local runnable queue on global queue. +// Executed only by the owner P. +func runqputslow(_p_ *p, gp *g, h, t uint32) bool { + var batch [len(_p_.runq)/2 + 1]*g + + // First, grab a batch from local queue. + n := t - h + n = n / 2 + if n != uint32(len(_p_.runq)/2) { + throw("runqputslow: queue is not full") + } + for i := uint32(0); i < n; i++ { + batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr() + } + if !atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume + return false + } + batch[n] = gp + + if randomizeScheduler { + for i := uint32(1); i <= n; i++ { + j := fastrand1() % (i + 1) + batch[i], batch[j] = batch[j], batch[i] + } + } + + // Link the goroutines. + for i := uint32(0); i < n; i++ { + batch[i].schedlink.set(batch[i+1]) + } + + // Now put the batch on global queue. + lock(&sched.lock) + globrunqputbatch(batch[0], batch[n], int32(n+1)) + unlock(&sched.lock) + return true +} + +// Get g from local runnable queue. +// If inheritTime is true, gp should inherit the remaining time in the +// current time slice. Otherwise, it should start a new time slice. +// Executed only by the owner P. +func runqget(_p_ *p) (gp *g, inheritTime bool) { + // If there's a runnext, it's the next G to run. + for { + next := _p_.runnext + if next == 0 { + break + } + if _p_.runnext.cas(next, 0) { + return next.ptr(), true + } + } + + for { + h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with other consumers + t := _p_.runqtail + if t == h { + return nil, false + } + gp := _p_.runq[h%uint32(len(_p_.runq))].ptr() + if atomic.Cas(&_p_.runqhead, h, h+1) { // cas-release, commits consume + return gp, false + } + } +} + +// Grabs a batch of goroutines from _p_'s runnable queue into batch. +// Batch is a ring buffer starting at batchHead. +// Returns number of grabbed goroutines. +// Can be executed by any P. +func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 { + for { + h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with other consumers + t := atomic.Load(&_p_.runqtail) // load-acquire, synchronize with the producer + n := t - h + n = n - n/2 + if n == 0 { + if stealRunNextG { + // Try to steal from _p_.runnext. + if next := _p_.runnext; next != 0 { + // Sleep to ensure that _p_ isn't about to run the g we + // are about to steal. + // The important use case here is when the g running on _p_ + // ready()s another g and then almost immediately blocks. + // Instead of stealing runnext in this window, back off + // to give _p_ a chance to schedule runnext. This will avoid + // thrashing gs between different Ps. 
+ // A sync chan send/recv takes ~50ns as of time of writing, + // so 3us gives ~50x overshoot. + if GOOS != "windows" { + usleep(3) + } else { + // On windows system timer granularity is 1-15ms, + // which is way too much for this optimization. + // So just yield. + osyield() + } + if !_p_.runnext.cas(next, 0) { + continue + } + batch[batchHead%uint32(len(batch))] = next + return 1 + } + } + return 0 + } + if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t + continue + } + for i := uint32(0); i < n; i++ { + g := _p_.runq[(h+i)%uint32(len(_p_.runq))] + batch[(batchHead+i)%uint32(len(batch))] = g + } + if atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume + return n + } + } +} + +// Steal half of elements from local runnable queue of p2 +// and put onto local runnable queue of p. +// Returns one of the stolen elements (or nil if failed). +func runqsteal(_p_, p2 *p, stealRunNextG bool) *g { + t := _p_.runqtail + n := runqgrab(p2, &_p_.runq, t, stealRunNextG) + if n == 0 { + return nil + } + n-- + gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr() + if n == 0 { + return gp + } + h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers + if t-h+n >= uint32(len(_p_.runq)) { + throw("runqsteal: runq overflow") + } + atomic.Store(&_p_.runqtail, t+n) // store-release, makes the item available for consumption + return gp +} + +// Active spinning for sync.Mutex. +//go:linkname sync_runtime_canSpin sync.runtime_canSpin +//go:nosplit +func sync_runtime_canSpin(i int) bool { + // sync.Mutex is cooperative, so we are conservative with spinning. + // Spin only few times and only if running on a multicore machine and + // GOMAXPROCS>1 and there is at least one other running P and local runq is empty. + // As opposed to runtime mutex we don't do passive spinning here, + // because there can be work on global runq on on other Ps. + if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 { + return false + } + if p := getg().m.p.ptr(); !runqempty(p) { + return false + } + return true +} + +//go:linkname sync_runtime_doSpin sync.runtime_doSpin +//go:nosplit +func sync_runtime_doSpin() { + procyield(active_spin_cnt) +} + +var stealOrder randomOrder + +// randomOrder/randomEnum are helper types for randomized work stealing. +// They allow to enumerate all Ps in different pseudo-random orders without repetitions. +// The algorithm is based on the fact that if we have X such that X and GOMAXPROCS +// are coprime, then a sequences of (i + X) % GOMAXPROCS gives the required enumeration. 
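// Illustrative sketch (not part of the patch): the coprime-stepping
// enumeration the randomOrder comment above describes. With count=6 and an
// increment of 5 (gcd(5,6)==1), starting at position 2 visits 2 1 0 5 4 3 —
// every P exactly once, in a scrambled order.
package main

import "fmt"

func main() {
	count, inc, pos := uint32(6), uint32(5), uint32(2)
	for i := uint32(0); i < count; i++ {
		fmt.Print(pos, " ")
		pos = (pos + inc) % count
	}
	fmt.Println()
}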
+type randomOrder struct { + count uint32 + coprimes []uint32 +} + +type randomEnum struct { + i uint32 + count uint32 + pos uint32 + inc uint32 +} + +func (ord *randomOrder) reset(count uint32) { + ord.count = count + ord.coprimes = ord.coprimes[:0] + for i := uint32(1); i <= count; i++ { + if gcd(i, count) == 1 { + ord.coprimes = append(ord.coprimes, i) + } + } +} + +func (ord *randomOrder) start(i uint32) randomEnum { + return randomEnum{ + count: ord.count, + pos: i % ord.count, + inc: ord.coprimes[i%uint32(len(ord.coprimes))], + } +} + +func (enum *randomEnum) done() bool { + return enum.i == enum.count +} + +func (enum *randomEnum) next() { + enum.i++ + enum.pos = (enum.pos + enum.inc) % enum.count +} + +func (enum *randomEnum) position() uint32 { + return enum.pos +} + +func gcd(a, b uint32) uint32 { + for b != 0 { + a, b = b, a%b + } + return a +} diff --git a/libgo/go/runtime/proc_test.go b/libgo/go/runtime/proc_test.go index cc390174a74..813c92912b9 100644 --- a/libgo/go/runtime/proc_test.go +++ b/libgo/go/runtime/proc_test.go @@ -556,19 +556,14 @@ func nonleaf(stop chan int) bool { } } -/* func TestSchedLocalQueue(t *testing.T) { - runtime.TestSchedLocalQueue1() + runtime.RunSchedLocalQueueTest() } -*/ -/* func TestSchedLocalQueueSteal(t *testing.T) { - runtime.TestSchedLocalQueueSteal1() + runtime.RunSchedLocalQueueStealTest() } -*/ -/* func TestSchedLocalQueueEmpty(t *testing.T) { if runtime.NumCPU() == 1 { // Takes too long and does not trigger the race. @@ -586,7 +581,6 @@ func TestSchedLocalQueueEmpty(t *testing.T) { } runtime.RunSchedLocalQueueEmptyTest(iters) } -*/ func benchmarkStackGrowth(b *testing.B, rec int) { b.RunParallel(func(pb *testing.PB) { diff --git a/libgo/go/runtime/runtime2.go b/libgo/go/runtime/runtime2.go index 571972c1a83..755bc5f6380 100644 --- a/libgo/go/runtime/runtime2.go +++ b/libgo/go/runtime/runtime2.go @@ -5,6 +5,7 @@ package runtime import ( + "runtime/internal/atomic" "runtime/internal/sys" "unsafe" ) @@ -203,12 +204,10 @@ func (gp guintptr) ptr() *g { return (*g)(unsafe.Pointer(gp)) } //go:nosplit func (gp *guintptr) set(g *g) { *gp = guintptr(unsafe.Pointer(g)) } -/* //go:nosplit func (gp *guintptr) cas(old, new guintptr) bool { return atomic.Casuintptr((*uintptr)(unsafe.Pointer(gp)), uintptr(old), uintptr(new)) } -*/ type puintptr uintptr @@ -358,8 +357,8 @@ type g struct { sigpc uintptr gopc uintptr // pc of go statement that created this goroutine startpc uintptr // pc of goroutine function - racectx uintptr - waiting *sudog // sudog structures this g is waiting on (that have a valid elem ptr); in lock order + // Not for gccgo: racectx uintptr + waiting *sudog // sudog structures this g is waiting on (that have a valid elem ptr); in lock order // Not for gccgo: cgoCtxt []uintptr // cgo traceback context // Per-G GC state @@ -521,16 +520,16 @@ type p struct { gfreecnt int32 sudogcache []*sudog - // Not for gccgo for now: sudogbuf [128]*sudog + sudogbuf [128]*sudog - // Not for gccgo for now: tracebuf traceBufPtr + tracebuf traceBufPtr // Not for gccgo for now: palloc persistentAlloc // per-P to avoid mutex // Per-P GC state - // Not for gccgo for now: gcAssistTime int64 // Nanoseconds in assistAlloc - // Not for gccgo for now: gcBgMarkWorker guintptr - // Not for gccgo for now: gcMarkWorkerMode gcMarkWorkerMode + gcAssistTime int64 // Nanoseconds in assistAlloc + gcBgMarkWorker guintptr + gcMarkWorkerMode gcMarkWorkerMode // gcw is this P's GC work buffer cache. 
The work buffer is // filled by write barriers, drained by mutator assists, and @@ -760,18 +759,13 @@ var ( // allm *m - allp [_MaxGomaxprocs + 1]*p - - // gomaxprocs int32 - - panicking uint32 - ncpu int32 - - // forcegc forcegcstate - - sched schedt - - // newprocs int32 + allp [_MaxGomaxprocs + 1]*p + gomaxprocs int32 + panicking uint32 + ncpu int32 + forcegc forcegcstate + sched schedt + newprocs int32 // Information about what cpu features are available. // Set on startup. diff --git a/libgo/go/runtime/stubs.go b/libgo/go/runtime/stubs.go index 3d184083d55..6787476456b 100644 --- a/libgo/go/runtime/stubs.go +++ b/libgo/go/runtime/stubs.go @@ -304,6 +304,7 @@ const ( _64bit = 1 << (^uintptr(0) >> 63) / 2 _MHeapMap_TotalBits = (_64bit*sys.GoosWindows)*35 + (_64bit*(1-sys.GoosWindows)*(1-sys.GoosDarwin*sys.GoarchArm64))*39 + sys.GoosDarwin*sys.GoarchArm64*31 + (1-_64bit)*32 _MaxMem = uintptr(1<<_MHeapMap_TotalBits - 1) + _MaxGcproc = 32 ) // Here for gccgo until we port malloc.go. @@ -350,7 +351,6 @@ func entersyscallblock(int32) func exitsyscall(int32) func gopark(func(*g, unsafe.Pointer) bool, unsafe.Pointer, string, byte, int) func goparkunlock(*mutex, string, byte, int) -func goready(*g, int) // Temporary hack for gccgo until we port proc.go. //go:nosplit @@ -411,12 +411,6 @@ func roundupsize(uintptr) uintptr // Here for gccgo until we port mgc.go. func GC() -// Here for gccgo until we port proc.go. -var worldsema uint32 = 1 - -func stopTheWorldWithSema() -func startTheWorldWithSema() - // For gccgo to call from C code. //go:linkname acquireWorldsema runtime.acquireWorldsema func acquireWorldsema() { @@ -429,26 +423,6 @@ func releaseWorldsema() { semrelease(&worldsema) } -// Here for gccgo until we port proc.go. -func stopTheWorld(reason string) { - semacquire(&worldsema, false) - getg().m.preemptoff = reason - getg().m.gcing = 1 - systemstack(stopTheWorldWithSema) -} - -// Here for gccgo until we port proc.go. -func startTheWorld() { - getg().m.gcing = 0 - getg().m.locks++ - systemstack(startTheWorldWithSema) - // worldsema must be held over startTheWorldWithSema to ensure - // gomaxprocs cannot change while worldsema is held. - semrelease(&worldsema) - getg().m.preemptoff = "" - getg().m.locks-- -} - // For gccgo to call from C code, so that the C code and the Go code // can share the memstats variable for now. //go:linkname getMstats runtime.getMstats @@ -461,6 +435,7 @@ func setcpuprofilerate_m(hz int32) // Temporary for gccgo until we port mem_GOOS.go. func sysAlloc(n uintptr, sysStat *uint64) unsafe.Pointer +func sysFree(v unsafe.Pointer, n uintptr, sysStat *uint64) // Temporary for gccgo until we port proc.go, so that the C signal // handler can call into cpuprof. @@ -522,7 +497,6 @@ func getZerobase() *uintptr { func sigprof() func mcount() int32 func goexit1() -func freezetheworld() // Get signal trampoline, written in C. func getSigtramp() uintptr @@ -592,6 +566,7 @@ func getPanicking() uint32 { // Temporary for gccgo until we port mcache.go. func allocmcache() *mcache +func freemcache(*mcache) // Temporary for gccgo until we port mgc.go. // This is just so that allgadd will compile. @@ -616,3 +591,60 @@ func gcount() int32 { unlock(&allglock) return n } + +// Temporary for gccgo until we port mgc.go. +var gcBlackenEnabled uint32 + +// Temporary for gccgo until we port mgc.go. +func gcMarkWorkAvailable(p *p) bool { + return false +} + +// Temporary for gccgo until we port mgc.go. +var gcController gcControllerState + +// Temporary for gccgo until we port mgc.go. 
+type gcControllerState struct { +} + +// Temporary for gccgo until we port mgc.go. +func (c *gcControllerState) findRunnableGCWorker(_p_ *p) *g { + return nil +} + +// Temporary for gccgo until we port mgc.go. +var gcphase uint32 + +// Temporary for gccgo until we port mgc.go. +const ( + _GCoff = iota + _GCmark + _GCmarktermination +) + +// Temporary for gccgo until we port mgc.go. +type gcMarkWorkerMode int + +// Temporary for gccgo until we port mgc.go. +const ( + gcMarkWorkerDedicatedMode gcMarkWorkerMode = iota + gcMarkWorkerFractionalMode + gcMarkWorkerIdleMode +) + +// Temporary for gccgo until we port mheap.go. +type mheap struct { +} + +// Temporary for gccgo until we port mheap.go. +var mheap_ mheap + +// Temporary for gccgo until we port mheap.go. +func (h *mheap) scavenge(k int32, now, limit uint64) { +} + +// Temporary for gccgo until we initialize ncpu in Go. +//go:linkname setncpu runtime.setncpu +func setncpu(n int32) { + ncpu = n +} diff --git a/libgo/go/runtime/trace.go b/libgo/go/runtime/trace.go index 35126f19a29..09a150f6e63 100644 --- a/libgo/go/runtime/trace.go +++ b/libgo/go/runtime/trace.go @@ -127,10 +127,10 @@ var trace struct { // traceBufHeader is per-P tracing buffer. type traceBufHeader struct { - link traceBufPtr // in trace.empty/full - lastTicks uint64 // when we wrote the last event - pos int // next write offset in arr - stk [traceStackSize]uintptr // scratch buffer for traceback + link traceBufPtr // in trace.empty/full + lastTicks uint64 // when we wrote the last event + pos int // next write offset in arr + stk [traceStackSize]location // scratch buffer for traceback } // traceBuf is per-P tracing buffer. @@ -152,9 +152,6 @@ func traceBufPtrOf(b *traceBuf) traceBufPtr { return traceBufPtr(unsafe.Pointer(b)) } -/* -Commented out for gccgo for now. - // StartTrace enables tracing for the current process. // While tracing, the data will be buffered and available via ReadTrace. // StartTrace returns an error if tracing is already enabled. @@ -522,13 +519,7 @@ func traceEvent(ev byte, skip int, args ...uint64) { if gp == _g_ { nstk = callers(skip, buf.stk[:]) } else if gp != nil { - gp = mp.curg - // This may happen when tracing a system call, - // so we must lock the stack. - if gcTryLockStackBarriers(gp) { - nstk = gcallers(gp, skip, buf.stk[:]) - gcUnlockStackBarriers(gp) - } + // FIXME: get stack trace of different goroutine. } if nstk > 0 { nstk-- // skip runtime.goexit @@ -647,8 +638,6 @@ func (buf *traceBuf) byte(v byte) { buf.pos++ } -*/ - // traceStackTable maps stack traces (arrays of PC's) to unique uint32 ids. // It is lock-free for reading. type traceStackTable struct { @@ -664,28 +653,30 @@ type traceStack struct { hash uintptr id uint32 n int - stk [0]uintptr // real type [n]uintptr + stk [0]location // real type [n]location } type traceStackPtr uintptr -/* -Commented out for gccgo for now. - func (tp traceStackPtr) ptr() *traceStack { return (*traceStack)(unsafe.Pointer(tp)) } // stack returns slice of PCs. -func (ts *traceStack) stack() []uintptr { - return (*[traceStackSize]uintptr)(unsafe.Pointer(&ts.stk))[:ts.n] +func (ts *traceStack) stack() []location { + return (*[traceStackSize]location)(unsafe.Pointer(&ts.stk))[:ts.n] } // put returns a unique id for the stack trace pcs and caches it in the table, // if it sees the trace for the first time. 
-func (tab *traceStackTable) put(pcs []uintptr) uint32 { +func (tab *traceStackTable) put(pcs []location) uint32 { if len(pcs) == 0 { return 0 } - hash := memhash(unsafe.Pointer(&pcs[0]), 0, uintptr(len(pcs))*unsafe.Sizeof(pcs[0])) + var hash uintptr + for _, loc := range pcs { + hash += loc.pc + hash += hash << 10 + hash ^= hash >> 6 + } // First, search the hashtable w/o the mutex. if id := tab.find(pcs, hash); id != 0 { return id @@ -714,7 +705,7 @@ func (tab *traceStackTable) put(pcs []uintptr) uint32 { } // find checks if the stack trace pcs is already present in the table. -func (tab *traceStackTable) find(pcs []uintptr, hash uintptr) uint32 { +func (tab *traceStackTable) find(pcs []location, hash uintptr) uint32 { part := int(hash % uintptr(len(tab.tab))) Search: for stk := tab.tab[part].ptr(); stk != nil; stk = stk.link.ptr() { @@ -732,13 +723,12 @@ Search: // newStack allocates a new stack of size n. func (tab *traceStackTable) newStack(n int) *traceStack { - return (*traceStack)(tab.mem.alloc(unsafe.Sizeof(traceStack{}) + uintptr(n)*sys.PtrSize)) + return (*traceStack)(tab.mem.alloc(unsafe.Sizeof(traceStack{}) + uintptr(n)*unsafe.Sizeof(location{}))) } // dump writes all previously cached stacks to trace buffers, // releases all memory and resets state. func (tab *traceStackTable) dump() { - frames := make(map[uintptr]traceFrame) var tmp [(2 + 4*traceStackSize) * traceBytesPerNumber]byte buf := traceFlush(0).ptr() for _, stk := range tab.tab { @@ -749,8 +739,8 @@ func (tab *traceStackTable) dump() { tmpbuf = traceAppend(tmpbuf, uint64(stk.n)) for _, pc := range stk.stack() { var frame traceFrame - frame, buf = traceFrameForPC(buf, frames, pc) - tmpbuf = traceAppend(tmpbuf, uint64(pc)) + frame, buf = traceFrameForPC(buf, pc) + tmpbuf = traceAppend(tmpbuf, uint64(pc.pc)) tmpbuf = traceAppend(tmpbuf, uint64(frame.funcID)) tmpbuf = traceAppend(tmpbuf, uint64(frame.fileID)) tmpbuf = traceAppend(tmpbuf, uint64(frame.line)) @@ -780,25 +770,15 @@ type traceFrame struct { line uint64 } -func traceFrameForPC(buf *traceBuf, frames map[uintptr]traceFrame, pc uintptr) (traceFrame, *traceBuf) { - if frame, ok := frames[pc]; ok { - return frame, buf - } - +func traceFrameForPC(buf *traceBuf, loc location) (traceFrame, *traceBuf) { var frame traceFrame - f := findfunc(pc) - if f == nil { - frames[pc] = frame - return frame, buf - } - - fn := funcname(f) + fn := loc.function const maxLen = 1 << 10 if len(fn) > maxLen { fn = fn[len(fn)-maxLen:] } frame.funcID, buf = traceString(buf, fn) - file, line := funcline(f, pc-sys.PCQuantum) + file, line := loc.filename, loc.lineno frame.line = uint64(line) if len(file) > maxLen { file = file[len(file)-maxLen:] @@ -807,8 +787,6 @@ func traceFrameForPC(buf *traceBuf, frames map[uintptr]traceFrame, pc uintptr) ( return frame, buf } -*/ - // traceAlloc is a non-thread-safe region allocator. // It holds a linked list of traceAllocBlock. type traceAlloc struct { @@ -831,9 +809,6 @@ type traceAllocBlockPtr uintptr func (p traceAllocBlockPtr) ptr() *traceAllocBlock { return (*traceAllocBlock)(unsafe.Pointer(p)) } func (p *traceAllocBlockPtr) set(x *traceAllocBlock) { *p = traceAllocBlockPtr(unsafe.Pointer(x)) } -/* -Commented out for gccgo for now. - // alloc allocates n-byte block. 
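Since location (unlike a bare pc) carries the function and file strings, the raw-byte memhash of the frame array no longer applies; the put hunk above instead folds each frame's pc with a shift-add-xor step. Below is a minimal standalone copy of that fold, under hypothetical names (loc, hashStack), just to make the mixing explicit; the real table uses the hash only as a bucket key (hash % len(tab.tab)).

package main

import "fmt"

// loc stands in for the runtime's location type; only pc takes part
// in the hash, matching the loop added to traceStackTable.put.
type loc struct {
	pc       uintptr
	function string
}

// hashStack folds each frame's pc with h += pc, h += h<<10, h ^= h>>6
// (a Jenkins-style one-at-a-time mixing step). Collisions are acceptable:
// the hash picks a bucket, it is not treated as an identity.
func hashStack(stk []loc) uintptr {
	var h uintptr
	for _, l := range stk {
		h += l.pc
		h += h << 10
		h ^= h >> 6
	}
	return h
}

func main() {
	a := []loc{{pc: 0x401000, function: "main.main"}, {pc: 0x402080, function: "runtime.goexit"}}
	b := []loc{{pc: 0x401000, function: "main.main"}, {pc: 0x402080, function: "runtime.goexit"}}
	fmt.Println(hashStack(a) == hashStack(b)) // true: identical stacks map to the same bucket
	fmt.Printf("%#x\n", hashStack(a))
}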
func (a *traceAlloc) alloc(n uintptr) unsafe.Pointer { n = round(n, sys.PtrSize) @@ -841,6 +816,8 @@ func (a *traceAlloc) alloc(n uintptr) unsafe.Pointer { if n > uintptr(len(a.head.ptr().data)) { throw("trace: alloc too large") } + // This is only safe because the strings returned by callers + // are stored in a location that is not in the Go heap. block := (*traceAllocBlock)(sysAlloc(unsafe.Sizeof(traceAllocBlock{}), &memstats.other_sys)) if block == nil { throw("trace: out of memory") @@ -913,7 +890,7 @@ func traceGoCreate(newg *g, pc uintptr) { newg.traceseq = 0 newg.tracelastp = getg().m.p // +PCQuantum because traceFrameForPC expects return PCs and subtracts PCQuantum. - id := trace.stackTab.put([]uintptr{pc + sys.PCQuantum}) + id := trace.stackTab.put([]location{location{pc: pc + sys.PCQuantum}}) traceEvent(traceEvGoCreate, 2, uint64(newg.goid), uint64(id)) } @@ -1004,5 +981,3 @@ func traceHeapAlloc() { func traceNextGC() { traceEvent(traceEvNextGC, -1, memstats.next_gc) } - -*/ diff --git a/libgo/runtime/heapdump.c b/libgo/runtime/heapdump.c index c050541db9d..c9d50023770 100644 --- a/libgo/runtime/heapdump.c +++ b/libgo/runtime/heapdump.c @@ -618,8 +618,7 @@ runtime_debug_WriteHeapDump(uintptr fd) // Stop the world. runtime_acquireWorldsema(); m = runtime_m(); - m->gcing = 1; - m->locks++; + m->preemptoff = runtime_gostringnocopy((const byte*)"write heap dump"); runtime_stopTheWorldWithSema(); // Update stats so we can dump them. @@ -640,10 +639,9 @@ runtime_debug_WriteHeapDump(uintptr fd) dumpfd = 0; // Start up the world again. - m->gcing = 0; - runtime_releaseWorldsema(); runtime_startTheWorldWithSema(); - m->locks--; + runtime_releaseWorldsema(); + m->preemptoff = runtime_gostringnocopy(nil); } // Runs the specified gc program. Calls the callback for every diff --git a/libgo/runtime/malloc.goc b/libgo/runtime/malloc.goc index 1e6704c6290..987431219ca 100644 --- a/libgo/runtime/malloc.goc +++ b/libgo/runtime/malloc.goc @@ -99,7 +99,8 @@ runtime_mallocgc(uintptr size, uintptr typ, uint32 flag) flag |= FlagNoInvokeGC; } - if(runtime_gcwaiting() && g != m->g0 && m->locks == 0 && !(flag & FlagNoInvokeGC) && m->preemptoff.len == 0) { + if((g->preempt || runtime_gcwaiting()) && g != m->g0 && m->locks == 0 && !(flag & FlagNoInvokeGC) && m->preemptoff.len == 0) { + g->preempt = false; runtime_gosched(); m = runtime_m(); } diff --git a/libgo/runtime/malloc.h b/libgo/runtime/malloc.h index f13d5b3a99e..00e4166d812 100644 --- a/libgo/runtime/malloc.h +++ b/libgo/runtime/malloc.h @@ -132,12 +132,6 @@ enum #else MHeapMap_Bits = 32 - PageShift, #endif - - // Max number of threads to run garbage collection. - // 2, 3, and 4 are all plausible maximums depending - // on the hardware details of the machine. The garbage - // collector scales well to 8 cpus. - MaxGcproc = 8, }; // Maximum memory allocation size, a hint for callers. 
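The malloc.goc hunk above adds a g->preempt poll at allocation time: if the goroutine has been asked to yield (or the collector is waiting), and it is safe to do so (not g0, no locks held, preemption not disabled), it clears the flag and calls runtime_gosched. The sketch below is a toy version of that cooperative-preemption pattern outside the runtime, with hypothetical names (worker, preempt) and a counter standing in for the actual reschedule; it is illustrative only, not part of the patch.

package main

import (
	"fmt"
	"sync/atomic"
)

// worker polls a preempt flag at its "safe points" (here, the top of
// each loop iteration), in the spirit of the mallocgc check above:
// if asked to yield and it is safe to do so, clear the flag and yield.
func worker(preempt *int32) (yields int) {
	for i := 0; i < 1000000; i++ {
		if atomic.LoadInt32(preempt) != 0 {
			atomic.StoreInt32(preempt, 0) // like g->preempt = false
			yields++                      // stand-in for runtime_gosched()
		}
		_ = i * i // the actual work
	}
	return yields
}

func main() {
	var preempt int32 = 1 // a yield was requested before we started
	fmt.Println("yielded", worker(&preempt), "time(s)")
}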
@@ -186,7 +180,8 @@ enum void* runtime_SysAlloc(uintptr nbytes, uint64 *stat) __asm__ (GOSYM_PREFIX "runtime.sysAlloc"); -void runtime_SysFree(void *v, uintptr nbytes, uint64 *stat); +void runtime_SysFree(void *v, uintptr nbytes, uint64 *stat) + __asm__ (GOSYM_PREFIX "runtime.sysFree"); void runtime_SysUnused(void *v, uintptr nbytes); void runtime_SysUsed(void *v, uintptr nbytes); void runtime_SysMap(void *v, uintptr nbytes, bool reserved, uint64 *stat); @@ -467,11 +462,15 @@ void runtime_MProf_GC(void) __asm__ (GOSYM_PREFIX "runtime.mProf_GC"); void runtime_iterate_memprof(FuncVal* callback) __asm__ (GOSYM_PREFIX "runtime.iterate_memprof"); -int32 runtime_gcprocs(void); -void runtime_helpgc(int32 nproc); -void runtime_gchelper(void); +int32 runtime_gcprocs(void) + __asm__ (GOSYM_PREFIX "runtime.gcprocs"); +void runtime_helpgc(int32 nproc) + __asm__ (GOSYM_PREFIX "runtime.helpgc"); +void runtime_gchelper(void) + __asm__ (GOSYM_PREFIX "runtime.gchelper"); void runtime_createfing(void); -G* runtime_wakefing(void); +G* runtime_wakefing(void) + __asm__ (GOSYM_PREFIX "runtime.wakefing"); extern bool runtime_fingwait; extern bool runtime_fingwake; diff --git a/libgo/runtime/mgc0.c b/libgo/runtime/mgc0.c index 5d6275a6357..156db0f1f92 100644 --- a/libgo/runtime/mgc0.c +++ b/libgo/runtime/mgc0.c @@ -7,7 +7,7 @@ // GC is: // - mark&sweep // - mostly precise (with the exception of some C-allocated objects, assembly frames/arguments, etc) -// - parallel (up to MaxGcproc threads) +// - parallel (up to _MaxGcproc threads) // - partially concurrent (mark is stop-the-world, while sweep is concurrent) // - non-moving/non-compacting // - full (non-partial) @@ -389,7 +389,7 @@ struct BufferList uint32 busy; byte pad[CacheLineSize]; }; -static BufferList bufferList[MaxGcproc]; +static BufferList bufferList[_MaxGcproc]; static void enqueue(Obj obj, Workbuf **_wbuf, Obj **_wp, uintptr *_nobj); @@ -2228,7 +2228,7 @@ gc(struct gc_args *args) m->locks++; // disable gc during mallocs in parforalloc if(work.markfor == nil) - work.markfor = runtime_parforalloc(MaxGcproc); + work.markfor = runtime_parforalloc(_MaxGcproc); m->locks--; tm1 = 0; @@ -2355,7 +2355,7 @@ gc(struct gc_args *args) sweep.g = __go_go(bgsweep, nil); else if(sweep.parked) { sweep.parked = false; - runtime_ready(sweep.g); + runtime_ready(sweep.g, 0, true); } runtime_unlock(&gclock); } else { @@ -2429,7 +2429,7 @@ gchelperstart(void) M *m; m = runtime_m(); - if(m->helpgc < 0 || m->helpgc >= MaxGcproc) + if(m->helpgc < 0 || m->helpgc >= _MaxGcproc) runtime_throw("gchelperstart: bad m->helpgc"); if(runtime_xchg(&bufferList[m->helpgc].busy, 1)) runtime_throw("gchelperstart: already busy"); @@ -2541,6 +2541,20 @@ runtime_createfing(void) runtime_unlock(&gclock); } +bool getfingwait() __asm__(GOSYM_PREFIX "runtime.getfingwait"); +bool +getfingwait() +{ + return runtime_fingwait; +} + +bool getfingwake() __asm__(GOSYM_PREFIX "runtime.getfingwake"); +bool +getfingwake() +{ + return runtime_fingwake; +} + G* runtime_wakefing(void) { diff --git a/libgo/runtime/proc.c b/libgo/runtime/proc.c index 8a7a2d76ae6..c4a52839ea8 100644 --- a/libgo/runtime/proc.c +++ b/libgo/runtime/proc.c @@ -365,9 +365,14 @@ extern P** runtime_getAllP() __asm__ (GOSYM_PREFIX "runtime.getAllP"); extern G* allocg(void) __asm__ (GOSYM_PREFIX "runtime.allocg"); +extern bool needaddgcproc(void) + __asm__ (GOSYM_PREFIX "runtime.needaddgcproc"); +extern void startm(P*, bool) + __asm__(GOSYM_PREFIX "runtime.startm"); +extern void newm(void(*)(void), P*) + __asm__(GOSYM_PREFIX 
"runtime.newm"); Sched* runtime_sched; -int32 runtime_gomaxprocs; M runtime_m0; G runtime_g0; // idle goroutine for m0 G* runtime_lastg; @@ -376,51 +381,58 @@ P** runtime_allp; int8* runtime_goos; int32 runtime_ncpu; bool runtime_precisestack; -static int32 newprocs; bool runtime_isarchive; void* runtime_mstart(void*); -static void runqput(P*, G*); -static G* runqget(P*); -static bool runqputslow(P*, G*, uint32, uint32); -static G* runqsteal(P*, P*); -static void mput(M*); -static M* mget(void); static void mcommoninit(M*); -static void schedule(void); -static void procresize(int32); -static void acquirep(P*); -static P* releasep(void); -static void newm(void(*)(void), P*); -static void stopm(void); -static void startm(P*, bool); -static void handoffp(P*); -static void wakep(void); -static void stoplockedm(void); -static void startlockedm(G*); -static void sysmon(void); -static uint32 retake(int64); -static void incidlelocked(int32); static void exitsyscall0(G*); static void park0(G*); static void goexit0(G*); static void gfput(P*, G*); static G* gfget(P*); -static void gfpurge(P*); -static void globrunqput(G*); -static void globrunqputbatch(G*, G*, int32); -static G* globrunqget(P*, int32); -static P* pidleget(void); -static void pidleput(P*); -static void injectglist(G*); -static bool preemptall(void); static bool exitsyscallfast(void); -void allgadd(G*) +extern void setncpu(int32) + __asm__(GOSYM_PREFIX "runtime.setncpu"); +extern void allgadd(G*) __asm__(GOSYM_PREFIX "runtime.allgadd"); -void checkdead(void) +extern void stopm(void) + __asm__(GOSYM_PREFIX "runtime.stopm"); +extern void handoffp(P*) + __asm__(GOSYM_PREFIX "runtime.handoffp"); +extern void wakep(void) + __asm__(GOSYM_PREFIX "runtime.wakep"); +extern void stoplockedm(void) + __asm__(GOSYM_PREFIX "runtime.stoplockedm"); +extern void schedule(void) + __asm__(GOSYM_PREFIX "runtime.schedule"); +extern void execute(G*, bool) + __asm__(GOSYM_PREFIX "runtime.execute"); +extern void procresize(int32) + __asm__(GOSYM_PREFIX "runtime.procresize"); +extern void acquirep(P*) + __asm__(GOSYM_PREFIX "runtime.acquirep"); +extern P* releasep(void) + __asm__(GOSYM_PREFIX "runtime.releasep"); +extern void incidlelocked(int32) + __asm__(GOSYM_PREFIX "runtime.incidlelocked"); +extern void checkdead(void) __asm__(GOSYM_PREFIX "runtime.checkdead"); +extern void sysmon(void) + __asm__(GOSYM_PREFIX "runtime.sysmon"); +extern void mput(M*) + __asm__(GOSYM_PREFIX "runtime.mput"); +extern M* mget(void) + __asm__(GOSYM_PREFIX "runtime.mget"); +extern void globrunqput(G*) + __asm__(GOSYM_PREFIX "runtime.globrunqput"); +extern P* pidleget(void) + __asm__(GOSYM_PREFIX "runtime.pidleget"); +extern bool runqempty(P*) + __asm__(GOSYM_PREFIX "runtime.runqempty"); +extern void runqput(P*, G*, bool) + __asm__(GOSYM_PREFIX "runtime.runqput"); bool runtime_isstarted; @@ -441,6 +453,7 @@ runtime_schedinit(void) const byte *p; Eface i; + setncpu(runtime_ncpu); runtime_sched = runtime_getsched(); m = &runtime_m0; @@ -660,234 +673,6 @@ mcommoninit(M *mp) runtime_unlock(&runtime_sched->lock); } -// Mark gp ready to run. -void -runtime_ready(G *gp) -{ - // Mark runnable. 
- g->m->locks++; // disable preemption because it can be holding p in a local var - if(gp->atomicstatus != _Gwaiting) { - runtime_printf("goroutine %D has status %d\n", gp->goid, gp->atomicstatus); - runtime_throw("bad g->atomicstatus in ready"); - } - gp->atomicstatus = _Grunnable; - runqput((P*)g->m->p, gp); - if(runtime_atomicload(&runtime_sched->npidle) != 0 && runtime_atomicload(&runtime_sched->nmspinning) == 0) // TODO: fast atomic - wakep(); - g->m->locks--; -} - -void goready(G*, int) __asm__ (GOSYM_PREFIX "runtime.goready"); - -void -goready(G* gp, int traceskip __attribute__ ((unused))) -{ - runtime_ready(gp); -} - -int32 -runtime_gcprocs(void) -{ - int32 n; - - // Figure out how many CPUs to use during GC. - // Limited by gomaxprocs, number of actual CPUs, and MaxGcproc. - runtime_lock(&runtime_sched->lock); - n = runtime_gomaxprocs; - if(n > runtime_ncpu) - n = runtime_ncpu > 0 ? runtime_ncpu : 1; - if(n > MaxGcproc) - n = MaxGcproc; - if(n > runtime_sched->nmidle+1) // one M is currently running - n = runtime_sched->nmidle+1; - runtime_unlock(&runtime_sched->lock); - return n; -} - -static bool -needaddgcproc(void) -{ - int32 n; - - runtime_lock(&runtime_sched->lock); - n = runtime_gomaxprocs; - if(n > runtime_ncpu) - n = runtime_ncpu; - if(n > MaxGcproc) - n = MaxGcproc; - n -= runtime_sched->nmidle+1; // one M is currently running - runtime_unlock(&runtime_sched->lock); - return n > 0; -} - -void -runtime_helpgc(int32 nproc) -{ - M *mp; - int32 n, pos; - - runtime_lock(&runtime_sched->lock); - pos = 0; - for(n = 1; n < nproc; n++) { // one M is currently running - if(runtime_allp[pos]->mcache == g->m->mcache) - pos++; - mp = mget(); - if(mp == nil) - runtime_throw("runtime_gcprocs inconsistency"); - mp->helpgc = n; - mp->mcache = runtime_allp[pos]->mcache; - pos++; - runtime_notewakeup(&mp->park); - } - runtime_unlock(&runtime_sched->lock); -} - -// Similar to stoptheworld but best-effort and can be called several times. -// There is no reverse operation, used during crashing. -// This function must not lock any mutexes. 
-void -runtime_freezetheworld(void) -{ - int32 i; - - if(runtime_gomaxprocs == 1) - return; - // stopwait and preemption requests can be lost - // due to races with concurrently executing threads, - // so try several times - for(i = 0; i < 5; i++) { - // this should tell the scheduler to not start any new goroutines - runtime_sched->stopwait = 0x7fffffff; - runtime_atomicstore((uint32*)&runtime_sched->gcwaiting, 1); - // this should stop running goroutines - if(!preemptall()) - break; // no running goroutines - runtime_usleep(1000); - } - // to be sure - runtime_usleep(1000); - preemptall(); - runtime_usleep(1000); -} - -void -runtime_stopTheWorldWithSema(void) -{ - int32 i; - uint32 s; - P *p; - bool wait; - - runtime_lock(&runtime_sched->lock); - runtime_sched->stopwait = runtime_gomaxprocs; - runtime_atomicstore((uint32*)&runtime_sched->gcwaiting, 1); - preemptall(); - // stop current P - ((P*)g->m->p)->status = _Pgcstop; - runtime_sched->stopwait--; - // try to retake all P's in _Psyscall status - for(i = 0; i < runtime_gomaxprocs; i++) { - p = runtime_allp[i]; - s = p->status; - if(s == _Psyscall && runtime_cas(&p->status, s, _Pgcstop)) - runtime_sched->stopwait--; - } - // stop idle P's - while((p = pidleget()) != nil) { - p->status = _Pgcstop; - runtime_sched->stopwait--; - } - wait = runtime_sched->stopwait > 0; - runtime_unlock(&runtime_sched->lock); - - // wait for remaining P's to stop voluntarily - if(wait) { - runtime_notesleep(&runtime_sched->stopnote); - runtime_noteclear(&runtime_sched->stopnote); - } - if(runtime_sched->stopwait) - runtime_throw("stoptheworld: not stopped"); - for(i = 0; i < runtime_gomaxprocs; i++) { - p = runtime_allp[i]; - if(p->status != _Pgcstop) - runtime_throw("stoptheworld: not stopped"); - } -} - -static void -mhelpgc(void) -{ - g->m->helpgc = -1; -} - -void -runtime_startTheWorldWithSema(void) -{ - P *p, *p1; - M *mp; - G *gp; - bool add; - - g->m->locks++; // disable preemption because it can be holding p in a local var - gp = runtime_netpoll(false); // non-blocking - injectglist(gp); - add = needaddgcproc(); - runtime_lock(&runtime_sched->lock); - if(newprocs) { - procresize(newprocs); - newprocs = 0; - } else - procresize(runtime_gomaxprocs); - runtime_sched->gcwaiting = 0; - - p1 = nil; - while((p = pidleget()) != nil) { - // procresize() puts p's with work at the beginning of the list. - // Once we reach a p without a run queue, the rest don't have one either. - if(p->runqhead == p->runqtail) { - pidleput(p); - break; - } - p->m = (uintptr)mget(); - p->link = (uintptr)p1; - p1 = p; - } - if(runtime_sched->sysmonwait) { - runtime_sched->sysmonwait = false; - runtime_notewakeup(&runtime_sched->sysmonnote); - } - runtime_unlock(&runtime_sched->lock); - - while(p1) { - p = p1; - p1 = (P*)p1->link; - if(p->m) { - mp = (M*)p->m; - p->m = 0; - if(mp->nextp) - runtime_throw("startTheWorldWithSema: inconsistent mp->nextp"); - mp->nextp = (uintptr)p; - runtime_notewakeup(&mp->park); - } else { - // Start M to run P. Do not start another M below. - newm(nil, p); - add = false; - } - } - - if(add) { - // If GC could have used another helper proc, start one now, - // in the hope that it will be available next time. - // It would have been even better to start it before the collection, - // but doing so requires allocating memory, so it's tricky to - // coordinate. This lazy approach works out in practice: - // we don't mind if the first couple gc rounds don't have quite - // the maximum number of procs. 
- newm(mhelpgc, nil); - } - g->m->locks--; -} - // Called to start an M. void* runtime_mstart(void* mp) @@ -1055,7 +840,7 @@ makeGContext(G* gp, byte* sp, uintptr spsize) { } // Create a new m. It will start off with a call to fn, or else the scheduler. -static void +void newm(void(*fn)(void), P *p) { M *mp; @@ -1067,40 +852,6 @@ newm(void(*fn)(void), P *p) runtime_newosproc(mp); } -// Stops execution of the current m until new work is available. -// Returns with acquired P. -static void -stopm(void) -{ - M* m; - - m = g->m; - if(m->locks) - runtime_throw("stopm holding locks"); - if(m->p) - runtime_throw("stopm holding p"); - if(m->spinning) { - m->spinning = false; - runtime_xadd(&runtime_sched->nmspinning, -1); - } - -retry: - runtime_lock(&runtime_sched->lock); - mput(m); - runtime_unlock(&runtime_sched->lock); - runtime_notesleep(&m->park); - m = g->m; - runtime_noteclear(&m->park); - if(m->helpgc) { - runtime_gchelper(); - m->helpgc = 0; - m->mcache = nil; - goto retry; - } - acquirep((P*)m->nextp); - m->nextp = 0; -} - static void mspinning(void) { @@ -1109,7 +860,7 @@ mspinning(void) // Schedules some M to run the p (creates an M if necessary). // If p==nil, tries to get an idle P, if no idle P's does nothing. -static void +void startm(P *p, bool spinning) { M *mp; @@ -1138,361 +889,12 @@ startm(P *p, bool spinning) runtime_throw("startm: m is spinning"); if(mp->nextp) runtime_throw("startm: m has p"); - mp->spinning = spinning; - mp->nextp = (uintptr)p; - runtime_notewakeup(&mp->park); -} - -// Hands off P from syscall or locked M. -static void -handoffp(P *p) -{ - // if it has local work, start it straight away - if(p->runqhead != p->runqtail || runtime_sched->runqsize) { - startm(p, false); - return; - } - // no local work, check that there are no spinning/idle M's, - // otherwise our help is not required - if(runtime_atomicload(&runtime_sched->nmspinning) + runtime_atomicload(&runtime_sched->npidle) == 0 && // TODO: fast atomic - runtime_cas(&runtime_sched->nmspinning, 0, 1)) { - startm(p, true); - return; - } - runtime_lock(&runtime_sched->lock); - if(runtime_sched->gcwaiting) { - p->status = _Pgcstop; - if(--runtime_sched->stopwait == 0) - runtime_notewakeup(&runtime_sched->stopnote); - runtime_unlock(&runtime_sched->lock); - return; - } - if(runtime_sched->runqsize) { - runtime_unlock(&runtime_sched->lock); - startm(p, false); - return; + if(spinning && !runqempty(p)) { + runtime_throw("startm: p has runnable gs"); } - // If this is the last running P and nobody is polling network, - // need to wakeup another M to poll network. - if(runtime_sched->npidle == (uint32)runtime_gomaxprocs-1 && runtime_atomicload64(&runtime_sched->lastpoll) != 0) { - runtime_unlock(&runtime_sched->lock); - startm(p, false); - return; - } - pidleput(p); - runtime_unlock(&runtime_sched->lock); -} - -// Tries to add one more P to execute G's. -// Called when a G is made runnable (newproc, ready). -static void -wakep(void) -{ - // be conservative about spinning threads - if(!runtime_cas(&runtime_sched->nmspinning, 0, 1)) - return; - startm(nil, true); -} - -// Stops execution of the current m that is locked to a g until the g is runnable again. -// Returns with acquired P. -static void -stoplockedm(void) -{ - M *m; - P *p; - - m = g->m; - if(m->lockedg == nil || m->lockedg->lockedm != m) - runtime_throw("stoplockedm: inconsistent locking"); - if(m->p) { - // Schedule another M to run this p. 
- p = releasep(); - handoffp(p); - } - incidlelocked(1); - // Wait until another thread schedules lockedg again. - runtime_notesleep(&m->park); - m = g->m; - runtime_noteclear(&m->park); - if(m->lockedg->atomicstatus != _Grunnable) - runtime_throw("stoplockedm: not runnable"); - acquirep((P*)m->nextp); - m->nextp = 0; -} - -// Schedules the locked m to run the locked gp. -static void -startlockedm(G *gp) -{ - M *mp; - P *p; - - mp = gp->lockedm; - if(mp == g->m) - runtime_throw("startlockedm: locked to me"); - if(mp->nextp) - runtime_throw("startlockedm: m has p"); - // directly handoff current P to the locked m - incidlelocked(-1); - p = releasep(); + mp->spinning = spinning; mp->nextp = (uintptr)p; runtime_notewakeup(&mp->park); - stopm(); -} - -// Stops the current m for stoptheworld. -// Returns when the world is restarted. -static void -gcstopm(void) -{ - P *p; - - if(!runtime_sched->gcwaiting) - runtime_throw("gcstopm: not waiting for gc"); - if(g->m->spinning) { - g->m->spinning = false; - runtime_xadd(&runtime_sched->nmspinning, -1); - } - p = releasep(); - runtime_lock(&runtime_sched->lock); - p->status = _Pgcstop; - if(--runtime_sched->stopwait == 0) - runtime_notewakeup(&runtime_sched->stopnote); - runtime_unlock(&runtime_sched->lock); - stopm(); -} - -// Schedules gp to run on the current M. -// Never returns. -static void -execute(G *gp) -{ - int32 hz; - - if(gp->atomicstatus != _Grunnable) { - runtime_printf("execute: bad g status %d\n", gp->atomicstatus); - runtime_throw("execute: bad g status"); - } - gp->atomicstatus = _Grunning; - gp->waitsince = 0; - ((P*)g->m->p)->schedtick++; - g->m->curg = gp; - gp->m = g->m; - - // Check whether the profiler needs to be turned on or off. - hz = runtime_sched->profilehz; - if(g->m->profilehz != hz) - runtime_resetcpuprofiler(hz); - - runtime_gogo(gp); -} - -// Finds a runnable goroutine to execute. -// Tries to steal from other P's, get g from global queue, poll network. -static G* -findrunnable(void) -{ - G *gp; - P *p; - int32 i; - -top: - if(runtime_sched->gcwaiting) { - gcstopm(); - goto top; - } - if(runtime_fingwait && runtime_fingwake && (gp = runtime_wakefing()) != nil) - runtime_ready(gp); - // local runq - gp = runqget((P*)g->m->p); - if(gp) - return gp; - // global runq - if(runtime_sched->runqsize) { - runtime_lock(&runtime_sched->lock); - gp = globrunqget((P*)g->m->p, 0); - runtime_unlock(&runtime_sched->lock); - if(gp) - return gp; - } - // poll network - gp = runtime_netpoll(false); // non-blocking - if(gp) { - injectglist((G*)gp->schedlink); - gp->atomicstatus = _Grunnable; - return gp; - } - // If number of spinning M's >= number of busy P's, block. - // This is necessary to prevent excessive CPU consumption - // when GOMAXPROCS>>1 but the program parallelism is low. 
- if(!g->m->spinning && 2 * runtime_atomicload(&runtime_sched->nmspinning) >= runtime_gomaxprocs - runtime_atomicload(&runtime_sched->npidle)) // TODO: fast atomic - goto stop; - if(!g->m->spinning) { - g->m->spinning = true; - runtime_xadd(&runtime_sched->nmspinning, 1); - } - // random steal from other P's - for(i = 0; i < 2*runtime_gomaxprocs; i++) { - if(runtime_sched->gcwaiting) - goto top; - p = runtime_allp[runtime_fastrand1()%runtime_gomaxprocs]; - if(p == (P*)g->m->p) - gp = runqget(p); - else - gp = runqsteal((P*)g->m->p, p); - if(gp) - return gp; - } -stop: - // return P and block - runtime_lock(&runtime_sched->lock); - if(runtime_sched->gcwaiting) { - runtime_unlock(&runtime_sched->lock); - goto top; - } - if(runtime_sched->runqsize) { - gp = globrunqget((P*)g->m->p, 0); - runtime_unlock(&runtime_sched->lock); - return gp; - } - p = releasep(); - pidleput(p); - runtime_unlock(&runtime_sched->lock); - if(g->m->spinning) { - g->m->spinning = false; - runtime_xadd(&runtime_sched->nmspinning, -1); - } - // check all runqueues once again - for(i = 0; i < runtime_gomaxprocs; i++) { - p = runtime_allp[i]; - if(p && p->runqhead != p->runqtail) { - runtime_lock(&runtime_sched->lock); - p = pidleget(); - runtime_unlock(&runtime_sched->lock); - if(p) { - acquirep(p); - goto top; - } - break; - } - } - // poll network - if(runtime_xchg64(&runtime_sched->lastpoll, 0) != 0) { - if(g->m->p) - runtime_throw("findrunnable: netpoll with p"); - if(g->m->spinning) - runtime_throw("findrunnable: netpoll with spinning"); - gp = runtime_netpoll(true); // block until new work is available - runtime_atomicstore64(&runtime_sched->lastpoll, runtime_nanotime()); - if(gp) { - runtime_lock(&runtime_sched->lock); - p = pidleget(); - runtime_unlock(&runtime_sched->lock); - if(p) { - acquirep(p); - injectglist((G*)gp->schedlink); - gp->atomicstatus = _Grunnable; - return gp; - } - injectglist(gp); - } - } - stopm(); - goto top; -} - -static void -resetspinning(void) -{ - int32 nmspinning; - - if(g->m->spinning) { - g->m->spinning = false; - nmspinning = runtime_xadd(&runtime_sched->nmspinning, -1); - if(nmspinning < 0) - runtime_throw("findrunnable: negative nmspinning"); - } else - nmspinning = runtime_atomicload(&runtime_sched->nmspinning); - - // M wakeup policy is deliberately somewhat conservative (see nmspinning handling), - // so see if we need to wakeup another P here. - if (nmspinning == 0 && runtime_atomicload(&runtime_sched->npidle) > 0) - wakep(); -} - -// Injects the list of runnable G's into the scheduler. -// Can run concurrently with GC. -static void -injectglist(G *glist) -{ - int32 n; - G *gp; - - if(glist == nil) - return; - runtime_lock(&runtime_sched->lock); - for(n = 0; glist; n++) { - gp = glist; - glist = (G*)gp->schedlink; - gp->atomicstatus = _Grunnable; - globrunqput(gp); - } - runtime_unlock(&runtime_sched->lock); - - for(; n && runtime_sched->npidle; n--) - startm(nil, false); -} - -// One round of scheduler: find a runnable goroutine and execute it. -// Never returns. -static void -schedule(void) -{ - G *gp; - uint32 tick; - - if(g->m->locks) - runtime_throw("schedule: holding locks"); - -top: - if(runtime_sched->gcwaiting) { - gcstopm(); - goto top; - } - - gp = nil; - // Check the global runnable queue once in a while to ensure fairness. - // Otherwise two goroutines can completely occupy the local runqueue - // by constantly respawning each other. 
- tick = ((P*)g->m->p)->schedtick; - // This is a fancy way to say tick%61==0, - // it uses 2 MUL instructions instead of a single DIV and so is faster on modern processors. - if(tick - (((uint64)tick*0x4325c53fu)>>36)*61 == 0 && runtime_sched->runqsize > 0) { - runtime_lock(&runtime_sched->lock); - gp = globrunqget((P*)g->m->p, 1); - runtime_unlock(&runtime_sched->lock); - if(gp) - resetspinning(); - } - if(gp == nil) { - gp = runqget((P*)g->m->p); - if(gp && g->m->spinning) - runtime_throw("schedule: spinning with local work"); - } - if(gp == nil) { - gp = findrunnable(); // blocks until work is available - resetspinning(); - } - - if(gp->lockedm) { - // Hands off own p to the locked m, - // then blocks waiting for a new p. - startlockedm(gp); - goto top; - } - - execute(gp); } // Puts the current goroutine into a waiting state and calls unlockf. @@ -1572,12 +974,12 @@ park0(G *gp) m->waitlock = nil; if(!ok) { gp->atomicstatus = _Grunnable; - execute(gp); // Schedule it back, never returns. + execute(gp, true); // Schedule it back, never returns. } } if(m->lockedg) { stoplockedm(); - execute(gp); // Never returns. + execute(gp, true); // Never returns. } schedule(); } @@ -1606,7 +1008,7 @@ runtime_gosched0(G *gp) runtime_unlock(&runtime_sched->lock); if(m->lockedg) { stoplockedm(); - execute(gp); // Never returns. + execute(gp, true); // Never returns. } schedule(); } @@ -1643,6 +1045,7 @@ goexit0(G *gp) gp->writebuf.__capacity = 0; gp->waitreason = runtime_gostringnocopy(nil); gp->param = nil; + m->curg->m = nil; m->curg = nil; m->lockedg = nil; if(m->locked & ~_LockExternal) { @@ -1896,12 +1299,12 @@ exitsyscall0(G *gp) runtime_unlock(&runtime_sched->lock); if(p) { acquirep(p); - execute(gp); // Never returns. + execute(gp, false); // Never returns. } if(m->lockedg) { // Wait until another thread schedules gp and so m again. stoplockedm(); - execute(gp); // Never returns. + execute(gp, false); // Never returns. } stopm(); schedule(); // Never returns. @@ -2069,7 +1472,7 @@ __go_go(void (*fn)(void*), void* arg) makeGContext(newg, sp, (uintptr)spsize); - runqput(p, newg); + runqput(p, newg, true); if(runtime_atomicload(&runtime_sched->npidle) != 0 && runtime_atomicload(&runtime_sched->nmspinning) == 0 && fn != runtime_main) // TODO: fast atomic wakep(); @@ -2126,23 +1529,6 @@ retry: return gp; } -// Purge all cached G's from gfree list to the global list. -static void -gfpurge(P *p) -{ - G *gp; - - runtime_lock(&runtime_sched->gflock); - while(p->gfreecnt) { - p->gfreecnt--; - gp = p->gfree; - p->gfree = (G*)gp->schedlink; - gp->schedlink = (uintptr)runtime_sched->gfree; - runtime_sched->gfree = gp; - } - runtime_unlock(&runtime_sched->gflock); -} - void runtime_Breakpoint(void) { @@ -2157,38 +1543,6 @@ runtime_Gosched(void) runtime_gosched(); } -// Implementation of runtime.GOMAXPROCS. 
-// delete when scheduler is even stronger - -intgo runtime_GOMAXPROCS(intgo) - __asm__(GOSYM_PREFIX "runtime.GOMAXPROCS"); - -intgo -runtime_GOMAXPROCS(intgo n) -{ - intgo ret; - - if(n > _MaxGomaxprocs) - n = _MaxGomaxprocs; - runtime_lock(&runtime_sched->lock); - ret = (intgo)runtime_gomaxprocs; - if(n <= 0 || n == ret) { - runtime_unlock(&runtime_sched->lock); - return ret; - } - runtime_unlock(&runtime_sched->lock); - - runtime_acquireWorldsema(); - g->m->gcing = 1; - runtime_stopTheWorldWithSema(); - newprocs = (int32)n; - g->m->gcing = 0; - runtime_releaseWorldsema(); - runtime_startTheWorldWithSema(); - - return ret; -} - // lockOSThread is called by runtime.LockOSThread and runtime.lockOSThread below // after they modify m->locked. Do not allow preemption during this call, // or else the m might be different in this function than in the caller. @@ -2365,599 +1719,6 @@ runtime_setcpuprofilerate_m(int32 hz) g->m->locks--; } -// Change number of processors. The world is stopped, sched is locked. -static void -procresize(int32 new) -{ - int32 i, old; - bool pempty; - G *gp; - P *p; - intgo j; - - old = runtime_gomaxprocs; - if(old < 0 || old > _MaxGomaxprocs || new <= 0 || new >_MaxGomaxprocs) - runtime_throw("procresize: invalid arg"); - // initialize new P's - for(i = 0; i < new; i++) { - p = runtime_allp[i]; - if(p == nil) { - p = (P*)runtime_mallocgc(sizeof(*p), 0, FlagNoInvokeGC); - p->id = i; - p->status = _Pgcstop; - p->deferpool.__values = &p->deferpoolbuf[0]; - p->deferpool.__count = 0; - p->deferpool.__capacity = nelem(p->deferpoolbuf); - runtime_atomicstorep(&runtime_allp[i], p); - } - if(p->mcache == nil) { - if(old==0 && i==0) - p->mcache = g->m->mcache; // bootstrap - else - p->mcache = runtime_allocmcache(); - } - } - - // redistribute runnable G's evenly - // collect all runnable goroutines in global queue preserving FIFO order - // FIFO order is required to ensure fairness even during frequent GCs - // see http://golang.org/issue/7126 - pempty = false; - while(!pempty) { - pempty = true; - for(i = 0; i < old; i++) { - p = runtime_allp[i]; - if(p->runqhead == p->runqtail) - continue; - pempty = false; - // pop from tail of local queue - p->runqtail--; - gp = (G*)p->runq[p->runqtail%nelem(p->runq)]; - // push onto head of global queue - gp->schedlink = runtime_sched->runqhead; - runtime_sched->runqhead = (uintptr)gp; - if(runtime_sched->runqtail == 0) - runtime_sched->runqtail = (uintptr)gp; - runtime_sched->runqsize++; - } - } - // fill local queues with at most nelem(p->runq)/2 goroutines - // start at 1 because current M already executes some G and will acquire allp[0] below, - // so if we have a spare G we want to put it into allp[1]. 
- for(i = 1; (uint32)i < (uint32)new * nelem(p->runq)/2 && runtime_sched->runqsize > 0; i++) { - gp = (G*)runtime_sched->runqhead; - runtime_sched->runqhead = gp->schedlink; - if(runtime_sched->runqhead == 0) - runtime_sched->runqtail = 0; - runtime_sched->runqsize--; - runqput(runtime_allp[i%new], gp); - } - - // free unused P's - for(i = new; i < old; i++) { - p = runtime_allp[i]; - for(j = 0; j < p->deferpool.__count; j++) { - ((struct _defer**)p->deferpool.__values)[j] = nil; - } - p->deferpool.__count = 0; - runtime_freemcache(p->mcache); - p->mcache = nil; - gfpurge(p); - p->status = _Pdead; - // can't free P itself because it can be referenced by an M in syscall - } - - if(g->m->p) - ((P*)g->m->p)->m = 0; - g->m->p = 0; - g->m->mcache = nil; - p = runtime_allp[0]; - p->m = 0; - p->status = _Pidle; - acquirep(p); - for(i = new-1; i > 0; i--) { - p = runtime_allp[i]; - p->status = _Pidle; - pidleput(p); - } - runtime_atomicstore((uint32*)&runtime_gomaxprocs, new); -} - -// Associate p and the current m. -static void -acquirep(P *p) -{ - M *m; - - m = g->m; - if(m->p || m->mcache) - runtime_throw("acquirep: already in go"); - if(p->m || p->status != _Pidle) { - runtime_printf("acquirep: p->m=%p(%d) p->status=%d\n", p->m, p->m ? ((M*)p->m)->id : 0, p->status); - runtime_throw("acquirep: invalid p state"); - } - m->mcache = p->mcache; - m->p = (uintptr)p; - p->m = (uintptr)m; - p->status = _Prunning; -} - -// Disassociate p and the current m. -static P* -releasep(void) -{ - M *m; - P *p; - - m = g->m; - if(m->p == 0 || m->mcache == nil) - runtime_throw("releasep: invalid arg"); - p = (P*)m->p; - if((M*)p->m != m || p->mcache != m->mcache || p->status != _Prunning) { - runtime_printf("releasep: m=%p m->p=%p p->m=%p m->mcache=%p p->mcache=%p p->status=%d\n", - m, m->p, p->m, m->mcache, p->mcache, p->status); - runtime_throw("releasep: invalid p state"); - } - m->p = 0; - m->mcache = nil; - p->m = 0; - p->status = _Pidle; - return p; -} - -static void -incidlelocked(int32 v) -{ - runtime_lock(&runtime_sched->lock); - runtime_sched->nmidlelocked += v; - if(v > 0) - checkdead(); - runtime_unlock(&runtime_sched->lock); -} - -static void -sysmon(void) -{ - uint32 idle, delay; - int64 now, lastpoll, lasttrace; - G *gp; - - lasttrace = 0; - idle = 0; // how many cycles in succession we had not wokeup somebody - delay = 0; - for(;;) { - if(idle == 0) // start with 20us sleep... - delay = 20; - else if(idle > 50) // start doubling the sleep after 1ms... 
- delay *= 2; - if(delay > 10*1000) // up to 10ms - delay = 10*1000; - runtime_usleep(delay); - if(runtime_debug.schedtrace <= 0 && - (runtime_sched->gcwaiting || runtime_atomicload(&runtime_sched->npidle) == (uint32)runtime_gomaxprocs)) { // TODO: fast atomic - runtime_lock(&runtime_sched->lock); - if(runtime_atomicload(&runtime_sched->gcwaiting) || runtime_atomicload(&runtime_sched->npidle) == (uint32)runtime_gomaxprocs) { - runtime_atomicstore(&runtime_sched->sysmonwait, 1); - runtime_unlock(&runtime_sched->lock); - runtime_notesleep(&runtime_sched->sysmonnote); - runtime_noteclear(&runtime_sched->sysmonnote); - idle = 0; - delay = 20; - } else - runtime_unlock(&runtime_sched->lock); - } - // poll network if not polled for more than 10ms - lastpoll = runtime_atomicload64(&runtime_sched->lastpoll); - now = runtime_nanotime(); - if(lastpoll != 0 && lastpoll + 10*1000*1000 < now) { - runtime_cas64(&runtime_sched->lastpoll, lastpoll, now); - gp = runtime_netpoll(false); // non-blocking - if(gp) { - // Need to decrement number of idle locked M's - // (pretending that one more is running) before injectglist. - // Otherwise it can lead to the following situation: - // injectglist grabs all P's but before it starts M's to run the P's, - // another M returns from syscall, finishes running its G, - // observes that there is no work to do and no other running M's - // and reports deadlock. - incidlelocked(-1); - injectglist(gp); - incidlelocked(1); - } - } - // retake P's blocked in syscalls - // and preempt long running G's - if(retake(now)) - idle = 0; - else - idle++; - - if(runtime_debug.schedtrace > 0 && lasttrace + runtime_debug.schedtrace*1000000ll <= now) { - lasttrace = now; - runtime_schedtrace(runtime_debug.scheddetail); - } - } -} - -typedef struct Pdesc Pdesc; -struct Pdesc -{ - uint32 schedtick; - int64 schedwhen; - uint32 syscalltick; - int64 syscallwhen; -}; -static Pdesc pdesc[_MaxGomaxprocs]; - -static uint32 -retake(int64 now) -{ - uint32 i, s, n; - int64 t; - P *p; - Pdesc *pd; - - n = 0; - for(i = 0; i < (uint32)runtime_gomaxprocs; i++) { - p = runtime_allp[i]; - if(p==nil) - continue; - pd = &pdesc[i]; - s = p->status; - if(s == _Psyscall) { - // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us). - t = p->syscalltick; - if(pd->syscalltick != t) { - pd->syscalltick = t; - pd->syscallwhen = now; - continue; - } - // On the one hand we don't want to retake Ps if there is no other work to do, - // but on the other hand we want to retake them eventually - // because they can prevent the sysmon thread from deep sleep. - if(p->runqhead == p->runqtail && - runtime_atomicload(&runtime_sched->nmspinning) + runtime_atomicload(&runtime_sched->npidle) > 0 && - pd->syscallwhen + 10*1000*1000 > now) - continue; - // Need to decrement number of idle locked M's - // (pretending that one more is running) before the CAS. - // Otherwise the M from which we retake can exit the syscall, - // increment nmidle and report deadlock. - incidlelocked(-1); - if(runtime_cas(&p->status, s, _Pidle)) { - n++; - handoffp(p); - } - incidlelocked(1); - } else if(s == _Prunning) { - // Preempt G if it's running for more than 10ms. - t = p->schedtick; - if(pd->schedtick != t) { - pd->schedtick = t; - pd->schedwhen = now; - continue; - } - if(pd->schedwhen + 10*1000*1000 > now) - continue; - // preemptone(p); - } - } - return n; -} - -// Tell all goroutines that they have been preempted and they should stop. -// This function is purely best-effort. 
It can fail to inform a goroutine if a -// processor just started running it. -// No locks need to be held. -// Returns true if preemption request was issued to at least one goroutine. -static bool -preemptall(void) -{ - return false; -} - -// Put mp on midle list. -// Sched must be locked. -static void -mput(M *mp) -{ - mp->schedlink = runtime_sched->midle; - runtime_sched->midle = (uintptr)mp; - runtime_sched->nmidle++; - checkdead(); -} - -// Try to get an m from midle list. -// Sched must be locked. -static M* -mget(void) -{ - M *mp; - - if((mp = (M*)runtime_sched->midle) != nil){ - runtime_sched->midle = mp->schedlink; - runtime_sched->nmidle--; - } - return mp; -} - -// Put gp on the global runnable queue. -// Sched must be locked. -static void -globrunqput(G *gp) -{ - gp->schedlink = 0; - if(runtime_sched->runqtail) - ((G*)runtime_sched->runqtail)->schedlink = (uintptr)gp; - else - runtime_sched->runqhead = (uintptr)gp; - runtime_sched->runqtail = (uintptr)gp; - runtime_sched->runqsize++; -} - -// Put a batch of runnable goroutines on the global runnable queue. -// Sched must be locked. -static void -globrunqputbatch(G *ghead, G *gtail, int32 n) -{ - gtail->schedlink = 0; - if(runtime_sched->runqtail) - ((G*)runtime_sched->runqtail)->schedlink = (uintptr)ghead; - else - runtime_sched->runqhead = (uintptr)ghead; - runtime_sched->runqtail = (uintptr)gtail; - runtime_sched->runqsize += n; -} - -// Try get a batch of G's from the global runnable queue. -// Sched must be locked. -static G* -globrunqget(P *p, int32 max) -{ - G *gp, *gp1; - int32 n; - - if(runtime_sched->runqsize == 0) - return nil; - n = runtime_sched->runqsize/runtime_gomaxprocs+1; - if(n > runtime_sched->runqsize) - n = runtime_sched->runqsize; - if(max > 0 && n > max) - n = max; - if((uint32)n > nelem(p->runq)/2) - n = nelem(p->runq)/2; - runtime_sched->runqsize -= n; - if(runtime_sched->runqsize == 0) - runtime_sched->runqtail = 0; - gp = (G*)runtime_sched->runqhead; - runtime_sched->runqhead = gp->schedlink; - n--; - while(n--) { - gp1 = (G*)runtime_sched->runqhead; - runtime_sched->runqhead = gp1->schedlink; - runqput(p, gp1); - } - return gp; -} - -// Put p to on pidle list. -// Sched must be locked. -static void -pidleput(P *p) -{ - p->link = runtime_sched->pidle; - runtime_sched->pidle = (uintptr)p; - runtime_xadd(&runtime_sched->npidle, 1); // TODO: fast atomic -} - -// Try get a p from pidle list. -// Sched must be locked. -static P* -pidleget(void) -{ - P *p; - - p = (P*)runtime_sched->pidle; - if(p) { - runtime_sched->pidle = p->link; - runtime_xadd(&runtime_sched->npidle, -1); // TODO: fast atomic - } - return p; -} - -// Try to put g on local runnable queue. -// If it's full, put onto global queue. -// Executed only by the owner P. -static void -runqput(P *p, G *gp) -{ - uint32 h, t; - -retry: - h = runtime_atomicload(&p->runqhead); // load-acquire, synchronize with consumers - t = p->runqtail; - if(t - h < nelem(p->runq)) { - p->runq[t%nelem(p->runq)] = (uintptr)gp; - runtime_atomicstore(&p->runqtail, t+1); // store-release, makes the item available for consumption - return; - } - if(runqputslow(p, gp, h, t)) - return; - // the queue is not full, now the put above must suceed - goto retry; -} - -// Put g and a batch of work from local runnable queue on global queue. -// Executed only by the owner P. -static bool -runqputslow(P *p, G *gp, uint32 h, uint32 t) -{ - G *batch[nelem(p->runq)/2+1]; - uint32 n, i; - - // First, grab a batch from local queue. 
- n = t-h; - n = n/2; - if(n != nelem(p->runq)/2) - runtime_throw("runqputslow: queue is not full"); - for(i=0; irunq[(h+i)%nelem(p->runq)]; - if(!runtime_cas(&p->runqhead, h, h+n)) // cas-release, commits consume - return false; - batch[n] = gp; - // Link the goroutines. - for(i=0; ischedlink = (uintptr)batch[i+1]; - // Now put the batch on global queue. - runtime_lock(&runtime_sched->lock); - globrunqputbatch(batch[0], batch[n], n+1); - runtime_unlock(&runtime_sched->lock); - return true; -} - -// Get g from local runnable queue. -// Executed only by the owner P. -static G* -runqget(P *p) -{ - G *gp; - uint32 t, h; - - for(;;) { - h = runtime_atomicload(&p->runqhead); // load-acquire, synchronize with other consumers - t = p->runqtail; - if(t == h) - return nil; - gp = (G*)p->runq[h%nelem(p->runq)]; - if(runtime_cas(&p->runqhead, h, h+1)) // cas-release, commits consume - return gp; - } -} - -// Grabs a batch of goroutines from local runnable queue. -// batch array must be of size nelem(p->runq)/2. Returns number of grabbed goroutines. -// Can be executed by any P. -static uint32 -runqgrab(P *p, G **batch) -{ - uint32 t, h, n, i; - - for(;;) { - h = runtime_atomicload(&p->runqhead); // load-acquire, synchronize with other consumers - t = runtime_atomicload(&p->runqtail); // load-acquire, synchronize with the producer - n = t-h; - n = n - n/2; - if(n == 0) - break; - if(n > nelem(p->runq)/2) // read inconsistent h and t - continue; - for(i=0; irunq[(h+i)%nelem(p->runq)]; - if(runtime_cas(&p->runqhead, h, h+n)) // cas-release, commits consume - break; - } - return n; -} - -// Steal half of elements from local runnable queue of p2 -// and put onto local runnable queue of p. -// Returns one of the stolen elements (or nil if failed). -static G* -runqsteal(P *p, P *p2) -{ - G *gp; - G *batch[nelem(p->runq)/2]; - uint32 t, h, n, i; - - n = runqgrab(p2, batch); - if(n == 0) - return nil; - n--; - gp = batch[n]; - if(n == 0) - return gp; - h = runtime_atomicload(&p->runqhead); // load-acquire, synchronize with consumers - t = p->runqtail; - if(t - h + n >= nelem(p->runq)) - runtime_throw("runqsteal: runq overflow"); - for(i=0; irunq[t%nelem(p->runq)] = (uintptr)batch[i]; - runtime_atomicstore(&p->runqtail, t); // store-release, makes the item available for consumption - return gp; -} - -void runtime_testSchedLocalQueue(void) - __asm__("runtime.testSchedLocalQueue"); - -void -runtime_testSchedLocalQueue(void) -{ - P p; - G gs[nelem(p.runq)]; - int32 i, j; - - runtime_memclr((byte*)&p, sizeof(p)); - - for(i = 0; i < (int32)nelem(gs); i++) { - if(runqget(&p) != nil) - runtime_throw("runq is not empty initially"); - for(j = 0; j < i; j++) - runqput(&p, &gs[i]); - for(j = 0; j < i; j++) { - if(runqget(&p) != &gs[i]) { - runtime_printf("bad element at iter %d/%d\n", i, j); - runtime_throw("bad element"); - } - } - if(runqget(&p) != nil) - runtime_throw("runq is not empty afterwards"); - } -} - -void runtime_testSchedLocalQueueSteal(void) - __asm__("runtime.testSchedLocalQueueSteal"); - -void -runtime_testSchedLocalQueueSteal(void) -{ - P p1, p2; - G gs[nelem(p1.runq)], *gp; - int32 i, j, s; - - runtime_memclr((byte*)&p1, sizeof(p1)); - runtime_memclr((byte*)&p2, sizeof(p2)); - - for(i = 0; i < (int32)nelem(gs); i++) { - for(j = 0; j < i; j++) { - gs[j].sig = 0; - runqput(&p1, &gs[j]); - } - gp = runqsteal(&p2, &p1); - s = 0; - if(gp) { - s++; - gp->sig++; - } - while((gp = runqget(&p2)) != nil) { - s++; - gp->sig++; - } - while((gp = runqget(&p1)) != nil) - gp->sig++; - for(j = 0; j < i; j++) { - 
if(gs[j].sig != 1) { - runtime_printf("bad element %d(%d) at iter %d\n", j, gs[j].sig, i); - runtime_throw("bad element"); - } - } - if(s != i/2 && s != i/2+1) { - runtime_printf("bad steal %d, want %d or %d, iter %d\n", - s, i/2, i/2+1, i); - runtime_throw("bad steal"); - } - } -} - intgo runtime_setmaxthreads(intgo in) { @@ -3041,56 +1802,15 @@ os_beforeExit() { } -// Active spinning for sync.Mutex. -//go:linkname sync_runtime_canSpin sync.runtime_canSpin - -enum -{ - ACTIVE_SPIN = 4, - ACTIVE_SPIN_CNT = 30, -}; - -extern _Bool sync_runtime_canSpin(intgo i) - __asm__ (GOSYM_PREFIX "sync.runtime_canSpin"); - -_Bool -sync_runtime_canSpin(intgo i) -{ - P *p; - - // sync.Mutex is cooperative, so we are conservative with spinning. - // Spin only few times and only if running on a multicore machine and - // GOMAXPROCS>1 and there is at least one other running P and local runq is empty. - // As opposed to runtime mutex we don't do passive spinning here, - // because there can be work on global runq on on other Ps. - if (i >= ACTIVE_SPIN || runtime_ncpu <= 1 || runtime_gomaxprocs <= (int32)(runtime_sched->npidle+runtime_sched->nmspinning)+1) { - return false; - } - p = (P*)g->m->p; - return p != nil && p->runqhead == p->runqtail; -} - -//go:linkname sync_runtime_doSpin sync.runtime_doSpin -//go:nosplit - -extern void sync_runtime_doSpin(void) - __asm__ (GOSYM_PREFIX "sync.runtime_doSpin"); - -void -sync_runtime_doSpin() -{ - runtime_procyield(ACTIVE_SPIN_CNT); -} - // For Go code to look at variables, until we port proc.go. -extern M** runtime_go_allm(void) +extern M* runtime_go_allm(void) __asm__ (GOSYM_PREFIX "runtime.allm"); -M** +M* runtime_go_allm() { - return &runtime_allm; + return runtime_allm; } intgo NumCPU(void) __asm__ (GOSYM_PREFIX "runtime.NumCPU"); diff --git a/libgo/runtime/runtime.h b/libgo/runtime/runtime.h index c8f490f20de..438349871c9 100644 --- a/libgo/runtime/runtime.h +++ b/libgo/runtime/runtime.h @@ -240,7 +240,6 @@ extern G* runtime_lastg; extern M* runtime_allm; extern P** runtime_allp; extern Sched* runtime_sched; -extern int32 runtime_gomaxprocs; extern uint32 runtime_panicking(void) __asm__ (GOSYM_PREFIX "runtime.getPanicking"); extern int8* runtime_goos; @@ -260,7 +259,8 @@ extern bool runtime_isarchive; intgo runtime_findnull(const byte*) __asm__ (GOSYM_PREFIX "runtime.findnull"); -void runtime_gogo(G*); +void runtime_gogo(G*) + __asm__ (GOSYM_PREFIX "runtime.gogo"); struct __go_func_type; void runtime_args(int32, byte**) __asm__ (GOSYM_PREFIX "runtime.args"); @@ -294,7 +294,8 @@ void runtime_printtrace(Slice, G*) #define runtime_read(d, v, n) read((d), (v), (n)) #define runtime_write(d, v, n) write((d), (v), (n)) #define runtime_close(d) close(d) -void runtime_ready(G*); +void runtime_ready(G*, intgo, bool) + __asm__ (GOSYM_PREFIX "runtime.ready"); String runtime_getenv(const char*); int32 runtime_atoi(const byte*, intgo); void* runtime_mstart(void*); @@ -307,7 +308,8 @@ void runtime_signalstack(byte*, uintptr) __asm__ (GOSYM_PREFIX "runtime.signalstack"); MCache* runtime_allocmcache(void) __asm__ (GOSYM_PREFIX "runtime.allocmcache"); -void runtime_freemcache(MCache*); +void runtime_freemcache(MCache*) + __asm__ (GOSYM_PREFIX "runtime.freemcache"); void runtime_mallocinit(void); void runtime_mprofinit(void); #define runtime_getcallersp(p) __builtin_frame_address(0) @@ -368,8 +370,6 @@ int64 runtime_unixnanotime(void) // real time, can skip void runtime_dopanic(int32) __attribute__ ((noreturn)); void runtime_startpanic(void) __asm__ (GOSYM_PREFIX 
"runtime.startpanic"); -void runtime_freezetheworld(void) - __asm__ (GOSYM_PREFIX "runtime.freezetheworld"); void runtime_unwindstack(G*, byte*); void runtime_sigprof() __asm__ (GOSYM_PREFIX "runtime.sigprof"); -- 2.30.2