一、引言
std::thread
是 C++11 引入的线程管理类,用于创建和管理多线程程序,可以跨平台使用。使用该类需要在头文件引入 #include <thread>
并且在 Linux 下编译时需要链接 pthread
库。
常用的函数的函数原型:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <thread>
// 创建空线程
thread() noexcept;
// 创建线程并绑定可以调用对象及参数
template <class Fn, class... Args>
explicit thread(Fn&& fn, Args&&... args);
// 转移线程所有权
thread(thread&& x) noexcept;
thread& operator=(thread&& other) noexcept;
// 不支持复制线程
thread(const thread&) = delete;
// 收束线程
void join();
// 分离线程
void detach();
// 交换两个线程对象的句柄
void swap(thread& other) noexcept;
// 获取线程 ID
id get_id() const noexcept;
// 检查线程是否可以收束
bool joinable() const noexcept;
// 返回平台相关的原生线程句柄,Linux 返回 pthread_t,Windows 返回 HANDEL
native_handle_type native_handle();
// 返回当前硬件支持的并发线程数,通常是 CPU 核心数
static unsigned int hardware_concurrency() noexcept;
二、简单使用
std::thread
使用起来很简单,大部分功能都已经被封装好。std::thread本身不支持直接获取返回值,获取返回值通常使用 std::future
+ std::promise
方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <iostream>
#include <thread>
#include <future>
#include <mutex>
#include <queue>
#define SIZE 10
using namespace std;
mutex mtx;
typedef struct {
thread::id id;
int get_time;
} data, *pdata;
int cur_time = 0;
void thread_task(promise<pdata> prom, int flag) { // 按值传递promise
if (flag % 2 == 1) {
cout << "线程分离" << endl;
} else {
pdata res = new data();
{
lock_guard<mutex> lock(mtx); // RAII管理锁,离开作用域自动释放锁
res->get_time = cur_time++;
}
res->id = this_thread::get_id();
prom.set_value(res);
}
}
int main() {
queue<thread> threads;
vector<future<pdata>> futures;
for (int i = 0; i < SIZE; ++i) {
promise<pdata> prom;
auto fut = prom.get_future();
thread t(thread_task, std::move(prom), i);
if (i % 2 == 0) {
threads.push(std::move(t)); // 移动线程到队列
futures.push_back(std::move(fut));
} else {
t.detach(); // 分离奇数线程
}
}
// 处理结果
for (auto& fut : futures) {
pdata res = fut.get();
cout << res->id << ' ' << res->get_time << endl;
delete res;
}
// 等待所有未分离线程
while (!threads.empty()) {
threads.front().join();
threads.pop();
}
return 0;
}
三、线程池
简单的线程池可以直接套用线程池模板。
大概原理就是:
首先创建一个空的任务队列,该队列中存放可调用对象,然后创建多个线程进行死循环重复一个业务,这个业务调用条件变量阻塞等待任务队列中出现新的任务,如果出现新的任务就调用任务对象。
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
#include <type_traits>
class ThreadPool {
public:
// 构造函数,初始化指定数量的工作线程,explicit:禁止隐式类型转换
explicit ThreadPool(size_t threads)
: stop(false)
{
for (size_t i = 0; i < threads; ++i) {
workers.emplace_back([this] {
while(true) {
std::function<void()> task;
{ // 获取任务时加锁
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock, [this] {
return this->stop || !this->tasks.empty();
});
// 若线程池已停止且任务队列为空,则退出线程函数
if (this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
// 执行任务
task();
}
});
}
}
// 添加任务到线程池,返回一个 std::future 对象用于获取任务结果
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
// 封装任务
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// 如果线程池已关闭,则抛出异常
if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
// 将任务包装为无参函数加入任务队列
tasks.emplace([task]() { (*task)(); });
}
condition.notify_one(); // 通知一个工作线程
return res;
}
// 析构函数,等待所有线程结束
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all(); // 唤醒所有线程
for (std::thread &worker : workers)
worker.join(); // 等待线程结束
}
private:
// 工作线程容器
std::vector<std::thread> workers;
// 任务队列,每个任务均封装为 std::function<void()>
std::queue<std::function<void()>> tasks;
// 任务队列的互斥锁和条件变量
std::mutex queue_mutex;
std::condition_variable condition;
// 标识线程池是否停止
bool stop;
};
使用示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <iostream>
int main() {
ThreadPool pool(4); // 创建4个线程的线程池
// 提交多个任务
std::vector<std::future<int>> results;
for (int i = 0; i < 8; ++i) {
results.emplace_back(
pool.enqueue([i] {
std::this_thread::sleep_for(std::chrono::seconds(1));
return i * i;
})
);
}
// 获取结果
for (auto& result : results) {
std::cout << result.get() << std::endl;
}
// 提交带异常的任务
auto future = pool.enqueue([] {
throw std::runtime_error("Intentional error");
return 42;
});
try {
future.get();
} catch (const std::exception& e) {
std::cerr << "Caught exception: " << e.what() << std::endl;
}
return 0;
}