future

介绍

std::future 是 C++11 标准库中的一个模板类,它表示一个异步操作的结果。当我们在
多线程编程中使用异步任务时,std::future 可以帮助我们在需要的时候获取任务的执行
结果。std::future 的一个重要特性是能够阻塞当前线程,直到异步操作完成,从而确保
我们在获取结果时不会遇到未完成的操作

应用场景

  • 异步任务:当我们需要在后台执行一些耗时操作时,如网络请求或计算密集型任
    务等,std::future 可以用来表示这些异步任务的结果。通过将任务与主线程分离,我们
    可以实现任务的并行处理,从而提高程序的执行效率
  • 并发控制:: 在多线程编程中,我们可能需要等待某些任务完成后才能继续执行其
    他操作。通过使用 std::future,我们可以实现线程之间的同步,确保任务完成后再获取
    结果并继续执行后续操作
  • 结果获取::std::future 提供了一种安全的方式来获取异步任务的结果。我们可以使
    用 std::future::get()函数来获取任务的结果,此函数会阻塞当前线程,直到异步操作完
    成。这样,在调用 get()函数时,我们可以确保已经获取到了所需的结果

用法示例

  • std::async关联异步任务
    async是一种将任务与future关联的简单方法。它创建并运行一个异步任务。并返回一个与该任务结果关联的future对象。默认情况下,async是否启动一个新线程,或者在等待future时,任务是否同步运行都取决于你给的参数。这个参数为launch类型:
    ○ std::launch::deferred 表明该函数会被延迟调用,直到在 future 上调用 get()或
    者 wait()才会开始执行任务
    ○ std::launch::async 表明函数会在自己创建的线程上运行
    ○ std::launch::deferred | std::launch::async 内部通过系统等条件自动选择策略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <iostream>
#include <future>
#include <chrono>

int aysnc_task() {
std::this_thread::sleep_for(std::chrono::seconds(3));
return 2;
}
int main() {
// 关联异步任务 aysnc_task 和 futrue
std::future<int> result_future =
std::async(std::launch::async, aysnc_task);
// 此处可执行其他操作, 无需等待
std::cout << "hello bit!" << std::endl;
// 获取异步任务结果
int result = result_future.get();
std::cout << "Result: " << result << std::endl;
return 0;

}

编译运行:

1
2
3
4
5
g++ async.cc -o async -lpthread
./async

hello bit!
Result: 2

  • 使用 std::packaged_task 和 std::future 配合
    std::packaged_task 就是将任务和 std::future 绑定在一起的模板,是一种对任务的封
    装。我们可以通过 std::packaged_task 对象获取任务相关联的 std::future 对象,通过
    调用 get_future()方法获得。std::packaged_task 的模板参数是函数签名。
    可以把 std::future 和 std::async 看成是分开的, 而 std::packaged_task 则是一个整体。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <iostream>
#include <future>
#include <chrono>

