virtual ~_Result_base();
};
- /// Result.
+ /// A unique_ptr for result objects.
+ template<typename _Res>
+ using _Ptr = unique_ptr<_Res, _Result_base::_Deleter>;
+
+ /// A result object that has storage for an object of type _Res.
template<typename _Res>
struct _Result : _Result_base
{
void _M_destroy() { delete this; }
};
- /// A unique_ptr based on the instantiating type.
- template<typename _Res>
- using _Ptr = unique_ptr<_Res, _Result_base::_Deleter>;
-
- /// Result_alloc.
+ /// A result object that uses an allocator.
template<typename _Res, typename _Alloc>
struct _Result_alloc final : _Result<_Res>, _Alloc
{
}
};
+ // Create a result object that uses an allocator.
template<typename _Res, typename _Allocator>
static _Ptr<_Result_alloc<_Res, _Allocator>>
_S_allocate_result(const _Allocator& __a)
return _Ptr<__result_type>(__p);
}
+ // Keep it simple for std::allocator.
template<typename _Res, typename _Tp>
static _Ptr<_Result<_Res>>
_S_allocate_result(const std::allocator<_Tp>& __a)
return _Ptr<_Result<_Res>>(new _Result<_Res>);
}
- /// Base class for state between a promise and one or more
- /// associated futures.
+ // Base class for various types of shared state created by an
+ // asynchronous provider (such as a std::promise) and shared with one
+ // or more associated futures.
class _State_baseV2
{
typedef _Ptr<_Result_base> _Ptr_type;
_Ptr_type _M_result;
mutex _M_mutex;
condition_variable _M_cond;
- atomic_flag _M_retrieved;
+ atomic_flag _M_retrieved = ATOMIC_FLAG_INIT;
+ bool _M_ready = false;
once_flag _M_once;
public:
- _State_baseV2() noexcept : _M_result(), _M_retrieved(ATOMIC_FLAG_INIT)
- { }
+ _State_baseV2() noexcept = default;
_State_baseV2(const _State_baseV2&) = delete;
_State_baseV2& operator=(const _State_baseV2&) = delete;
virtual ~_State_baseV2() = default;
_Result_base&
wait()
{
+ // Run any deferred function or join any asynchronous thread:
_M_complete_async();
+
unique_lock<mutex> __lock(_M_mutex);
- _M_cond.wait(__lock, [&] { return _M_ready(); });
+ _M_cond.wait(__lock, [&] { return _M_ready; });
return *_M_result;
}
wait_for(const chrono::duration<_Rep, _Period>& __rel)
{
unique_lock<mutex> __lock(_M_mutex);
- if (_M_ready())
+ if (_M_ready)
return future_status::ready;
if (_M_has_deferred())
return future_status::deferred;
- if (_M_cond.wait_for(__lock, __rel, [&] { return _M_ready(); }))
+ if (_M_cond.wait_for(__lock, __rel, [&] { return _M_ready; }))
{
// _GLIBCXX_RESOLVE_LIB_DEFECTS
// 2100. timed waiting functions must also join
+ // This call is a no-op by default except on an async future,
+ // in which case the async thread is joined. It's also not a
+ // no-op for a deferred future, but such a future will never
+ // reach this point because it returns future_status::deferred
+ // instead of waiting for the future to become ready (see
+ // above). Async futures synchronize in this call, so we need
+ // no further synchronization here.
_M_complete_async();
+
return future_status::ready;
}
return future_status::timeout;
wait_until(const chrono::time_point<_Clock, _Duration>& __abs)
{
unique_lock<mutex> __lock(_M_mutex);
- if (_M_ready())
+ if (_M_ready)
return future_status::ready;
if (_M_has_deferred())
return future_status::deferred;
- if (_M_cond.wait_until(__lock, __abs, [&] { return _M_ready(); }))
+ if (_M_cond.wait_until(__lock, __abs, [&] { return _M_ready; }))
{
// _GLIBCXX_RESOLVE_LIB_DEFECTS
// 2100. timed waiting functions must also join
+ // See wait_for(...) above.
_M_complete_async();
+
return future_status::ready;
}
return future_status::timeout;
}
+ // Provide a result to the shared state and make it ready.
+ // Atomically performs:
+ // if (!_M_ready) {
+ // _M_result = __res();
+ // _M_ready = true;
+ // }
void
_M_set_result(function<_Ptr_type()> __res, bool __ignore_failure = false)
{
call_once(_M_once, &_State_baseV2::_M_do_set, this,
std::__addressof(__res), std::__addressof(__lock));
if (__lock.owns_lock())
- _M_cond.notify_all();
+ {
+ _M_ready = true;
+ _M_cond.notify_all();
+ }
else if (!__ignore_failure)
__throw_future_error(int(future_errc::promise_already_satisfied));
}
+ // Provide a result to the shared state but delay making it ready
+ // until the calling thread exits.
+ // Atomically performs:
+ // if (!_M_ready) {
+ // _M_result = __res();
+ // }
+ void
+ _M_set_delayed_result(function<_Ptr_type()> __res,
+ weak_ptr<_State_baseV2> __self)
+ {
+ unique_ptr<_Make_ready> __mr{new _Make_ready};
+ unique_lock<mutex> __lock(_M_mutex, defer_lock);
+ // all calls to this function are serialized,
+ // side-effects of invoking __res only happen once
+ call_once(_M_once, &_State_baseV2::_M_do_set, this,
+ std::__addressof(__res), std::__addressof(__lock));
+ if (!__lock.owns_lock())
+ __throw_future_error(int(future_errc::promise_already_satisfied));
+ __mr->_M_shared_state = std::move(__self);
+ __mr->_M_set();
+ __mr.release();
+ }
+
+ // Abandon this shared state.
void
_M_break_promise(_Ptr_type __res)
{
{
error_code __ec(make_error_code(future_errc::broken_promise));
__res->_M_error = make_exception_ptr(future_error(__ec));
+ // This function is only called when the last asynchronous result
+ // provider is abandoning this shared state, so noone can be
+ // trying to make the shared state ready at the same time, and
+ // we can access _M_result directly instead of through call_once.
{
lock_guard<mutex> __lock(_M_mutex);
_M_result.swap(__res);
+ _M_ready = true;
}
_M_cond.notify_all();
}
}
- // Called when this object is passed to a future.
+ // Called when this object is first passed to a future.
void
_M_set_retrieved_flag()
{
|| is_same<const _Res, _Arg>::value, // promise<R>
"Invalid specialisation");
+ // Used by std::promise to copy construct the result.
typename promise<_Res>::_Ptr_type operator()()
{
_State_baseV2::_S_check(_M_promise->_M_future);
template<typename _Res>
struct _Setter<_Res, _Res&&>
{
+ // Used by std::promise to move construct the result.
typename promise<_Res>::_Ptr_type operator()()
{
_State_baseV2::_S_check(_M_promise->_M_future);
template<typename _Res>
struct _Setter<_Res, __exception_ptr_tag>
{
+ // Used by std::promise to store an exception as the result.
typename promise<_Res>::_Ptr_type operator()()
{
_State_baseV2::_S_check(_M_promise->_M_future);
}
private:
+ // The function invoked with std::call_once(_M_once, ...).
void
_M_do_set(function<_Ptr_type()>* __f, unique_lock<mutex>* __lock)
{
_M_result.swap(__res);
}
- bool _M_ready() const noexcept { return static_cast<bool>(_M_result); }
-
// Wait for completion of async function.
virtual void _M_complete_async() { }
- // Return true if state contains a deferred function.
- // Caller must own _M_mutex.
+ // Return true if state corresponds to a deferred function.
virtual bool _M_has_deferred() const { return false; }
+
+ struct _Make_ready final : __at_thread_exit_elt
+ {
+ weak_ptr<_State_baseV2> _M_shared_state;
+ static void _S_run(void*);
+ void _M_set();
+ };
};
#ifdef _GLIBCXX_ASYNC_ABI_COMPAT
_Result() noexcept : _M_value_ptr() { }
- void _M_set(_Res& __res) noexcept { _M_value_ptr = &__res; }
+ void
+ _M_set(_Res& __res) noexcept
+ { _M_value_ptr = std::addressof(__res); }
_Res& _M_get() noexcept { return *_M_value_ptr; }
void
set_exception(exception_ptr __p)
{ _M_future->_M_set_result(_State::__setter(__p, this)); }
+
+ void
+ set_value_at_thread_exit(const _Res& __r)
+ {
+ _M_future->_M_set_delayed_result(_State::__setter(this, __r),
+ _M_future);
+ }
+
+ void
+ set_value_at_thread_exit(_Res&& __r)
+ {
+ _M_future->_M_set_delayed_result(
+ _State::__setter(this, std::move(__r)), _M_future);
+ }
+
+ void
+ set_exception_at_thread_exit(exception_ptr __p)
+ {
+ _M_future->_M_set_delayed_result(_State::__setter(__p, this),
+ _M_future);
+ }
};
template<typename _Res>
void
set_exception(exception_ptr __p)
{ _M_future->_M_set_result(_State::__setter(__p, this)); }
+
+ void
+ set_value_at_thread_exit(_Res& __r)
+ {
+ _M_future->_M_set_delayed_result(_State::__setter(this, __r),
+ _M_future);
+ }
+
+ void
+ set_exception_at_thread_exit(exception_ptr __p)
+ {
+ _M_future->_M_set_delayed_result(_State::__setter(__p, this),
+ _M_future);
+ }
};
/// Explicit specialization for promise<void>
void
set_exception(exception_ptr __p)
{ _M_future->_M_set_result(_State::__setter(__p, this)); }
+
+ void
+ set_value_at_thread_exit();
+
+ void
+ set_exception_at_thread_exit(exception_ptr __p)
+ {
+ _M_future->_M_set_delayed_result(_State::__setter(__p, this),
+ _M_future);
+ }
};
// set void
promise<void>::set_value()
{ _M_future->_M_set_result(_State::_Setter<void, void>{ this }); }
+ inline void
+ promise<void>::set_value_at_thread_exit()
+ {
+ _M_future->_M_set_delayed_result(_State::_Setter<void, void>{this},
+ _M_future);
+ }
+
template<typename _Ptr_type, typename _Fn, typename _Res>
struct __future_base::_Task_setter
{
+ // Invoke the function and provide the result to the caller.
_Ptr_type operator()()
{
__try
_Fn* _M_fn;
};
+ // Holds storage for a packaged_task's result.
template<typename _Res, typename... _Args>
struct __future_base::_Task_state_base<_Res(_Args...)>
: __future_base::_State_base
: _M_result(_S_allocate_result<_Res>(__a))
{ }
+ // Invoke the stored task and make the state ready.
virtual void
- _M_run(_Args... __args) = 0;
+ _M_run(_Args&&... __args) = 0;
+
+ // Invoke the stored task and make the state ready at thread exit.
+ virtual void
+ _M_run_delayed(_Args&&... __args, weak_ptr<_State_base>) = 0;
virtual shared_ptr<_Task_state_base>
_M_reset() = 0;
_Ptr_type _M_result;
};
+ // Holds a packaged_task's stored task.
template<typename _Fn, typename _Alloc, typename _Res, typename... _Args>
struct __future_base::_Task_state<_Fn, _Alloc, _Res(_Args...)> final
: __future_base::_Task_state_base<_Res(_Args...)>
private:
virtual void
- _M_run(_Args... __args)
+ _M_run(_Args&&... __args)
{
// bound arguments decay so wrap lvalue references
auto __boundfn = std::__bind_simple(std::ref(_M_impl._M_fn),
this->_M_set_result(_S_task_setter(this->_M_result, __boundfn));
}
+ virtual void
+ _M_run_delayed(_Args&&... __args, weak_ptr<_State_base> __self)
+ {
+ // bound arguments decay so wrap lvalue references
+ auto __boundfn = std::__bind_simple(std::ref(_M_impl._M_fn),
+ _S_maybe_wrap_ref(std::forward<_Args>(__args))...);
+ this->_M_set_delayed_result(_S_task_setter(this->_M_result, __boundfn),
+ std::move(__self));
+ }
+
virtual shared_ptr<_Task_state_base<_Res(_Args...)>>
_M_reset();
_M_state->_M_run(std::forward<_ArgTypes>(__args)...);
}
+ void
+ make_ready_at_thread_exit(_ArgTypes... __args)
+ {
+ __future_base::_State_base::_S_check(_M_state);
+ _M_state->_M_run_delayed(std::forward<_ArgTypes>(__args)..., _M_state);
+ }
+
void
reset()
{
: public true_type { };
+ // Shared state created by std::async().
+ // Holds a deferred function and storage for its result.
template<typename _BoundFn, typename _Res>
class __future_base::_Deferred_state final
: public __future_base::_State_base
virtual void
_M_complete_async()
{
- // safe to call multiple times so ignore failure
+ // Multiple threads can call a waiting function on the future and
+ // reach this point at the same time. The call_once in _M_set_result
+ // ensures only the first one run the deferred function, stores the
+ // result in _M_result, swaps that with the base _M_result and makes
+ // the state ready. Tell _M_set_result to ignore failure so all later
+ // calls do nothing.
_M_set_result(_S_task_setter(_M_result, _M_fn), true);
}
- virtual bool
- _M_has_deferred() const { return static_cast<bool>(_M_result); }
+ // Caller should check whether the state is ready first, because this
+ // function will return true even after the deferred function has run.
+ virtual bool _M_has_deferred() const { true; }
};
+ // Common functionality hoisted out of the _Async_state_impl template.
class __future_base::_Async_state_commonV2
: public __future_base::_State_base
{
~_Async_state_commonV2() = default;
// Make waiting functions block until the thread completes, as if joined.
+ //
+ // This function is used by wait() to satisfy the first requirement below
+ // and by wait_for() / wait_until() to satisfy the second.
+ //
+ // [futures.async]:
+ //
+ // — a call to a waiting function on an asynchronous return object that
+ // shares the shared state created by this async call shall block until
+ // the associated thread has completed, as if joined, or else time out.
+ //
+ // — the associated thread completion synchronizes with the return from
+ // the first function that successfully detects the ready status of the
+ // shared state or with the return from the last function that releases
+ // the shared state, whichever happens first.
virtual void _M_complete_async() { _M_join(); }
void _M_join() { std::call_once(_M_once, &thread::join, ref(_M_thread)); }
once_flag _M_once;
};
+ // Shared state created by std::async().
+ // Starts a new thread that runs a function and makes the shared state ready.
template<typename _BoundFn, typename _Res>
class __future_base::_Async_state_impl final
: public __future_base::_Async_state_commonV2
} };
}
- ~_Async_state_impl() { _M_join(); }
+ // Must not destroy _M_result and _M_fn until the thread finishes.
+ // Call join() directly rather than through _M_join() because no other
+ // thread can be referring to this state if it is being destroyed.
+ ~_Async_state_impl() { if (_M_thread.joinable()) _M_thread.join(); }
private:
typedef __future_base::_Ptr<_Result<_Res>> _Ptr_type;