A thread pool is, in essence, a hack to avoid the cost of spinning up new threads, so that you can offload tasks of almost any size into a separate thread. A thread pool can be a really simple way to do asynchronous programming, if designed in such a way.

Scope

In this article, I walk through building a thread pool in C++17. Of course, this is an exercise in yak shaving / reinventing the wheel, and for any serious application, a library like boost’s thread_pool, or even just asio, should be used. That said, the resulting code is usable, and I use a similar implementation, optimized for my use-case, in one of my projects.

The goal is to create a thread pool which you can give a function to, and for that function to be run whenever a thread is free.

Project Setup

The project uses C++17, CMake, and is hosted on GitHub under CC0–1.0 (“Creative Commons Zero”). The code for the entire thread pool lives in one class, called ThreadPool.

Designing the Interface

The first step is drafting what the interface should look like. So, without having written a single line of our thread pool, let’s just start using it as if we did!

#include "threadpool.h"
#include <iostream>

int main() {
    ThreadPool pool { 8 };

    pool.add_task([] {
        std::cout << "Hello, ThreadPool!" << std::endl;
    });
}

Sketching this out, we can already note some requirements;

  • A way to tell the thread pool that we’d like N threads. We want to make sure we can control the resources the thread pool uses.
  • A way to add tasks. In this case, we stick to std::function as the data type for a task. This could be solved with inheritance and some base object, a bit of overloading of operator(), but this would blow the article out of scope. Lambdas and std::function have everything we need.
  • The program may terminate before our task is done, so we need to execute all pending tasks before terminating.

Our interface now looks like this:

class ThreadPool {
public:
    using Task = std::function<void()>;

    /// constructs the threadpool with n_threads threads
    explicit ThreadPool(size_t n_threads);
    /// blocks the destruction until all tasks are finished
    ~ThreadPool();

    /// adds a task to be executed
    void add_task(Task task);
};

Notice how we use a using declaration to “fake” having a Task class. In a real implementation, we could return some sort of “future” from the add_task method. This is left as an exercise to the reader.

Pooling Threads

To actually have threads, or “workers”, for our task, we need to be able to store N of them, and send tasks to them without re-constructing each thread for each task.

So we simply run N threads, store them in a vector, and run a function in each of them which takes tasks out of a queue whenever one is available. For now, we need <vector>, <queue> (yes std::queue) and <thread>.

In the private members of our thread pool we add:

private:
    std::queue<Task> m_tasks;
    std::vector<std::thread> m_threads;

Those m_tasks are accessed by separate threads at “random” times, so we need to protect it with a std::mutex.

private:
    std::queue<Task> m_tasks;
    std::mutex m_tasks_mtx;
    std::condition_variable m_tasks_cnd;
    std::vector<std::thread> m_threads;

If you’re not familiar with a mutex; it’s an object that can be lock()-ed and unlock()-ed in a thread-safe way. This can be used to ensure that a resource is only used while a mutex is locked, by simply agreeing that we lock the mutex before accessing m_tasks. There is no inherent quality to the mutex that lets it “lock” another resource, other than our willingness to always lock it before we use that resource, and unlock it after. It’s basically a verbal contract with ourselves.

A condition_variable is a concept which lets us avoid polling for new tasks.

Consider this: We have 8 threads all waiting for a new task, and if we get a new task in our queue, we would like to handle it pretty much immediately. The most primitive way to keep checking for a new task is to have each thread check m_tasks.empty() every 10 milliseconds or so. This, however, wastes a lot of time and energy, and is not even that fast, since we may have tasks with sub-10-milliseconds runtime. This is called polling.

To avoid polling and get even better “response-times” from our thread pool, we use a condition variable. Embedded engineers can think about this as an interrupt, and our tasks as our ISRs. Threads are put to sleep by wait()-ing on a condition variable. Then, once a magic notify() function is called on the condition variable, one thread which is waiting/sleeping on that condition variable, is woken up. While asleep, a thread has practically no CPU time, and the wake-up is pretty quick. There is a bit more to this, which we discuss at the end. For now, a condition variable can be .wait()-ed on, and one or more waiting threads can be woken up with .notify(). There’s one more part to this — a condition variable also gives us a lock to a mutex when woken up, which helps us immensely.

Each of our threads are now started with a function called thread_main, and with that, we have pooled some threads together!

private:
    void thread_main();

    std::queue<Task> m_tasks;
    std::mutex m_tasks_mtx;
    std::condition_variable m_tasks_cnd;
    std::vector<std::thread> m_threads;
ThreadPool::ThreadPool(size_t n_threads) {
    m_threads.reserve(n_threads);
    for (size_t i = 0; i < n_threads; ++i) {
        m_threads.emplace_back(&ThreadPool::thread_main, this);
    }
}

