仿RabbitMQ实现消息队列(五)--C++异步线操作实现程池
future
介绍
std::future 是 C++11 标准库中的一个模板类,它表示一个异步操作的结果。当我们在
多线程编程中使用异步任务时,std::future 可以帮助我们在需要的时候获取任务的执行
结果。std::future 的一个重要特性是能够阻塞当前线程,直到异步操作完成,从而确保
我们在获取结果时不会遇到未完成的操作
应用场景
- 异步任务:当我们需要在后台执行一些耗时操作时,如网络请求或计算密集型任
务等,std::future 可以用来表示这些异步任务的结果。通过将任务与主线程分离,我们
可以实现任务的并行处理,从而提高程序的执行效率 - 并发控制:: 在多线程编程中,我们可能需要等待某些任务完成后才能继续执行其
他操作。通过使用 std::future,我们可以实现线程之间的同步,确保任务完成后再获取
结果并继续执行后续操作 - 结果获取::std::future 提供了一种安全的方式来获取异步任务的结果。我们可以使
用 std::future::get()函数来获取任务的结果,此函数会阻塞当前线程,直到异步操作完
成。这样,在调用 get()函数时,我们可以确保已经获取到了所需的结果
用法示例
- std::async关联异步任务
async是一种将任务与future关联的简单方法。它创建并运行一个异步任务。并返回一个与该任务结果关联的future对象。默认情况下,async是否启动一个新线程,或者在等待future时,任务是否同步运行都取决于你给的参数。这个参数为launch类型:
○ std::launch::deferred 表明该函数会被延迟调用,直到在 future 上调用 get()或
者 wait()才会开始执行任务
○ std::launch::async 表明函数会在自己创建的线程上运行
○ std::launch::deferred | std::launch::async 内部通过系统等条件自动选择策略
1 |
|
编译运行:1
2
3
4
5g++ async.cc -o async -lpthread
./async
hello bit!
Result: 2
- 使用 std::packaged_task 和 std::future 配合
std::packaged_task 就是将任务和 std::future 绑定在一起的模板,是一种对任务的封
装。我们可以通过 std::packaged_task 对象获取任务相关联的 std::future 对象,通过
调用 get_future()方法获得。std::packaged_task 的模板参数是函数签名。
可以把 std::future 和 std::async 看成是分开的, 而 std::packaged_task 则是一个整体。
1 |
|
编译运行:1
2
3
4
5g++ packaged_task.cc -o packaged_task -lpthread
$ ./packaged_task
hello bit!
Result: 3
异步执行 std::packaged_task 任务1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
int add(int num1, int num2)
{
std::this_thread::sleep_for(std::chrono::seconds(3));
return num1 + num2;
}
int main() {
// 封装任务
// std::packaged_task<int(int, int)> task(add);
// 此处可执行其他操作, 无需等待
// std::cout << "hello bit!" << std::endl;
// std::future<int> result_future = task.get_future();
//需要注意的是,task 虽然重载了()运算符,但 task 并不是一个函数,
//std::async(std::launch::async, task, 1, 2); //--错误用法
//所以导致它作为线程的入口函数时,语法上看没有问题,但是实际编译的时
候会报错
//std::thread(task, 1, 2); //---错误用法
//而 packaged_task 禁止了拷贝构造,
//且因为每个 packaged_task 所封装的函数签名都有可能不同,因此也无法
当作参数一样传递
//传引用不可取,毕竟任务在多线程下执行存在局部变量声明周期的问题,因
此不能传引用
//因此想要将一个 packaged_task 进行异步调用,
//简单方法就只能是 new packaged_task,封装函数传地址进行解引用调用
了
//而类型不同的问题,在使用的时候可以使用类型推导来解决
auto task = std::make_shared<std::packaged_task<int(int,
int)>>(add);
std::future<int> result_future = task->get_future();
std::thread thr([task](){ (*task)(1, 2); });
thr.detach();
// 获取异步任务结果
int result = result_future.get();
std::cout << "Result: " << result << std::endl;
return 0;
}
•使用 std::promise 和 std::future 配合
std::promise 提供了一种设置值的方式,它可以在设置之后通过相关联的 std::future 对
象进行读取。换种说法就是之前说过 std::future 可以读取一个异步函数的返回值了, 但是要等待就绪,而 std::promise 就提供一种 方式手动让 std::future 就绪1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void task(std::promise<int> result_promise)
{
int result = 2;
std::cout << "task result:" << result << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(3));
result_promise.set_value(result);
}
int main() {
// 创建 promise
std::promise<int> result_promise;
std::future<int> result_future = result_promise.get_future();
// 创建一个新线程, 执行长时间运行的任务
std::thread task_thread(task, std::move(result_promise));
// 此处可执行其他操作, 无需等待
std::cout << "hello bit!" << std::endl;
// 获取异步任务结果
int result = result_future.get();
std::cout << "Result: " << result << std::endl;
task_thread.join();
return 0;
}
编译运行1
2
3
4
5
6$ g++ promise.cc -o promise -lpthread
$ ./promise
hello bit!
task result:2
Result: 2
线程池
基于线程池执行任务的时候,入口函数内部执行逻辑是固定的,因此选择packaged_task加上future的组合来实现。
工作思想:
- 用户传入要执行的函数,以及需要处理的数据,由线程池中的工作线程来执行函数来完成任务
实现: 管理的成员
任务池:
互斥锁:
一定数量的工作线程:管理的操作
入队任务
停止运行1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
using namespace std;
class ThreadPool
{
public:
using Functor = function<void(void)>;
ThreadPool(int count = 1) : _stop(false)
{
for (int i = 0; i < count; i++)
{
_threads.emplace_back(&ThreadPool::entry, this);
}
}
void stop()
{
if (_stop)
return;
_stop = true;
_cv.notify_all(); // 唤醒所有工作线程
for (auto &threads : _threads)
{
threads.join();
}
}
// push传入的首先是一个函数--用户要执行的函数,接下来是不定参,表示要处理的数据也就是函数中的参数
// push内部,会将这个传入的函数封装成一个异步任务(packaged_task),抛入到任务池中,由工作线程取出进行执行
// lambda生成一个可调用对象(内部执行异步任务),抛入到任务池中,由工作线程取出进行执行
template <typename F, typename... Args> // 不定参
auto push(F &&func, Args &&...args) -> future<decltype(func(args...))>
{
// 将传入的函数封装成packaged_task任务
using return_type = decltype(func(args...));
//由于不确定参数 先绑定函数
auto Func = std::bind(forward<F>(func), forward<Args>(args)...);
auto task = make_shared<packaged_task<return_type()>>(Func);
future<return_type> fu = task->get_future();
// 构造一个lambda匿名函数(捕获任务对象),函数内执行任务对象
{
unique_lock<mutex> lk(_mutex); // 加锁
// 将构造出来的匿名对象,抛入到任务池中
_taskpool.push_back([task](){
(*task)(); });
_cv.notify_all();
}
return fu;
}
~ThreadPool()
{
stop();
}
private:
// 线程入口函数--内部不断地从任务池中取出任务进行执行
void entry()
{
vector<Functor> task_pool;
{
// 加锁
unique_lock<mutex> lock(_mutex);
// stop被置位返回 去出任务进行执行
_cv.wait(lock, [this]()
{ return _stop || _taskpool.empty(); });
// 取出任务执行 一次性全部交换出来
task_pool.swap(_taskpool);
}
for (auto &task : task_pool)
{
task();
}
}
private:
mutex _mutex;
vector<Functor> _taskpool; // 任务池
vector<thread> _threads;
condition_variable _cv;//条件变量
atomic<bool> _stop; // 原子类型数
};
_taskpool.push_back(task{ (*task)(); })。 里面是匿名函数,捕获了task,保存在lambda所生成的可调用对象里。将生成的可调用对象Push到任务池中。
注意上面的这个函数参 auto push(F &&func, Args &&…args) -> future


