A simple thread pool implementation using the C11 thread support library.
- mule_init(mule, nthreads, kernel, userdata) to initialize the queue
- mule_start(mule) to start threads
- mule_stop(mule) to stop threads
- mule_submit(mule, n) to queue work
- mule_sync(mule) to quench the queue
- mule_reset(mule) to clear counters
mule is a simple thread worker pool that dispatches one dimension of work
indices to a kernel function, either in batch or incrementally with multiple
submissions, to be run by a pool of threads. mule uses three counters:
queued, processing, and processed. Kernel execution is ordered with
respect to the counters. It is up to the caller to provide array storage for
input and output.
/* workitem_idx = processing + 1, updated conditionally using compare-and-swap */
atomic_thread_fence(memory_order_acquire);
(mule->kernel)(userdata, thread_idx, workitem_idx);
atomic_thread_fence(memory_order_release);
/* processed = processed + 1, updated unconditionally with fetch-add */
mule attempts to be lock-free in the common case: the main thread can submit
work without taking a mutex, and the worker threads can accept new work
without taking a mutex, with the design goal that expensive locking and
unlocking of the mutex after each work item can be avoided. mule achieves
this by using lock-free atomic operations on the counters, using primitives
from <stdatomic.h>.
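As a rough illustration, a submit fast path in this style might look like
the following sketch (hypothetical code for illustration; this is not the
verbatim mule_submit implementation):

/* Hypothetical sketch of a lock-free submit fast path: the queued counter
 * is bumped atomically without holding the mutex, then sleeping workers
 * are woken. C11 permits signaling a cnd_t without holding its mutex,
 * which is precisely what opens the door to the lost-wakeup race
 * described below. */
static void submit_sketch(mu_pool *mule, size_t count)
{
    atomic_fetch_add_explicit(&mule->queued, count, memory_order_release);
    cnd_broadcast(&mule->wake_worker);
}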
Alas, POSIX/C condition variables have a well-known design flaw called
"the lost wakeup problem".
POSIX/C condition variables do not support edge-triggered events; instead
they require a thread to be waiting at the time an event is signaled in
order to detect it, otherwise an event from a call to cnd_signal may be
lost if cnd_wait is called too late. The issue can be solved by operating
on the counters within a critical section guarded by the queue mutex;
however, the goal is to avoid locking and unlocking a mutex, which may add
up to tens of microseconds between each work item.
A "lost wakeup" can occur in mule while attempting to cnd_signal
the
queue-complete edge condition in the worker processing the last item to
the dispatcher within cnd_wait
in mule_sync
. The code tries to do this
precisely but the problem occurs between checking the queue-complete
condition and sleeping, whereby one can miss a state change if pre-empted
between checking the condition (processed < queued) and calling cnd_wait
thereby causing a deadlock if timeouts were not used.
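The window can be seen in a simplified sketch of the waiter side
(hypothetical code; a plain cnd_wait is used here to expose the flaw):

/* Simplified sketch of the race window with a plain cnd_wait. The worker
 * updates the counters without taking the mutex, so the check below can
 * go stale before the wait begins. */
mtx_lock(&mule->mutex);
if (atomic_load(&mule->processed) < atomic_load(&mule->queued)) {
    /* If the worker finishes the last item and calls cnd_signal right
     * here, before cnd_wait below has begun waiting, the wakeup is lost
     * and the dispatcher sleeps forever. */
    cnd_wait(&mule->wake_dispatcher, &mule->mutex);
}
mtx_unlock(&mule->mutex);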
This design flaw in POSIX/C condition variables is remedied by "futexes",
which can recheck the condition in the kernel while interrupts are disabled
and atomically sleep if the condition still holds, but "futexes" are not
portable to other operating systems. mule instead tries to make the race
window as narrow as possible, waiting immediately after checking the
condition and using cnd_timedwait so that if a wakeup is missed, the
dispatcher thread will retry in a loop, testing the condition again after
1ms. The race condition does not appear in practice, but it will appear if
the cnd_timedwait is changed to cnd_wait and a yield or print statement is
inserted between evaluating the condition (processed < queued) and calling
cnd_wait, and it would occasionally cause a deadlock without the timeout.
See mule_thread:
/* signal dispatcher precisely when the last item is processed */
if (processed + 1 == queued) {
    cnd_signal(&mule->wake_dispatcher);
}
and mule_sync:
/* wait for queue to quench */
if (processed < queued) {
    cnd_timedwait(&mule->wake_dispatcher, &mule->mutex, &abstime);
}
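Putting the two snippets together, the wait in mule_sync amounts to a loop
along these lines (a sketch; the abstime computation with timespec_get is
an assumption, not the verbatim implementation):

/* Sketch of the mule_sync wait loop: the condition is revalidated on
 * every 1ms timeout, so a missed wakeup costs at most one timeout
 * period instead of deadlocking. */
mtx_lock(&mule->mutex);
while (atomic_load(&mule->processed) < atomic_load(&mule->queued)) {
    struct timespec abstime;
    timespec_get(&abstime, TIME_UTC);
    abstime.tv_nsec += mule_revalidate_queue_complete_ns; /* 1 ms */
    if (abstime.tv_nsec >= 1000000000L) {
        abstime.tv_sec += 1;
        abstime.tv_nsec -= 1000000000L;
    }
    cnd_timedwait(&mule->wake_dispatcher, &mule->mutex, &abstime);
}
mtx_unlock(&mule->mutex);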
These definitions give a summary of the mule data structure:
typedef void(*mu_work_fn)(void *arg, size_t thr_idx, size_t item_idx);

enum {
    mule_max_threads = 8,

    /*
     * condition revalidation timeouts. the time between revalidations of
     * the work-available condition for worker threads is 10 ms (100Hz).
     * if workers are busy they only perform an atomic increment; the
     * dispatching thread has a shorter timeout in mule_sync. timeouts are
     * only necessary if a thread is pre-empted before calling cnd_timedwait.
     */
    mule_revalidate_work_available_ns = 10000000, /* 10 milliseconds */
    mule_revalidate_queue_complete_ns = 1000000,  /* 1 millisecond */
};
struct mu_thread { mu_pool *mule; size_t idx; thrd_t thread; };
struct mu_pool
{
    mtx_t mutex;
    cnd_t wake_dispatcher;
    cnd_t wake_worker;
    void* userdata;
    mu_work_fn kernel;
    size_t num_threads;
    _Atomic(size_t) running;
    _Atomic(size_t) threads_running;
    mu_thread threads[mule_max_threads];

    ALIGNED(64) _Atomic(size_t) queued;
    ALIGNED(64) _Atomic(size_t) processing;
    ALIGNED(64) _Atomic(size_t) processed;
};
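ALIGNED is not defined in the excerpt above; presumably it pads each
counter onto its own cache line so the three hot counters do not share a
line and ping-pong between cores. A plausible definition (an assumption,
not the actual macro) would be:

/* Hypothetical definition: align to a 64-byte cache line to avoid
 * false sharing between the threads updating adjacent counters. */
#if defined(_MSC_VER)
#define ALIGNED(n) __declspec(align(n))
#else
#define ALIGNED(n) _Alignas(n)
#endif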
The following is a brief description of the mule API:

mule_init: Initialize the mu_pool, setting the number of threads, the
kernel function, and the userdata pointer. The kernel function takes three
arguments: void *userdata, the pointer passed to mule_init; size_t thr_idx,
the thread index (0 ... nthreads); and size_t item_idx, the workitem index
(0 ... nqueued), whose range grows with the count argument of mule_submit.
typedef void(*mu_work_fn)(void *arg, size_t thr_idx, size_t item_idx);
mule_start: Start threads and process workitems. mule_start can be called
either before or after mule_submit.

mule_submit: Add count to the queued limit of workitems. Successive calls
to mule_submit atomically add count and notify worker threads that there
is work.

mule_sync: Wait for worker threads to complete all outstanding workitems
in the queue.

mule_reset: Synchronize on the queue, then reset all counters to zero.

mule_stop: Shut down threads. The user can start them again with mule_start.

mule_destroy: Shut down threads, then free resources (mutexes and condition
variables).
The following example launches two threads with eight workitems.
#include <assert.h>
#include <stdatomic.h>

#include "mule.h"

_Atomic(size_t) counter = 0;

void work(void *arg, size_t thr_idx, size_t item_idx)
{
    (void)arg; (void)thr_idx; (void)item_idx; /* unused */
    atomic_fetch_add_explicit(&counter, 1, memory_order_seq_cst);
}

int main(int argc, const char **argv)
{
    (void)argc; (void)argv; /* unused */
    mu_pool mule;
    mule_init(&mule, 2, work, NULL);
    mule_submit(&mule, 8);
    mule_start(&mule);
    mule_sync(&mule);
    mule_stop(&mule);
    mule_destroy(&mule);
    assert(atomic_load(&counter) == 8);
    return 0;
}
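Because submissions accumulate atomically, the same eight workitems could
also be queued incrementally while the threads are running, e.g. (a sketch
based on the API described above):

mu_pool mule;
mule_init(&mule, 2, work, NULL);
mule_start(&mule);
for (int i = 0; i < 4; i++) {
    mule_submit(&mule, 2); /* queue two more workitems per iteration */
}
mule_sync(&mule);          /* wait until all eight items are processed */
mule_stop(&mule);
mule_destroy(&mule);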
Tested with Clang and GCC on Linux (Ubuntu 20.04 LTS).
cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo -B build -G Ninja
cmake --build build -- --verbose
Run with build/test_mule -v to enable verbose debug messages:
mule_submit: queue-start
mule_start: starting-threads
mule_sync: quench-queue
mule_thread-0: worker-started
mule_thread-1: worker-started
arg=(nil) thr_idx=1 item_idx=2
arg=(nil) thr_idx=1 item_idx=3
arg=(nil) thr_idx=1 item_idx=4
arg=(nil) thr_idx=1 item_idx=5
arg=(nil) thr_idx=1 item_idx=6
arg=(nil) thr_idx=0 item_idx=1
arg=(nil) thr_idx=0 item_idx=8
arg=(nil) thr_idx=1 item_idx=7
mule_sync: queue-complete
mule_stop: stopping-threads
mule_thread-1: worker-exiting
mule_thread-0: worker-exiting
mule source code is released under an ISC License.