Certain idioms that are commonly used in concurrent programming are missing
from the standard libraries. Although many of these can be relatively
straightforward to implement, we believe it is more efficient to have a
standard version.
In addition, although some idioms can be provided using
mutexes, higher performance can often be obtained with atomic operations and
lock-free algorithms. However, these algorithms are more complex to write, and
are prone to error.
Other standard concurrency idioms may have difficult corner cases, and can be
hard to implement correctly. For these reasons, we believe that it is
valuable to provide these in the standard library.
We propose a set of commonly-used concurrency classes, some of which may be
implemented using efficient lock-free algorithms where appropriate. This paper
describes the latch and barrier classes.
Latches are a thread co-ordination mechanism that allows one or more threads to
block until an operation is completed. An individual latch is a single-use
object; once the operation has been completed, it cannot be re-used.
Barriers are a thread co-ordination mechanism that allows multiple threads to
block until an operation is completed. Unlike a latch, a barrier is re-usable;
once the operation has been completed, the threads can re-use the same
barrier. It is thus useful for managing repeated tasks handled by multiple
threads.
A reference implementation of these two classes has been written.
A latch maintains an internal counter that is initialized when the latch is
created. One or more threads may block waiting until the counter is decremented
to 0.
constructor( size_t );
The parameter is the initial value of the internal counter.
destructor( );
Destroys the latch. If the latch is destroyed while other threads are in
wait(), or are invoking count_down(), the behaviour is undefined.
void count_down( );
Decrements the internal count by 1, and returns. If the count reaches 0, any
threads blocked in wait() will be released.
Throws std::logic_error if the internal count is already 0.
void wait( );
Blocks the calling thread until the internal count is decremented to 0 by one
or more other threads calling count_down(). If the count is already 0, this is
a no-op.
bool try_wait( );
Returns true if the internal count has been decremented to 0 by one or more
other threads calling count_down(), and false otherwise.
Does not block the calling thread.
void count_down_and_wait( );
Decrements the internal count by 1. If the resulting count is not 0, blocks the
calling thread until the internal count is decremented to 0 by one or more
other threads calling count_down() or count_down_and_wait().
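The latch interface above can be realised with a mutex and a condition variable. The following is a minimal sketch under that assumption, not the reference implementation mentioned earlier; the private members (count_, mutex_, cv_) and the helper decrement_locked() are illustrative names. Having count_down_and_wait() apply the same "already 0" check as count_down() is also an assumption, since the text above does not say.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>

// Minimal latch sketch: a counter protected by a mutex, plus a condition
// variable that waiters sleep on until the counter reaches 0.
class latch {
 public:
  explicit latch(std::size_t count) : count_(count) {}

  latch(const latch&) = delete;
  latch& operator=(const latch&) = delete;

  void count_down() {
    std::lock_guard<std::mutex> lock(mutex_);
    decrement_locked();
  }

  void wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    // Returns immediately if the count is already 0.
    cv_.wait(lock, [this] { return count_ == 0; });
  }

  bool try_wait() {
    std::lock_guard<std::mutex> lock(mutex_);
    return count_ == 0;  // never blocks
  }

  void count_down_and_wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    decrement_locked();  // assumption: same "already 0" check as count_down()
    cv_.wait(lock, [this] { return count_ == 0; });
  }

 private:
  // Caller must hold mutex_. Wakes all waiters when the count hits 0.
  void decrement_locked() {
    if (count_ == 0) throw std::logic_error("latch count is already 0");
    if (--count_ == 0) cv_.notify_all();
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  std::size_t count_;
};
```

Because every read and write of the count happens under the same mutex, whatever a thread did before count_down() is visible to a thread that later returns from wait() or sees true from try_wait().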
Memory Ordering
All calls to count_down() synchronize with any thread returning from wait().
All calls to count_down() synchronize with any thread that gets a true value
from try_wait().
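These ordering rules can also be met without a lock, in the lock-free style the introduction mentions. The sketch below assumes a spin-waiting design and uses the hypothetical name spin_latch: each decrement is a compare-and-swap with release ordering, and wait()/try_wait() load the count with acquire ordering. Because every decrement is a read-modify-write, the final value 0 belongs to the release sequence of each earlier decrement, so an acquire load that observes 0 synchronizes with every call to count_down().

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <thread>

// Lock-free latch sketch (hypothetical name spin_latch). Waiters spin, so
// this only illustrates the memory ordering; it is not meant for long waits.
class spin_latch {
 public:
  explicit spin_latch(std::size_t count) : count_(count) {}

  void count_down() {
    std::size_t old = count_.load(std::memory_order_relaxed);
    do {
      if (old == 0) throw std::logic_error("latch count is already 0");
      // Release on success: writes made before this call become visible to
      // any thread whose acquire load later reads 0.
    } while (!count_.compare_exchange_weak(old, old - 1,
                                           std::memory_order_release,
                                           std::memory_order_relaxed));
  }

  bool try_wait() const {
    // Acquire pairs with the release decrements above.
    return count_.load(std::memory_order_acquire) == 0;
  }

  void wait() const {
    while (!try_wait()) std::this_thread::yield();
  }

 private:
  std::atomic<std::size_t> count_;
};
```

A compare-and-swap loop is used instead of fetch_sub() so that a decrement of 0 can be rejected with std::logic_error without wrapping the unsigned counter around.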
A barrier maintains an internal thread counter that is initialized when the
barrier is created. Threads may decrement the counter and then block waiting
until the specified number of threads are blocked. All threads will then be
woken up, and the barrier will reset. In addition, there is a mechanism to
change the thread count value after the count reaches 0.
constructor( size_t );
The parameter is the initial value of the internal thread counter.
Throws std::invalid_argument if the specified count is 0.
constructor( size_t, std::function<void()> );
The parameters are the initial value of the internal thread counter, and a
function that will be invoked when the counter reaches 0.
Throws std::invalid_argument if the specified count is 0.
destructor( );
Destroys the barrier. If the barrier is destroyed while other threads are in
count_down_and_wait(), the behaviour is undefined.
void count_down_and_wait( );
Decrements the internal thread count by 1. If the resulting count is not 0,
blocks the calling thread until the internal count is decremented to 0 by one
or more other threads calling count_down_and_wait().
Before any threads are released, the completion function registered in the
constructor will be invoked (if specified and non-empty). Note that the
completion function may be invoked in the context of one of the blocked
threads. When the completion function returns, the internal thread count will
be reset to its initial value, and all blocked threads will be unblocked.
Note that it is safe for a thread to re-enter count_down_and_wait()
immediately. It is not necessary to ensure that all blocked threads have exited
count_down_and_wait() before one thread re-enters it.
void reset( size_t );
Resets the barrier with a new value for the initial thread count. This method
may only be invoked when no other threads are inside count_down_and_wait(), or
from within the registered completion function.
Once reset() is called, the barrier will automatically reset itself to the new
thread count as soon as the internal count reaches 0 and all blocked threads
are released.
void reset( std::function<void()> );
Resets the barrier with the new completion function. The next time the internal
thread count reaches 0, this function will be invoked. This method may only be
invoked when no other threads are inside count_down_and_wait(), or from within
the registered completion function.
If an empty function is passed in (for example, std::function<void()>()), no
function will be invoked when the count reaches 0.
Note that the barrier does not have separate count_down() and wait() methods.
Every thread that counts down will then block until all threads have counted
down. Hence only the count_down_and_wait() method is supported.
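One way to meet these barrier rules is sketched below, assuming a mutex, a condition variable and a generation (phase) counter; the member names are illustrative and not taken from the reference implementation. Waiting on the generation rather than on the count is what makes immediate re-entry into count_down_and_wait() safe, and releasing the lock while the completion function runs is what lets that function call reset(). What happens if the completion function throws is not handled here.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

// Minimal barrier sketch: mutex + condition variable + generation counter.
class barrier {
 public:
  explicit barrier(std::size_t num_threads)
      : barrier(num_threads, std::function<void()>()) {}

  barrier(std::size_t num_threads, std::function<void()> f)
      : threshold_(num_threads), completion_(std::move(f)) {
    if (num_threads == 0) throw std::invalid_argument("barrier count is 0");
  }

  barrier(const barrier&) = delete;
  barrier& operator=(const barrier&) = delete;

  void count_down_and_wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    const std::size_t my_generation = generation_;
    if (++arrived_ < threshold_) {
      // Wait for the phase to change rather than for a count, so a thread
      // that re-enters early (and bumps arrived_ for the next phase) cannot
      // confuse threads that are still waking up from this phase.
      cv_.wait(lock, [&] { return generation_ != my_generation; });
      return;
    }
    // Last arrival: run the completion function without holding the lock so
    // that it may call reset(). Copy it first, because reset() may replace
    // completion_ while the function is running.
    std::function<void()> fn = completion_;
    lock.unlock();
    if (fn) fn();
    lock.lock();
    arrived_ = 0;   // the next phase uses threshold_, possibly changed by reset()
    ++generation_;  // releases every thread blocked in this phase
    cv_.notify_all();
  }

  // Changes the thread count used by subsequent phases.
  void reset(std::size_t num_threads) {
    std::lock_guard<std::mutex> lock(mutex_);
    threshold_ = num_threads;
  }

  // An empty std::function means "no completion function".
  void reset(std::function<void()> f) {
    std::lock_guard<std::mutex> lock(mutex_);
    completion_ = std::move(f);
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::size_t threshold_;
  std::size_t arrived_ = 0;
  std::size_t generation_ = 0;
  std::function<void()> completion_;
};
```

The mutex also supplies the ordering described under Memory Ordering: each thread releases the mutex after recording its arrival, and every waiting thread reacquires it before returning from count_down_and_wait().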
Memory Ordering
For threads X and Y that call count_down_and_wait(), the call to
count_down_and_wait() in X synchronizes with the return from
count_down_and_wait() in Y.
Sample use cases for the latch include:
Setting multiple threads to perform a task, and then waiting until all threads
have reached a common point.
Creating multiple threads, which wait for a signal before advancing beyond a
common point.
An example of the first use case would be as follows:
void DoWork(threadpool* pool) {
  latch completion_latch(NTASKS);
  for (int i = 0; i < NTASKS; ++i) {
    pool->add_task([&] {
      // perform work
      completion_latch.count_down();
    });
  }
  // Block until work is done
  completion_latch.wait();
}
An example of the second use case is shown below. We need to load data and then
process it using a number of threads. Loading the data is I/O bound, whereas
starting threads and creating data structures is CPU bound. By running these in
parallel, throughput can be increased.
void DoWork() {
  latch start_latch(1);
  vector<thread*> workers;
  for (int i = 0; i < NTHREADS; ++i) {
    workers.push_back(new thread([&] {
      // Initialize data structures. This is CPU bound.
      start_latch.wait();
      // perform work
    }));
  }
  // Load input data. This is I/O bound.
  // Threads can now start processing
  start_latch.count_down();
  // Join the workers before start_latch is destroyed.
  for (thread* t : workers) {
    t->join();
    delete t;
  }
}
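The two latch patterns can be tried out with plain std::thread in place of the hypothetical threadpool. The sketch below assumes a compact latch offering only count_down() and wait(); the helper names sum_of_squares and process_after_load, and the toy work they perform, are invented for illustration. Each helper joins its threads before the latch it uses is destroyed, since destroying a latch while threads are still in wait() is undefined.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Compact stand-in latch: count_down() and wait() only.
class latch {
 public:
  explicit latch(std::size_t count) : count_(count) {}
  void count_down() {
    std::lock_guard<std::mutex> lock(m_);
    if (count_ > 0 && --count_ == 0) cv_.notify_all();
  }
  void wait() {
    std::unique_lock<std::mutex> lock(m_);
    cv_.wait(lock, [this] { return count_ == 0; });
  }

 private:
  std::mutex m_;
  std::condition_variable cv_;
  std::size_t count_;
};

// Use case 1 (hypothetical helper): wait until ntasks pieces of work finish.
int sum_of_squares(int ntasks) {
  std::vector<int> results(ntasks, 0);
  latch completion_latch(ntasks);
  std::vector<std::thread> workers;
  for (int i = 0; i < ntasks; ++i) {
    workers.emplace_back([&, i] {
      results[i] = i * i;  // perform work
      completion_latch.count_down();
    });
  }
  completion_latch.wait();  // block until work is done
  int total = 0;
  for (int r : results) total += r;
  for (auto& t : workers) t.join();
  return total;
}

// Use case 2 (hypothetical helper): workers get ready, then all wait for a
// single start signal that is given once the input has been loaded.
int process_after_load(int nthreads) {
  latch start_latch(1);
  std::vector<int> input;  // filled in below, after the threads start
  std::vector<int> seen(nthreads, 0);
  std::vector<std::thread> workers;
  for (int i = 0; i < nthreads; ++i) {
    workers.emplace_back([&, i] {
      // CPU-bound setup would go here.
      start_latch.wait();
      seen[i] = static_cast<int>(input.size());  // input is ready now
    });
  }
  input = {1, 2, 3, 4};      // stands in for I/O-bound loading
  start_latch.count_down();  // threads can now start processing
  for (auto& t : workers) t.join();
  int total = 0;
  for (int s : seen) total += s;
  return total;
}
```

For example, sum_of_squares(5) adds 0, 1, 4, 9 and 16, and process_after_load(3) has each of three workers see the four loaded items.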
The barrier can be used to co-ordinate a set of threads carrying out a repeated
task. The number of threads can be adjusted dynamically to respond to changing
requirements.
In the example below, a number of threads are performing a multi-stage
task. Some tasks may require fewer steps than others, meaning that some threads
may finish before others. We reduce the number of threads waiting on the
barrier when this happens.
size_t initial_threads;
atomic<size_t> current_threads(initial_threads);
vector<thread*> workers;

// Create a barrier, and set a lambda that will be invoked every time the
// barrier counts down. If one or more active threads have completed,
// reduce the number of threads.
barrier task_barrier(initial_threads);
task_barrier.reset([&] {
  task_barrier.reset(current_threads.load());
});

for (size_t i = 0; i < initial_threads; ++i) {
  workers.push_back(new thread([&] {
    bool active = true;
    while (active) {
      Task task = tasks.get();
      // perform task
      if (finished(task)) {
        current_threads--;
        active = false;
      }
      task_barrier.count_down_and_wait();
    }
  }));
}

// Read each stage of the task until all stages are complete.
while (!finished()) {
  GetNextStage(tasks);
}

for (thread* t : workers) {
  t->join();
  delete t;
}
The synopsis is as follows.
class barrier {
 public:
  explicit barrier(size_t num_threads) throw (std::invalid_argument);
  explicit barrier(size_t num_threads, std::function<void()> f) throw (std::invalid_argument);
  ~barrier();
  void count_down_and_wait();
  void reset(size_t num_threads);
  void reset(std::function<void()> f);
};