void ThreadPool::thread_main() {
}

ThreadPool::~ThreadPool() {
    for (auto& thread : m_threads) {
        if (thread.joinable()) {
            thread.join();
        }
    }
}

In the constructor, we now create N threads, start them all with the thread_main as their entry function, and join them all in the destructor. RAII!

So far, so good!

Task Queue

As we already discussed, our task queue is be accessed by multiple threads, so we need to protect access to it. Further, every time we’ve enqueued a new Task, we want to wake up one thread.

To protect access, we use a mutex, like so:

m_tasks_mtx.lock();
m_tasks.emplace(std::move(task));
m_tasks_mtx.unlock();

But, because it’s C++, we can do this with RAII and not worry about the unlock():

void ThreadPool::add_task(Task task) {
    std::unique_lock lock(m_tasks_mtx);
    m_tasks.emplace(std::move(task));
}

The mutex is locked in the constructor of std::unique_lock, and unlocked in the destructor (at the }).

Same exercise in the thread_main — but in this case we also need to keep checking the queue. Let’s do polling first, and then we add the condition variable magic from before! We first add:

bool m_shutdown { false };

Which lets each thread keep looking for tasks until we get the signal to shut down. Note how it’s set to false, and, at no point, we ever set it to true. This is deliberate for now, as we want the destructor to block indefinitely for now.

void ThreadPool::thread_main() {
    while (!m_shutdown) {
        // if we have tasks, we want to move the
        // first task out, remove it from the queue,
        // and then execute it
        std::unique_lock lock(m_tasks_mtx);
        if (!m_tasks.empty()) {
            Task task = std::move(m_tasks.front());
            m_tasks.pop();
            lock.unlock();
            task();
        }
    }
}

You see a few curiosities in here (other than the fact we’re polling here); in line 10, we unlock the lock before RAII gets to do it. This is a deliberate decision, and the unique_lock behaves properly, checking for a lock before unlocking in the destructor. We unlock, since we’re done dealing with the m_tasks queue at that point. We also really don’t want to keep a lock for however long our task takes! It may take multiple seconds, and if we held the lock during that time, all other threads would be waiting for this lock to unlock the whole time! This comes with the benefit that each task can also call add_task() without causing a deadlock.

At this point, we can actually run our example from earlier, which is now valid code:

#include "threadpool.h"
#include <iostream>

int main() {
    ThreadPool pool { 8 };

    pool.add_task([] {
        std::cout << "Hello, ThreadPool!" << std::endl;
    });
}

The output is, as expected, Hello, ThreadPool!, followed by an infinite loop (since we don’t change m_shutdown).

We can even dare to do:

    while (true) {
        pool.add_task([] {
            std::cout << "Hello, ThreadPool!" << std::endl;
        });
    }

We only do this to demonstrate that we can push and pop tasks into/from the task queue, and they are being handled. You can instead print out the std::this_thread::get_id() to verify that there are different threads handling them, too!

Optimization

You’re free to benchmark this, but we can already measure that the “idle” threads are very busy spinning, polling.

To optimize the CPU time taken to wait for tasks, we finally use our condition variable!

void ThreadPool::add_task(Task task) {
    std::unique_lock lock(m_tasks_mtx);
    m_tasks.emplace(std::move(task));
    m_tasks_cnd.notify_one();
}

Notice we only add line 4, the condition variable notify_one(), here. This wakes up one of all the waiting threads. Now in thread_main:

void ThreadPool::thread_main() {
    while (!m_shutdown) {
        // if we have tasks, we want to move the
        // first task out, remove it from the queue,
        // and then execute it
        std::unique_lock lock(m_tasks_mtx);
        // with the lock, we can now wait on it with our condition variable
        m_tasks_cnd.wait(lock);
        if (!m_tasks.empty()) {
            Task task = std::move(m_tasks.front());
            m_tasks.pop();
            lock.unlock();
            task();
        }
    }
}

We add line 8, in which we give the condition variable a locked lock on the task queue’s mutex. This is an important mechanism, as we don’t just wake up when a notification happens, we also grab a lock to the mutex. This mechanism ensures that, while the thread is sleeping on .wait(), the lock is unlocked, but gets locked as soon as the thread is woken up. Once we arrive at line 9, we are woken up (either by notify() or a random spurious wake up), we hold the lock, and can safely check for new elements in the queue, etc.

Destructor

For the destructor, we need to consider which behavior we want. In your case, you may want tasks to be discarded, for example, but in our case we want to just handle all tasks before destruction.

ThreadPool::~ThreadPool() {
    std::unique_lock lock(m_tasks_mtx);
    while (!m_tasks.empty()) {
        lock.unlock();
        m_tasks_cnd.notify_all();
        lock.lock();
    }
    lock.unlock();
    for (auto& thread : m_threads) {
        if (thread.joinable()) {
            thread.join();
        }
    }
}

