The C++ standard library gained some new concurrency features with C++20:
- Wait and notify operations on std::atomic<T>
- Semaphores
- Latches
- Barriers
In this article, I will cover the current implementation approach for atomic wait/notify, as these are the basis operations required to implement the remaining coordination primitives introduced with C++20. Subsequent articles in this series will cover the details of the other types.
Note: The implementation presented here is considered experimental and the details will almost certainly change in a future version of GCC as we commit to an ABI for these features.
Let's start by taking a look at what the C++ standard says about atomic waiting:
1 Atomic waiting operations and atomic notifying operations provide a mechanism to wait for the value of an atomic object to change more efficiently than can be achieved with polling. An atomic waiting operation may block until it is unblocked by an atomic notifying operation, according to each function's effects.
[Note 1 : Programs are not guaranteed to observe transient atomic values, an issue known as the A-B-A problem, resulting in continued blocking if a condition is only temporarily met. — end note]
2 [Note 2: The following functions are atomic waiting operations:
(2.1) — atomic<T>::wait,
(2.2) — atomic_flag::wait,
(2.3) — atomic_wait and atomic_wait_explicit,
(2.4) — atomic_flag_wait and atomic_flag_wait_explicit, and
(2.5) — atomic_ref<T>::wait.
— end note]
3 [Note 3: The following functions are atomic notifying operations:
(3.1) — atomic<T>::notify_one and atomic<T>::notify_all,
(3.2) — atomic_flag::notify_one and atomic_flag::notify_all,
(3.3) — atomic_notify_one and atomic_notify_all,
(3.4) — atomic_flag_notify_one and atomic_flag_notify_all, and
(3.5) — atomic_ref<T>::notify_one and atomic_ref<T>::notify_all.
— end note]
4 A call to an atomic waiting operation on an atomic object M is eligible to be unblocked by a call to an atomic notifying operation on M if there exist side effects X and Y on M such that:
(4.1) — the atomic waiting operation has blocked after observing the result of X,
(4.2) — X precedes Y in the modification order of M, and
(4.3) — Y happens before the call to the atomic notifying operation.
How can we implement atomic waiting?
The only universal strategy for implementing atomic waiting is to spin in an atomic load-compare loop. This isn't particularly efficient if the waiter is likely to block for an extended period, but in many cases it improves application responsiveness to do a bit of spinning before calling into a more expensive operating-system-level primitive.
libstdc++ implements its spin logic as follows:
- If supported, this is how we let the CPU or kernel know we are able to yield or relax our spinning:

inline void
__thread_yield() noexcept
{
#if defined _GLIBCXX_HAS_GTHREADS && defined _GLIBCXX_USE_SCHED_YIELD
  __gthread_yield();
#endif
}

inline void
__thread_relax() noexcept
{
#if defined __i386__ || defined __x86_64__
  __builtin_ia32_pause();
#else
  __thread_yield();
#endif
}
- We use these constants in the spin loop:

constexpr auto __atomic_spin_count_relax = 12;
constexpr auto __atomic_spin_count = 16;
- We provide for a pluggable policy in the form of a callable that is invoked as the last step in the spin algorithm. This will be used later in the implementation of timed waiting. The default policy is to exit the spin loop:

struct __default_spin_policy
{
  bool
  operator()() const noexcept
  { return false; }
};
- The spin loop itself performs the following steps, in order:
  - Spin for a few iterations, evaluating the spin predicate and then performing the relax operation to either issue a pause instruction or yield the thread.
  - Spin for a few iterations and yield the thread if the spin predicate is not satisfied.
  - Spin until the spin policy indicates we should stop.

The spin returns true if the predicate is satisfied (a usage sketch follows the code):

template<typename _Pred, typename _Spin = __default_spin_policy>
bool
__atomic_spin(_Pred& __pred, _Spin __spin = _Spin{ }) noexcept
{
  for (auto __i = 0; __i < __atomic_spin_count; ++__i)
    {
      if (__pred())
        return true;

      if (__i < __atomic_spin_count_relax)
        __detail::__thread_relax();
      else
        __detail::__thread_yield();
    }

  while (__spin())
    {
      if (__pred())
        return true;
    }

  return false;
}
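As a hypothetical usage sketch (the helper names mirror the snippets above; this is not code from the library), a caller hands __atomic_spin a predicate that reports whether the awaited condition has become true:

// Sketch: spin until the int at __addr no longer holds __old; returns true
// if the change was observed before the spin budget ran out.
inline bool
__example_spin(const int* __addr, int __old) noexcept
{
  auto __pred = [=]() noexcept
  { return __atomic_load_n(__addr, __ATOMIC_RELAXED) != __old; };
  return __atomic_spin(__pred); // default policy: stop after the spin counts
}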
A more efficient wait
On some platforms, the operating system provides an efficient waiting primitive; we should use that facility where available. For Linux, this waiting primitive is futex(2); on Darwin, it is ulock_wait/ulock_wake. Where present, these facilities are typically restricted in the type on which you can wait: either a 32-bit int in the case of a futex, or a 64-bit unsigned int in the case of ulock_wait. Clearly, atomic can (and is required by the standard to) support specialization on more types than int32_t or uint64_t, but we'd certainly like to take advantage of the platform-specific wait where it's possible to do so.
libstdc++ implements the low-level details of the platform-specific waiting strategy conditionally as follows. First, we define the fundamental type we can wait efficiently on:
#ifdef _GLIBCXX_HAVE_LINUX_FUTEX
using __platform_wait_t = int;
static constexpr size_t __platform_wait_alignment = 4;
#else
using __platform_wait_t = uint64_t;
static constexpr size_t __platform_wait_alignment
= __alignof__(__platform_wait_t);
#endif
libstdc++ defines its own versions of the constants it uses to invoke the futex syscall rather than relying on the presence of <linux/futex.h>:
#ifdef _GLIBCXX_HAVE_LINUX_FUTEX
#define _GLIBCXX_HAVE_PLATFORM_WAIT 1
enum class __futex_wait_flags : int
{
#ifdef _GLIBCXX_HAVE_LINUX_FUTEX_PRIVATE
__private_flag = 128,
#else
__private_flag = 0,
#endif
__wait = 0,
__wake = 1,
__wait_bitset = 9,
__wake_bitset = 10,
__wait_private = __wait | __private_flag,
__wake_private = __wake | __private_flag,
__wait_bitset_private = __wait_bitset | __private_flag,
__wake_bitset_private = __wake_bitset | __private_flag,
__bitset_match_any = -1
};
We invoke the futex syscall to wait as follows:
template<typename _Tp>
void
__platform_wait(const _Tp* __addr, __platform_wait_t __val) noexcept
{
auto __e = syscall (SYS_futex, static_cast<const void*>(__addr),
static_cast<int>(__futex_wait_flags::__wait_private),
__val, nullptr);
if (!__e || errno == EAGAIN)
return;
if (errno != EINTR)
__throw_system_error(errno);
}
Then we invoke the futex syscall to notify one or more waiting threads as follows:
template<typename _Tp>
void
__platform_notify(const _Tp* __addr, bool __all) noexcept
{
syscall (SYS_futex, static_cast<const void*>(__addr),
static_cast<int>(__futex_wait_flags::__wake_private),
__all ? INT_MAX : 1);
}
The current (as of GCC 12) implementation does not support ulock_wait-based waiting. That (and presumably other similar OS facilities) can be conditionally supported in the future:
// define _GLIBCX_HAVE_PLATFORM_WAIT and implement __platform_wait()
// and __platform_notify() if there is a more efficient primitive supported
// by the platform (e.g. __ulock_wait()/__ulock_wake()) which is better than
// a mutex/condvar based wait
How to handle those types that do not fit in a __platform_wait_t
To solve this problem, we employ the timeless advice of adding another layer of indirection, of course. Just because we need to atomically modify the contents of an atomic<T> doesn't mean we are restricted to implementing wait and notify in terms of that type; we can carry a separate __platform_wait_t that is used only for this purpose.
One approach would be to add a new __platform_wait_t member to atomic<T>. This would be a fairly expensive approach in terms of ballooning the size of atomic<T> and pessimizing every use of it. We are also precluded from taking this approach because of ABI stability.
Instead, we create a side table to hold these proxy __platform_wait_t objects and hash into that table by the address of the atomic value.
Making notify cheaper
The syscall() to notify waiting threads is fairly expensive compared to an atomic load. A simple micro-benchmark bears this out:
------------------------------------------------------------------
Benchmark Time CPU Iterations
------------------------------------------------------------------
BM_empty_notify_checked 3.81 ns 3.81 ns 183296078
BM_empty_notify_syscall 96.9 ns 96.8 ns 7124788
As this is a micro-benchmark, there are a number of caveats to these numbers (a discussion of them is a topic for a future article). That said, the cost of blindly making a futex syscall is roughly 25 times that of the optimization outlined here.
Waiters entering the wait syscall are going to pay for an expensive operation anyway, so making them pay a bit extra in terms of an atomic increment is reasonable. Notifiers can then perform an atomic load to determine if the syscall is likely to wake any waiters.
libstdc++'s side table elements could then be defined as follows:
struct __waiter_pool
{
__platform_wait_t _M_wait = 0; // Count of waiters
__platform_wait_t _M_ver = 0; // Proxy address to use for a futex
};
What about platforms that don't provide some low-level efficient wait?
Wait and notify can be implemented in terms of a mutex and condition variable that the C++ standard library is required to provide, so libstdc++'s side table conditionally carries a mutex and condition_variable for those platforms:
struct __waiter_pool
{
#ifdef __cpp_lib_hardware_interference_size
static constexpr auto _S_align = hardware_destructive_interference_size;
#else
static constexpr auto _S_align = 64;
#endif
alignas(_S_align) __platform_wait_t _M_wait = 0;
#ifndef _GLIBCXX_HAVE_PLATFORM_WAIT
mutex _M_mtx;
#endif
alignas(_S_align) __platform_wait_t _M_ver = 0;
#ifndef _GLIBCXX_HAVE_PLATFORM_WAIT
__condvar _M_cv;
#endif
// Note entry into a wait
void
_M_enter_wait() noexcept
{ __atomic_fetch_add(&_M_wait, 1, __ATOMIC_SEQ_CST); }
// Note exit from a wait
void
_M_leave_wait() noexcept
{ __atomic_fetch_sub(&_M_wait, 1, __ATOMIC_RELEASE); }
// Hello? Is there anybody in there? Just nod if you can hear me.
bool
_M_waiting() const noexcept
{
__platform_wait_t __res;
__atomic_load(&_M_wait, &__res, __ATOMIC_SEQ_CST);
return __res > 0;
}
};
This struct
fits on two hardware cache lines (assuming 64-byte cache lines) and places the atomic counts on different lines.
The same consideration regarding pessimizing notifiers when no thread is waiting applies to platforms that fall back to the mutex/condvar implementation strategy. The side table itself is a small static array of __waiter_pool entries, defined as follows (an illustrative use follows the code):
struct __waiter_pool
{
// ...
static __waiter_pool&
_S_for(const void* __addr) noexcept
{
constexpr uintptr_t __ct = 16;
static __waiter_pool __w[__ct];
auto __key = (uintptr_t(__addr) >> 2) % __ct;
return __w[__key];
}
};
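For illustration only (these internal names are not part of the public interface), note that unrelated atomic objects can hash to the same entry, since the table has only 16 slots:

std::atomic<long long> __a;                 // a type that needs the proxy path
std::atomic<long long> __b;
auto& __wa = __waiter_pool::_S_for(&__a);
auto& __wb = __waiter_pool::_S_for(&__b);   // may or may not be the same entry
// Collisions are benign: a waiter always re-checks its own atomic's value
// after waking, so sharing an entry only costs the occasional spurious wakeup.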
Putting together the pieces for a wait primitive
The C++ standard has the following to say about what wait()
should do:
void wait(T old, memory_order order = memory_order::seq_cst) const noexcept;

22 Preconditions: order is neither memory_order::release nor memory_order::acq_rel.
23 Effects: Repeatedly performs the following steps, in order:
(23.1) — Evaluates load(order) and compares its value representation for equality against that of old.
(23.2) — If they compare unequal, returns.
(23.3) — Blocks until it is unblocked by an atomic notifying operation or is unblocked spuriously.
For atomic<T>s where T is compatible with (and support is present for) __platform_wait(), we would perform the following steps (a simplified sketch follows the list):
- Obtain the __waiter_pool entry __w for the address to be waited on.
- Call __w._M_enter_wait() to signal that a waiter is available to be notified.
- Repeatedly perform the following steps, in order:
  - Evaluate load(order) and compare its value representation for equality against that of __old.
  - If they compare unequal:
    - Call __w._M_leave_wait() to signal that the waiter has left the wait.
    - Return.
  - If they compare equal, then perform the following steps in order:
    - Spin for a bit, performing load(order) to see if the value representation changes.
    - If it doesn't change, call __platform_wait(__addr), where __addr is the address of the atomic value being waited on.
- Call __w._M_leave_wait() to signal that the waiter has left the wait.
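The following is a minimal sketch of that flow, assuming a futex-compatible T and eliding the RAII and memory-order machinery the real implementation layers on top (__wait_direct is a name invented for this illustration):

// Sketch only: wait directly on the atomic object itself.
template<typename _Tp>
void
__wait_direct(const _Tp* __addr, _Tp __old) noexcept
{
  auto& __w = __waiter_pool::_S_for(__addr);
  __w._M_enter_wait();
  auto __changed = [&]
  { return __atomic_load_n(__addr, __ATOMIC_SEQ_CST) != __old; };
  while (!__changed())
    {
      // Spin briefly; if the value still hasn't changed, block in the kernel.
      if (!__atomic_spin(__changed))
        __platform_wait(__addr, __old);   // may return spuriously; loop re-checks
    }
  __w._M_leave_wait();
}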
For atomic<T>s where T is not compatible with __platform_wait() but __platform_wait() is available, we would perform the following steps (a simplified sketch follows the list):
- Obtain the __waiter_pool entry __w for the address to be waited on.
- Call __w._M_enter_wait() to signal that a waiter is available to be notified.
- Repeatedly perform the following steps, in order:
  - Evaluate load(order) and compare its value representation for equality against that of __old.
  - If they compare unequal:
    - Call __w._M_leave_wait() to signal that the waiter has left the wait.
    - Return.
  - If they compare equal, then perform the following steps in order:
    - Spin for a bit, performing load(order) to see if the value representation changes.
    - If it doesn't change, call __platform_wait(&__w._M_ver).
- Call __w._M_leave_wait() to signal that the waiter has left the wait.
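The sketch for this case differs from the previous one only in the address handed to the kernel: we snapshot the proxy __w._M_ver and block on it rather than on the atomic object itself (again a simplified illustration, with __wait_proxy an invented name):

// Sketch only: proxy wait for a type the futex cannot handle directly.
template<typename _Tp>
void
__wait_proxy(const _Tp* __addr, _Tp __old) noexcept
{
  auto& __w = __waiter_pool::_S_for(__addr);
  __w._M_enter_wait();
  auto __changed = [&]
  {
    _Tp __cur;
    __atomic_load(__addr, &__cur, __ATOMIC_SEQ_CST);
    return __builtin_memcmp(&__cur, &__old, sizeof(_Tp)) != 0;  // value representation
  };
  while (!__changed())
    {
      __platform_wait_t __ver;                        // snapshot the proxy first
      __atomic_load(&__w._M_ver, &__ver, __ATOMIC_SEQ_CST);
      if (!__atomic_spin(__changed))
        __platform_wait(&__w._M_ver, __ver);          // block until _M_ver is bumped
    }
  __w._M_leave_wait();
}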
When no __platform_wait() is available, we would perform the following steps (a simplified sketch follows the list):
- Obtain the __waiter_pool entry __w for the address to be waited on.
- Call __w._M_enter_wait() to signal that a waiter is available to be notified.
- Repeatedly perform the following steps, in order:
  - Evaluate load(order) and compare its value representation for equality against that of __old.
  - If they compare unequal:
    - Call __w._M_leave_wait() to signal that the waiter has left the wait.
    - Return.
  - If they compare equal, then perform the following steps in order:
    - Spin for a bit, performing a relaxed load of __w._M_ver and checking whether its value changes; if so, exit the spin indicating success.
    - Otherwise, acquire __w._M_mtx.
    - Atomically load __w._M_ver and compare the result to the value last observed during the spin loop; if they compare equal, enter __w._M_cv.wait(__w._M_mtx).
    - Release __w._M_mtx.
- Call __w._M_leave_wait() to signal that the waiter has left the wait.
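A simplified sketch of the fallback, using the pool's mutex and condition variable (the name __wait_fallback is invented for this illustration; the real code is shown later in _M_do_wait):

// Sketch only: mutex/condvar fallback, keyed off the proxy _M_ver.
template<typename _Tp>
void
__wait_fallback(const _Tp* __addr, _Tp __old) noexcept
{
  auto& __w = __waiter_pool::_S_for(__addr);
  __w._M_enter_wait();
  auto __changed = [&]
  {
    _Tp __cur;
    __atomic_load(__addr, &__cur, __ATOMIC_SEQ_CST);
    return __builtin_memcmp(&__cur, &__old, sizeof(_Tp)) != 0;
  };
  while (!__changed())
    {
      __platform_wait_t __ver;
      __atomic_load(&__w._M_ver, &__ver, __ATOMIC_SEQ_CST);
      if (__atomic_spin(__changed))
        break;
      lock_guard<mutex> __l(__w._M_mtx);
      __platform_wait_t __now;
      __atomic_load(&__w._M_ver, &__now, __ATOMIC_SEQ_CST);
      if (__now == __ver)                 // nothing was notified while we spun
        __w._M_cv.wait(__w._M_mtx);       // releases the mutex while blocked
    }
  __w._M_leave_wait();
}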
Putting together the pieces for a notify primitive
The C++ standard has the following to say about what notify_one()
and notify_all()
should do:
void notify_one() const noexcept;

25 Effects: Unblocks the execution of at least one atomic waiting operation on *ptr that is eligible to be unblocked (31.6) by this call, if any such atomic waiting operations exist.
26 Remarks: This function is an atomic notifying operation (31.6) on atomic object *ptr.

void notify_all() const noexcept;

27 Effects: Unblocks the execution of all atomic waiting operations on *ptr that are eligible to be unblocked (31.6) by this call.
For atomic<T>s where T is compatible with (and support is present for) __platform_notify(), we would perform the following steps:
- Obtain the __waiter_pool entry __w for the address to be notified.
- Check __w._M_waiting(), and, if there are any waiters, call __platform_notify(__addr, __all), where:
  - __all indicates whether a wake-one or wake-all is to be performed
  - __addr is the address to be notified
For atomic<T>s where T is not compatible with __platform_notify() but __platform_notify() is available, we would perform the following steps:
- Obtain the __waiter_pool entry __w for the address to be notified.
- Check __w._M_waiting(), and, if there are any waiters, perform the following steps:
  - Perform an atomic increment on __w._M_ver.
  - Call __platform_notify(&__w._M_ver, __all), where __all indicates whether a wake-one or wake-all is to be performed.
When no __platform_notify() is available, we would perform the following steps (a combined sketch of all three notify cases follows the list):
- Obtain the __waiter_pool entry __w for the address to be notified.
- Check __w._M_waiting(), and, if there are any waiters, perform the following steps:
  - Perform an atomic increment on __w._M_ver.
  - Call __w._M_cv.notify_one() or __w._M_cv.notify_all(), as appropriate.
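Putting the three notify cases side by side, the logic reduces to one small decision; the following is a hedged sketch reusing the names introduced above (__notify_sketch is an invented name, not the library's):

// Sketch only: the common shape of a notify, for either wake-one or wake-all.
template<typename _Tp>
void
__notify_sketch(const _Tp* __addr, bool __all) noexcept
{
  auto& __w = __waiter_pool::_S_for(__addr);
  if (!__w._M_waiting())
    return;                                    // no waiters: skip the expensive part
#ifdef _GLIBCXX_HAVE_PLATFORM_WAIT
  if constexpr (__platform_wait_uses_type<_Tp>)
    __platform_notify(__addr, __all);          // wake directly on the object
  else
    {
      __atomic_fetch_add(&__w._M_ver, 1, __ATOMIC_SEQ_CST);
      __platform_notify(&__w._M_ver, __all);   // wake via the shared proxy
    }
#else
  __atomic_fetch_add(&__w._M_ver, 1, __ATOMIC_SEQ_CST);
  { lock_guard<mutex> __l(__w._M_mtx); }       // synchronize with a blocking waiter
  if (__all)
    __w._M_cv.notify_all();
  else
    __w._M_cv.notify_one();
#endif
}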
Handling the various type and platform choices
The libstdc++ code to determine whether or not a given type T
supports __platform_wait()
is:
template<typename _Tp>
inline constexpr bool __platform_wait_uses_type
#ifdef _GLIBCXX_HAVE_PLATFORM_WAIT
= is_scalar_v<_Tp>
&& ((sizeof(_Tp) == sizeof(__detail::__platform_wait_t))
&& (alignof(_Tp*) >= __platform_wait_alignment));
#else
= false;
#endif
If the platform supports an efficient wait, and if T is a scalar type with the same size as a __platform_wait_t and sufficient alignment, then T uses __platform_wait(). In all other cases, it does not.
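For example, on a Linux futex platform (where __platform_wait_t is a 4-byte int), one would expect something like the following to hold; these assertions are purely illustrative:

#ifdef _GLIBCXX_HAVE_LINUX_FUTEX
static_assert(__platform_wait_uses_type<int>);          // scalar, 4 bytes: futex-compatible
static_assert(__platform_wait_uses_type<unsigned>);     // likewise
static_assert(!__platform_wait_uses_type<long long>);   // 8 bytes: takes the proxy path
static_assert(!__platform_wait_uses_type<short>);       // 2 bytes: takes the proxy path
#endif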
In all cases, we will end up doing some sort of wait against the value of a __platform_wait_t
, as follows:
struct __waiter_pool
{
// ...
void
_M_do_wait(const __platform_wait_t* __addr, __platform_wait_t __old) noexcept
{
#ifdef _GLIBCXX_HAVE_PLATFORM_WAIT
__platform_wait(__addr, __old);
#else
__platform_wait_t __val;
__atomic_load(__addr, &__val, __ATOMIC_SEQ_CST);
if (__val == __old)
{
lock_guard<mutex> __l(_M_mtx);
_M_cv.wait(_M_mtx);
}
#endif // __GLIBCXX_HAVE_PLATFORM_WAIT
}
};
And the corresponding notification is implemented as follows:
struct __waiter_pool
{
// ...
void
_M_notify(const __platform_wait_t* __addr, bool __all) noexcept
{
if (!_M_waiting())
return;
#ifdef _GLIBCXX_HAVE_PLATFORM_WAIT
__platform_notify(__addr, __all);
#else
{ lock_guard __l(_M_mtx); }
if (__all)
_M_cv.notify_all();
else
_M_cv.notify_one();
#endif
}
};
Note: the seemingly useless mutex acquisition above is significant. It guarantees that any waiter which has already acquired _M_mtx and is about to block on _M_cv has actually entered the condition variable wait (and released the mutex) before the notification is issued, so a notification cannot slip into that window and be lost.
In each wait strategy, there are multiple points where __w._M_leave_wait()
needs to be called. An RAII wrapper type can take care of making sure this always happens at scope exit:
struct __waiter
{
__waiter_pool& _M_w;
__waiter(__waiter_pool& __w)
: _M_w(__w)
{ _M_w._M_enter_wait(); }
~__waiter()
{ _M_w._M_leave_wait(); }
};
In each wait strategy, there is always a __platform_wait_t*
that will be used for determining if a wait has been notified; the particular address is a function of __platform_wait_uses_type<>
. The __waiter
RAII type can take care of initializing itself with the correct address:
struct __waiter
{
__waiter_pool& _M_w;
__platform_wait_t* _M_addr;
template<typename _Up>
static __platform_wait_t*
_S_wait_addr(const _Up* __a, __platform_wait_t* __b)
{
if constexpr (__platform_wait_uses_type<_Up>)
return reinterpret_cast<__platform_wait_t*>(const_cast<_Up*>(__a));
else
return __b;
}
template<typename _Up>
explicit __waiter(const _Up* __addr) noexcept
: _M_w(__waiter_pool::_S_for(__addr))
, _M_addr(_S_wait_addr(__addr, &_M_w._M_ver))
{ _M_w._M_enter_wait(); }
~__waiter()
{ _M_w._M_leave_wait(); }
};
Our wait and notify primitives can now construct a __waiter __w(__addr) for any type and get a handle initialized with the correct address to wait and notify on. However, notify does not need to manipulate the waiter count, so we have two kinds of waiter:
template<typename _EntersWait>
struct __waiter
{
__waiter_pool& _M_w;
__platform_wait_t* _M_addr;
template<typename _Up>
static __platform_wait_t*
_S_wait_addr(const _Up* __a, __platform_wait_t* __b)
{
if constexpr (__platform_wait_uses_type<_Up>)
return reinterpret_cast<__platform_wait_t*>(const_cast<_Up*>(__a));
else
return __b;
}
template<typename _Up>
explicit __waiter(const _Up* __addr) noexcept
: _M_w(__waiter_pool::_S_for(__addr))
, _M_addr(_S_wait_addr(__addr, &_M_w._M_ver))
{
if constexpr (_EntersWait::value)
_M_w._M_enter_wait();
}
~__waiter()
{
if constexpr (_EntersWait::value)
_M_w._M_leave_wait();
}
};
using __enters_wait = __waiter<std::true_type>;
using __bare_wait = __waiter<std::false_type>;
Wait operations use the __enters_wait
type alias, and notify operations use the __bare_wait
type alias.
If we are on a platform that does not support __platform_wait
, it is necessary to communicate the last value seen during the spin loop to the wait. In libstdc++, this is implemented as follows:
struct __waiter
{
// ...
template<typename _Up, typename _ValFn,
typename _Spin = __default_spin_policy>
static bool
_S_do_spin_v(__platform_wait_t* __addr,
const _Up& __old, _ValFn __vfn,
__platform_wait_t& __val,
_Spin __spin = _Spin{ })
{
auto const __pred = [=]
{ return !__atomic_compare(__old, __vfn()); };
if constexpr (__platform_wait_uses_type<_Up>)
{ __val = __old; }
else
// if we implement wait/notify with a mutex/condvar, we need the
// current value of _M_w._M_ver, which is what the address in
// __addr points to
{ __atomic_load(__addr, &__val, __ATOMIC_SEQ_CST); }
return __atomic_spin(__pred, __spin);
}
template<typename _Up, typename _ValFn,
typename _Spin = __default_spin_policy>
bool
_M_do_spin_v(const _Up& __old, _ValFn __vfn,
__platform_wait_t& __val,
_Spin __spin = _Spin{ })
{ return _S_do_spin_v(_M_addr, __old, __vfn, __val, __spin); }
};
_ValFn is a callable, invoked with no arguments, that knows how to return the current value at the address being waited on. All functions that receive one have a _v suffix.
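As an illustration (the exact lambda the library builds may differ), the _ValFn handed down from atomic<T>::wait is essentially a captured load:

// Hypothetical example of a _ValFn for some std::atomic<int> __a and memory order __m:
auto __vfn = [&__a, __m] { return __a.load(__m); };
// __vfn() is invoked with no arguments wherever the wait machinery needs a fresh value.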
The primitive wait implementation is:
struct __waiter
{
// ...
template<typename _Tp, typename _ValFn>
void
_M_do_wait_v(_Tp __old, _ValFn __vfn)
{
do
{
__platform_wait_t __val;
if (__base_type::_M_do_spin_v(__old, __vfn, __val))
return;
__base_type::_M_w._M_do_wait(__base_type::_M_addr, __val);
} while (__atomic_compare(__old, __vfn()));
}
};
The wrapper that atomic<T>::wait
calls into is:
template<typename _Tp, typename _ValFn>
void
__atomic_wait_address_v(const _Tp* __addr, _Tp __old,
_ValFn __vfn) noexcept
{
__detail::__enters_wait __w(__addr);
__w._M_do_wait_v(__old, __vfn);
}
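For context, atomic<T>::wait itself is then little more than a forward to this wrapper; roughly (simplified from the actual headers, where _M_i is the atomic's stored value):

void
wait(__int_type __old, memory_order __m = memory_order_seq_cst) const noexcept
{
  std::__atomic_wait_address_v(&_M_i, __old,
                               [__m, this] { return this->load(__m); });
}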
And the corresponding primitive notify function is:
template<typename _EntersWait>
struct __waiter
{
// ...
void
_M_notify(bool __all)
{
if (_M_addr == &_M_w._M_ver)
__atomic_fetch_add(_M_addr, 1, __ATOMIC_SEQ_CST);
_M_w._M_notify(_M_addr, __all);
}
};
However, there is a subtle problem with this formulation. If we are proxying notifications for a type not supported by __platform_notify(), or if __platform_notify() is not available, the call _M_notify(false) could wake the incorrect thread: several different atomic objects can share the same proxy _M_ver, so a wake-one on the proxy might rouse a waiter that is waiting on a different object. To capture this, we add _M_laundered to __waiter and upgrade such notifications to wake-all:
template<typename _EntersWait>
struct __waiter
{
__waiter_pool& _M_w;
__platform_wait_t* _M_addr;
// ...
template<typename _Up>
explicit __waiter(const _Up* __addr) noexcept
: _M_w(__waiter_pool::_S_for(__addr))
, _M_addr(_S_wait_addr(__addr, &_M_w._M_ver))
{ ... }
bool
_M_laundered() const noexcept
{ return _M_addr == &_M_w._M_ver; }
void
_M_notify(bool __all)
{
if (_M_laundered())
{
__atomic_fetch_add(_M_addr, 1, __ATOMIC_SEQ_CST);
__all = true;
}
_M_w._M_notify(_M_addr, __all);
}
};
And the wrapper that atomic<T>::notify_one and atomic<T>::notify_all call into is:
template<typename _Tp>
void
__atomic_notify_address(const _Tp* __addr, bool __all) noexcept
{
__detail::__bare_wait __w(__addr);
__w._M_notify(__all);
}
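For symmetry with the wait side, the member notify functions are thin forwards to this wrapper as well; roughly (again simplified from the actual headers):

void
notify_one() const noexcept
{ std::__atomic_notify_address(&_M_i, false); }

void
notify_all() const noexcept
{ std::__atomic_notify_address(&_M_i, true); }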
Next time
C++20 introduces counting_semaphore and binary_semaphore, which support a blocking acquire(), a non-blocking try_acquire(), and the timed try_acquire_for() and try_acquire_until() operations. The next article in this series will look at the implementation of counting_semaphore and binary_semaphore, as well as the functionality needed to implement timed atomic waiting.