int add(int num1, int num2)
{
return num1 + num2;
}
int main() {
// 封装任务
std::packaged_task<int(int, int)> task(add);
// 此处可执行其他操作, 无需等待
std::cout << "hello bit!" << std::endl;
std::future<int> result_future = task.get_future();
// 这里必须要让任务执行, 否则在 get()获取 future 的值时会一直阻塞
task(1,2);
// 获取异步任务结果
int result = result_future.get();
std::cout << "Result: " << result << std::endl;
return 0;`

}

编译运行:

1
2
3
4
5
g++ packaged_task.cc -o packaged_task -lpthread
$ ./packaged_task

hello bit!
Result: 3

异步执行 std::packaged_task 任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <iostream>
#include <future>
#include <chrono>
#include <memory>

int add(int num1, int num2)
{
std::this_thread::sleep_for(std::chrono::seconds(3));
return num1 + num2;
}
int main() {
// 封装任务
// std::packaged_task<int(int, int)> task(add);
// 此处可执行其他操作, 无需等待
// std::cout << "hello bit!" << std::endl;
// std::future<int> result_future = task.get_future();
//需要注意的是,task 虽然重载了()运算符,但 task 并不是一个函数,
//std::async(std::launch::async, task, 1, 2); //--错误用法
//所以导致它作为线程的入口函数时,语法上看没有问题,但是实际编译的时
候会报错
//std::thread(task, 1, 2); //---错误用法
//而 packaged_task 禁止了拷贝构造,
//且因为每个 packaged_task 所封装的函数签名都有可能不同,因此也无法
当作参数一样传递
//传引用不可取,毕竟任务在多线程下执行存在局部变量声明周期的问题,因
此不能传引用
//因此想要将一个 packaged_task 进行异步调用,
//简单方法就只能是 new packaged_task,封装函数传地址进行解引用调用

//而类型不同的问题,在使用的时候可以使用类型推导来解决


auto task = std::make_shared<std::packaged_task<int(int,
int)>>(add);
std::future<int> result_future = task->get_future();
std::thread thr([task](){ (*task)(1, 2); });
thr.detach();
// 获取异步任务结果
int result = result_future.get();
std::cout << "Result: " << result << std::endl;
return 0;

}

•使用 std::promise 和 std::future 配合
std::promise 提供了一种设置值的方式,它可以在设置之后通过相关联的 std::future 对
象进行读取。换种说法就是之前说过 std::future 可以读取一个异步函数的返回值了, 但是要等待就绪,而 std::promise 就提供一种 方式手动让 std::future 就绪
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <iostream>
#include <future>
#include <chrono>

void task(std::promise<int> result_promise)
{
int result = 2;
std::cout << "task result:" << result << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(3));
result_promise.set_value(result);
}
int main() {
// 创建 promise
std::promise<int> result_promise;
std::future<int> result_future = result_promise.get_future();

// 创建一个新线程, 执行长时间运行的任务
std::thread task_thread(task, std::move(result_promise));

// 此处可执行其他操作, 无需等待
std::cout << "hello bit!" << std::endl;

// 获取异步任务结果
int result = result_future.get();
std::cout << "Result: " << result << std::endl;
task_thread.join();
return 0;

}

编译运行

1
2
3
4
5
6
$ g++ promise.cc -o promise -lpthread
$ ./promise

hello bit!
task result:2
Result: 2

线程池

基于线程池执行任务的时候,入口函数内部执行逻辑是固定的,因此选择packaged_task加上future的组合来实现。

工作思想:

  • 用户传入要执行的函数,以及需要处理的数据,由线程池中的工作线程来执行函数来完成任务
    实现:
  • 管理的成员
    任务池:
    互斥锁:
    一定数量的工作线程:

  • 管理的操作
    入队任务
    停止运行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    #include <functional>
    #include <iostream>
    #include <memory>
    #include <thread>
    #include <condition_variable>
    #include <mutex>
    #include <vector>
    using namespace std;

    class ThreadPool
    {
    public:
        using Functor = function<void(void)>;
        ThreadPool(int count = 1) : _stop(false)
        {
            for (int i = 0; i < count; i++)
            {
                _threads.emplace_back(&ThreadPool::entry, this);
            }
        }
        void stop()
        {
            if (_stop)
                return;
            _stop = true;
            _cv.notify_all(); // 唤醒所有工作线程
            for (auto &threads : _threads)
            {
                threads.join();
            }
        }
        // push传入的首先是一个函数--用户要执行的函数,接下来是不定参,表示要处理的数据也就是函数中的参数
        // push内部,会将这个传入的函数封装成一个异步任务(packaged_task),抛入到任务池中,由工作线程取出进行执行
        // lambda生成一个可调用对象(内部执行异步任务),抛入到任务池中,由工作线程取出进行执行
        template <typename F, typename... Args> // 不定参
        auto push(F &&func, Args &&...args) -> future<decltype(func(args...))>
        {
            // 将传入的函数封装成packaged_task任务
            using return_type = decltype(func(args...));
           //由于不确定参数 先绑定函数
    auto Func = std::bind(forward<F>(func), forward<Args>(args)...);
            auto task = make_shared<packaged_task<return_type()>>(Func);
            future<return_type> fu = task->get_future();
            // 构造一个lambda匿名函数(捕获任务对象),函数内执行任务对象
            {
                unique_lock<mutex> lk(_mutex); // 加锁
                // 将构造出来的匿名对象,抛入到任务池中
                _taskpool.push_back([task](){
                (*task)(); });
                _cv.notify_all();
            }
            return fu;
        }
        ~ThreadPool()
        {
            stop();
        }
    private:
      // 线程入口函数--内部不断地从任务池中取出任务进行执行
        void entry()
        {
            vector<Functor> task_pool;
            {
                // 加锁
                unique_lock<mutex> lock(_mutex);
                // stop被置位返回 去出任务进行执行
                _cv.wait(lock, [this]()
                        { return _stop || _taskpool.empty(); });

                // 取出任务执行 一次性全部交换出来
               task_pool.swap(_taskpool);
            }
            for (auto &task : task_pool)
            {
                task();
            }
        }
    private:
        mutex _mutex;
        vector<Functor> _taskpool; // 任务池
        vector<thread> _threads;  
        condition_variable _cv;//条件变量
        atomic<bool> _stop; // 原子类型数

    };

_taskpool.push_back(task{ (*task)(); })。 里面是匿名函数,捕获了task,保存在lambda所生成的可调用对象里。将生成的可调用对象Push到任务池中。
注意上面的这个函数参 auto push(F &&func, Args &&…args) -> future F &&func类型是个万能引用,这里如果加上一个const (const F&& func),那这个const F&& 就是一个带const 的右值引用 就只能接收右值对象,我们在后面调用bind函数的时候 参数结果是左值,push会出现错误。解决方法:可以用move变成右值,F&& func 就是个万能引用。去掉const后,func可以被正常绑定,修改(或修改)避免了额外的类型限制。