This is a very primitive implementation, and there are definitely better ways. For now, we leave the task of improving this to the reader, as it’s secondary to the main issues we have to deal with.


As it is, this code still works fine, however, we have an issue which arises from our condition variable earlier:

While a condition variable may be woken up by the kernel at any random time, and also when we add a new task, there’s a situation we didn’t consider!

Bugs

Consider this:

  • All threads are busy executing task()
  • We add a new task via add_task, and by the end of it, we call .notify_one() like usual. By this time, all threads are still busy (not waiting)

Now, once all the threads finish their work and go back to wait()-ing, we end up with a task in the queue and no thread woken up. This is because notify_one() just silently fails if there is no thread wait()-ing on this condition variable.

There are multiple possible solutions, and you may pick one (or multiple together) based on your use-case.

Solution 1: Optional Polling

If your thread pool is going to be busy usually anyways, you may want to have a polling mechanism next to the condition variable wait. This works as follows:

        std::unique_lock lock(m_tasks_mtx);
        m_tasks_cnd.wait_for(lock, std::chrono::milliseconds(10));
        if (!m_tasks.empty()) {

Here, in line 2, we use wait_for instead of wait. This lets us pass a time interval after which the thread will be woken up regardless of notifications. If you know how busy your thread pool usually is, and how time-critical your tasks are, you can come up with a good timing for this.

Solution 2: While tasks are in queue

In this solution we use the fact that we already have awake threads when we have this problem, by simply changing our if to a while and adding some logic:

void ThreadPool::thread_main() {
    while (!m_shutdown) {
        // if we have tasks, we want to move the
        // first task out, remove it from the queue,
        // and then execute it
        std::unique_lock lock(m_tasks_mtx);
        // with the lock, we can now wait on it with our condition variable
        m_tasks_cnd.wait(lock);
        while (!m_tasks.empty()) {
            Task task = std::move(m_tasks.front());
            m_tasks.pop();
            lock.unlock();
            task();
            if (lock.try_lock()) {
                continue;
            } else {
                break;
            }
        }
    }
}

See in line 9, we replaced the if with a while, and at the end (line 14) we try to lock the lock. If it fails, we just break out of the loop and don’t bother any more. If it succeeds, we have a lock, so we continue the loop, check for the condition, etc.

It’s important to note that, while this helps, it doesn’t fully solve the problem, as the try_lock can fail in all cases. Then we’re left with the original problem. Combining this with solution 1 with a larger timeout would likely be pretty great!

Solution 3: Spinning hard

In this solution we do solution 2, but instead of try_lock(), we simply lock().

void ThreadPool::thread_main() {
    while (!m_shutdown) {
        // if we have tasks, we want to move the
        // first task out, remove it from the queue,
        // and then execute it
        std::unique_lock lock(m_tasks_mtx);
        // with the lock, we can now wait on it with our condition variable
        m_tasks_cnd.wait(lock);
        while (!m_tasks.empty()) {
            Task task = std::move(m_tasks.front());
            m_tasks.pop();
            lock.unlock();
            task();
            lock.lock();
        }
    }
}

So, in line 14, we lock the mutex, and simply run the queue until we have no more tasks. This is the simplest solution and, probably, the best in most cases. We still need to use solution 1 here, though, because there is a brief moment between the end of the while loop and the next wait().

In the end, we likely can’t get around some kind of polling mechanism as a fallback either way, unless we get really fancy (lock-free, etc.).

Conclusion

There is still a lot we could do, but this thread pool works as designed for the scope of this article, and is cheaper to use than to spawn a new thread per task. This solves the issue which thread pools are supposed to address. Arguably, the context switches to handle mutexes and condition variables are quite expensive still, and could be optimized out for the most part, but at that point maybe you should just use an existing, fully tested library.

I’m already working on another article, in which I attempt to walk through some optimizations, using some C++20 features, and more! Stay tuned for that!

The full source code of the resulting project can be found on GitHub: lionkor/threadpool, under CC0–1.0 (“Creative Commons Zero”).

Exercises for the Reader

The following are ideas for the reader to implement, either in their own project, or as a contribution to the GitHub linked above:

  • Ensure n_threads isn’t too low or too high, maybe even set it to the amount of hardware threads by default.
  • Add the ability to pass an argument (or multiple) to each task’s invocation.
  • Add a “future”-like object which gets returned from add_task and lets you query status, result, etc.
  • Add a way to ask the thread pool “how busy are you?”.
  • Scale the thread pool up and down automatically based on demand.
  • Use a priority queue instead.

Still hungry for more? Read and implement ideas from “Resource efficient Thread Pools with Zig” written by Zig’s kprotty.