C++ 标准库 thread 类的用法

Posted by KalosAner on March 19, 2025

一、引言

std::thread 是 C++11 引入的线程管理类,用于创建和管理多线程程序,可以跨平台使用。使用该类需要在头文件引入 #include <thread> 并且在 Linux 下编译时需要链接 pthread 库。

常用的函数的函数原型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <thread>

// 创建空线程
thread() noexcept;
// 创建线程并绑定可以调用对象及参数
template <class Fn, class... Args>
explicit thread(Fn&& fn, Args&&... args);
// 转移线程所有权
thread(thread&& x) noexcept;
thread& operator=(thread&& other) noexcept;
// 不支持复制线程
thread(const thread&) = delete;
// 收束线程
void join();
// 分离线程
void detach();
// 交换两个线程对象的句柄
void swap(thread& other) noexcept;
// 获取线程 ID
id get_id() const noexcept;
// 检查线程是否可以收束
bool joinable() const noexcept;
// 返回平台相关的原生线程句柄,Linux 返回 pthread_t,Windows 返回 HANDEL
native_handle_type native_handle();
// 返回当前硬件支持的并发线程数,通常是 CPU 核心数
static unsigned int hardware_concurrency() noexcept;

二、简单使用

std::thread 使用起来很简单,大部分功能都已经被封装好。std::thread本身不支持直接获取返回值,获取返回值通常使用 std::future + std::promise方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <iostream>
#include <thread>
#include <future>
#include <mutex>
#include <queue>

#define SIZE 10

using namespace std;

mutex mtx;

typedef struct {
    thread::id id;
    int get_time;
} data, *pdata;

int cur_time = 0;

void thread_task(promise<pdata> prom, int flag) {  // 按值传递promise
    if (flag % 2 == 1) {
        cout << "线程分离" << endl;
    } else {
        pdata res = new data();
        {
            lock_guard<mutex> lock(mtx);  // RAII管理锁,离开作用域自动释放锁
            res->get_time = cur_time++;
        }
        res->id = this_thread::get_id();
        prom.set_value(res);
    }
}

int main() {
    queue<thread> threads;
    vector<future<pdata>> futures;

    for (int i = 0; i < SIZE; ++i) {
        promise<pdata> prom;
        auto fut = prom.get_future();
        thread t(thread_task, std::move(prom), i);

        if (i % 2 == 0) {
            threads.push(std::move(t));  // 移动线程到队列
            futures.push_back(std::move(fut));
        } else {
            t.detach();  // 分离奇数线程
        }
    }

    // 处理结果
    for (auto& fut : futures) {
        pdata res = fut.get();
        cout << res->id << ' ' << res->get_time << endl;
        delete res;
    }

    // 等待所有未分离线程
    while (!threads.empty()) {
        threads.front().join();
        threads.pop();
    }

    return 0;
}

三、线程池

简单的线程池可以直接套用线程池模板。

大概原理就是:

首先创建一个空的任务队列,该队列中存放可调用对象,然后创建多个线程进行死循环重复一个业务,这个业务调用条件变量阻塞等待任务队列中出现新的任务,如果出现新的任务就调用任务对象。

#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
#include <type_traits>

class ThreadPool {
public:
    // 构造函数,初始化指定数量的工作线程,explicit:禁止隐式类型转换
    explicit ThreadPool(size_t threads)
        : stop(false)
    {
        for (size_t i = 0; i < threads; ++i) {
            workers.emplace_back([this] {
                while(true) {
                    std::function<void()> task;
                    {   // 获取任务时加锁
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock, [this] {
                            return this->stop || !this->tasks.empty();
                        });
                        // 若线程池已停止且任务队列为空,则退出线程函数
                        if (this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    // 执行任务
                    task();
                }
            });
        }
    }

    // 添加任务到线程池,返回一个 std::future 对象用于获取任务结果
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type>
    {
        using return_type = typename std::result_of<F(Args...)>::type;
        
        // 封装任务
        auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        
        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            // 如果线程池已关闭,则抛出异常
            if (stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");
            // 将任务包装为无参函数加入任务队列
            tasks.emplace([task]() { (*task)(); });
        }
        condition.notify_one();  // 通知一个工作线程
        return res;
    }

    // 析构函数,等待所有线程结束
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();  // 唤醒所有线程
        for (std::thread &worker : workers)
            worker.join();  // 等待线程结束
    }

private:
    // 工作线程容器
    std::vector<std::thread> workers;
    // 任务队列,每个任务均封装为 std::function<void()>
    std::queue<std::function<void()>> tasks;
    
    // 任务队列的互斥锁和条件变量
    std::mutex queue_mutex;
    std::condition_variable condition;
    // 标识线程池是否停止
    bool stop;
};

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <iostream>

int main() {
    ThreadPool pool(4);  // 创建4个线程的线程池

    // 提交多个任务
    std::vector<std::future<int>> results;
    for (int i = 0; i < 8; ++i) {
        results.emplace_back(
            pool.enqueue([i] {
                std::this_thread::sleep_for(std::chrono::seconds(1));
                return i * i;
            })
        );
    }

    // 获取结果
    for (auto& result : results) {
        std::cout << result.get() << std::endl;
    }

    // 提交带异常的任务
    auto future = pool.enqueue([] {
        throw std::runtime_error("Intentional error");
        return 42;
    });

    try {
        future.get();
    } catch (const std::exception& e) {
        std::cerr << "Caught exception: " << e.what() << std::endl;
    }

    return 0;
}