异步+超时等待的C++实现

异步+超时等待的C++实现与应用

问题描述

在C++开发过程中,我们最常用的编程模式还是同步编程。原因很简单:

  • 流程简介明了,能明确知道程序的步骤流程
  • 作用域简单,能够更好的控制作用域内对象的生命周期
  • 历史原因, C++11才有的lambda表达式,是的异步编程的复杂度降低, 在此之前都比较复杂

有了C++11以后,异步形式的接口越来越多,最基本就是 std::thread, std::async之类的接口

如果我们需要在某个同步流程中去处理比较耗时的操作时, 我们的同步流程就会被堵住以至于后续的流程无法继续。如果我们在编写同步流程时,能控制这段同步代码的最大耗时, 那将是一个很有效的编程模式。

于是 异步 + 超时等待的模式营运而生

思考

  • 在C++11开始以后,我们能使用到的可以带超时等待的标准库接口都有哪些呢?

    • std::future
    • std::condition_variable
  • 可以构造std::future对象的标准库接口有哪些呢?

    • std::async
    • std::promise
    • std::packaged_task

可选方案

使用std::async

1
2
3
4
5
6
7
8
9
auto future = std::async(std::lauch::async, [=] {
/// 异步操作
/// 此处比较耗时,省略
return result;
});
if (future.wait_for(10s) == std::future_status::timeout) {
/// 此处为等待结果超时
/// 但此处的return或者抛异常等退出作用域的行为都会导致阻塞直到异步操作完成
}

注意

  1. std::async名字虽然带有async但能给人很大的误导, 使用std::async的时候, 如果没有获取返回值的future对象,那该异步函数便会成为同步函数。原因是: std::async返回的future对象由于是右值,但没有左值对象来接管该对象的生命周期, 因此在调用完std::async后,该future对象会自动析构, 析构时,会发生等待操作,一直等待异步操作的返回值,因此看起来就像是同步一样(见示例代码1)

  2. 如果获取了std::async的返回值future对象, 那std::async才能叫真正的异步调用。 这样可以在发起异步操作后立马去执行其他操作,执行完了其他必要操作再使用future对象来获取异步结果(也有可能获取不到仍需等待,见示例代码2)

  3. 使用std::async的时候, 如果异步操作的时间大于等待超时的时间,那么等待超时以后return也会阻塞到异步操作完成才会完全退出作用(见示例代码3)

示例代码1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <iostream>
#include <future>
using namespace std::literals::chrono_literals;
int main(int, char**)
{
std::async(std::launch::async, [] {
std::cout << "这里是异步操作" << std::endl;
std::this_thread::sleep_for(1s);
std::cout << "异步操作结束" << std::endl;
});

std::cout << "这里是异步操作发起后的同步操作" << std::endl;

return 0;
}

示例代码1输出:

1
2
3
4
这里是异步操作
异步操作结束
这里是异步操作发起后的同步操作
Program ended with exit code: 0

示例代码2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <iostream>
#include <future>

using namespace std::literals::chrono_literals;

int main(int, char**)
{
auto future = std::async(std::launch::async, [] {
std::cout << "这里是异步操作" << std::endl;
std::this_thread::sleep_for(1s);
std::cout << "异步操作结束" << std::endl;
return 1000;
});

std::cout << "这里是异步操作发起后的同步操作" << std::endl;
std::this_thread::sleep_for(500ms);
auto result = future.get();
std::cout << "异步操作结果为: " << result << std::endl;

return 0;
}

示例代码2输出:

1
2
3
4
5
这里是异步操作发起后的同步操作
这里是异步操作
异步操作结束
异步操作结果为: 1000
Program ended with exit code: 0

示例代码3:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <iostream>
#include <future>

using namespace std::literals::chrono_literals;

double now() {
return std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::system_clock::now().time_since_epoch()).count();
}

void do_something_with_future() {
auto future = std::async(std::launch::async, [] {
std::cout << "这里是异步操作" << std::endl;
std::this_thread::sleep_for(5s);
std::cout << "异步操作结束" << std::endl;
return 1000;
});

std::cout << "这里是异步操作发起后的同步操作" << std::endl;
std::this_thread::sleep_for(500ms);
if (future.wait_for(1s) == std::future_status::timeout) {
std::cout << "等待异步结果超时" << std::endl;
return;
}
auto result = future.get();
std::cout << "异步操作结果为: " << result << std::endl;
}

int main(int, char**)
{
auto n = now();
std::cout << "还没有使用future" << std::endl;
do_something_with_future();
std::cout << "使用完future, 用时" << now() - n << "s" << std::endl;

return 0;
}

示例代码3输出:

1
2
3
4
5
6
7
还没有使用future
这里是异步操作发起后的同步操作
这里是异步操作
等待异步结果超时
异步操作结束
使用完future, 用时5.00282s
Program ended with exit code: 0

说明:

这段代码乍一看,以为耗时是500ms,其实不然, 输出的总使用耗时是5s左右。

使用std::promise

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <iostream>
#include <future>

using namespace std::literals::chrono_literals;

double now() {
return std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::system_clock::now().time_since_epoch()).count();
}


void do_something_with_promise() {
std::promise<int> promise;

std::cout << "发起异步操作:" << std::endl;

/// 暂时用std::thread模拟下异步接口
std::thread([&] {
std::this_thread::sleep_for(5s);
promise.set_value(1000);
}).detach();

auto future = promise.get_future();

if (future.wait_for(1s) == std::future_status::timeout) {
throw std::runtime_error("等待异步结果超时");
}

auto result = future.get();
std::cout << "异步操作结果为: " << result << std::endl;
}

int main(int, char**)
{
auto n = now();
std::cout << "开始使用promise" << std::endl;
try {
do_something_with_promise();
} catch(const std::exception& e) {
std::cout << e.what() << std::endl;
}
std::cout << "使用完promise, 用时: " << now() - n << "s" << std::endl;

return 0;
}

输出:

1
2
3
4
5
开始使用promise
发起异步操作:
等待异步结果超时
使用完promise, 用时: 1.00624s
Program ended with exit code: 0

代码说明:

与std::async不同的是,std::promise拿到的future,在promise还没设置值得时候, future的析构不会被阻塞。因此如果希望等待超时以后能立马退出作用域,更推荐使用 std::promise

使用std::packaged_task

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <iostream>
#include <future>

using namespace std::literals::chrono_literals;

double now() {
return std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::system_clock::now().time_since_epoch()).count();
}


void do_something_with_packaged_task() {

std::packaged_task<int()> task([] {
std::this_thread::sleep_for(5s);
return 1000;
});

std::cout << "发起异步操作:" << std::endl;

/// 使用std::thread模拟异步接口
std::thread([&] {
task();
}).detach();

auto future = task.get_future();

if (future.wait_for(1s) == std::future_status::timeout) {
throw std::runtime_error("等待异步结果超时");
}

auto result = future.get();
std::cout << "异步操作结果为: " << result << std::endl;
}

int main(int, char**)
{
auto n = now();
std::cout << "开始使用packaged_task" << std::endl;
try {
do_something_with_packaged_task();
} catch(const std::exception& e) {
std::cout << e.what() << std::endl;
}
std::cout << "使用完packaged_task, 用时: " << now() - n << "s" << std::endl;

return 0;
}

输出:

1
2
3
4
5
开始使用packaged_task
发起异步操作:
等待异步结果超时
使用完packaged_task, 用时: 1.00114s
Program ended with exit code: 0

代码说明:

其实std::packaged_task可以看成是std::function与std::promise的结合体, 可以理解为把一个 返回值为T类型的std::function的返回值存到内部的std::promise后,可以通过future获取到返回值

使用std::condition_variable

使用std::condition_variable的思路是:

将异步操作的参数类型做一份拷贝,并和结果类型放在一起进行包装,创建一份包裹对象, 并在包裹对象内部使用 std::mutex和std::condition_variable对结果的访问加上条件限制。在包裹对象设置了结果值时,解除条件限制。因此可以利用 std::condition_variable的wait系列接口来限时等待或者无限等待结果的功能

综合使用示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
#include <string>
#include <iostream>
#include <condition_variable>

#include <thread>
#include <future>

namespace std_srvs {
struct SetIntegerRequest {
int64_t data;
};

struct SetIntegerResponse {
bool success;
std::string message;
};


struct SetInteger {
using Request = SetIntegerRequest;
using Response = SetIntegerResponse;

Request request;
Response response;
};

}

template<typename MREQ, typename MRES>
struct Wrapper {
using Request = MREQ;
using Response = MRES;

Wrapper(const Request& req)
: m_request(req)
, m_response() {

}

const Request& request() const {
return m_request;
}

const Response& response() const {
return m_response;
}

Response& response() {
return m_response;
}

void wait() {
std::unique_lock<std::mutex> lock(m_reponse_lock);
m_response_ready.wait(lock);
}

template<typename Rep, typename Period>
std::cv_status wait_for(const std::chrono::duration<Rep, Period>& duration) {
std::unique_lock<std::mutex> lock(m_reponse_lock);
return m_response_ready.wait_for(lock, duration);
}

template<typename Clock, typename Duration>
std::cv_status wait_until(const std::chrono::time_point<Clock, Duration>& time_point) {
std::unique_lock<std::mutex> lock(m_reponse_lock);
return m_response_ready.wait_until(lock, time_point);
}

void accept() {
std::unique_lock<std::mutex> lock(m_reponse_lock);
m_response_ready.notify_all();
}

private:
Request m_request;
Response m_response;

std::mutex m_reponse_lock;
std::condition_variable m_response_ready;
};

template<typename MREQ, typename MRES>
struct PromiseWrapper {
using Request = MREQ;
using Response = MRES;

PromiseWrapper(const Request& req): m_request(req) {

}

std::future<Response> get_future() {
return m_response.get_future();
}

const Request& request() const {
return m_request;
}

void set_response(Response response) {
m_response.set_value(std::move(response));
}

private:
Request m_request;
std::promise<Response> m_response;
};


bool handle(const std_srvs::SetInteger::Request& request, std_srvs::SetInteger::Response& response) {

auto wrapper = std::make_shared<Wrapper<std_srvs::SetInteger::Request, std_srvs::SetInteger::Response>>(request);


/// 这里发起异步任务
std::thread([=] {


/// 模拟长时间,可以尝试将此处的时间和超时等待时间的长短交换看看结果
std::this_thread::sleep_for(std::chrono::seconds(5));

wrapper->response().success = true;
wrapper->response().message = "success" + std::to_string(wrapper->request().data);

wrapper->accept(); /// 接受任务结果

}).detach();


/// 这里等待异步任务的结果
if (wrapper->wait_for(std::chrono::seconds(1)) == std::cv_status::no_timeout) {
response = wrapper->response();
} else {
response.success = false;
response.message = "timeout";
}

return true;
}

bool promise_handle(const std_srvs::SetInteger::Request& request, std_srvs::SetInteger::Response& response) {

auto wrapper = std::make_shared<PromiseWrapper<std_srvs::SetInteger::Request, std_srvs::SetInteger::Response>>(request);
/// 这里发起异步任务
std::thread([=] {


/// 模拟长时间,可以尝试将此处的时间和超时等待时间的长短交换看看结果
std::this_thread::sleep_for(std::chrono::seconds(5));

std_srvs::SetInteger::Response r;
r.success = true;
r.message = "success" + std::to_string(wrapper->request().data);

wrapper->set_response(std::move(r)); /// 接受任务结果

}).detach();

auto future = wrapper->get_future();
if (future.wait_for(std::chrono::seconds(1)) == std::future_status::timeout) {
response.success = false;
response.message = "promise timeout";
} else {
response = future.get();
}

return true;
}

void handle_test() {
std_srvs::SetInteger srv;

std::cout << "start: " << std::chrono::system_clock::now().time_since_epoch().count() << std::endl;
handle(srv.request, srv.response);
std::cout << "end : " << std::chrono::system_clock::now().time_since_epoch().count() << std::endl;

if (srv.response.success) {
std::cout << srv.response.message << std::endl;
} else {
std::cerr << srv.response.message << std::endl;
}
}

void promise_handle_test() {
std_srvs::SetInteger srv;

std::cout << "start: " << std::chrono::system_clock::now().time_since_epoch().count() << std::endl;
promise_handle(srv.request, srv.response);
std::cout << "end : " << std::chrono::system_clock::now().time_since_epoch().count() << std::endl;

if (srv.response.success) {
std::cout << srv.response.message << std::endl;
} else {
std::cerr << srv.response.message << std::endl;
}
}


int main(int, char**)
{

handle_test();
promise_handle_test();

return 0;
}

代码说明

  1. 首先是Wrapper/PromiseWrapper类, 用于将同步处理中的请求参数进行包裹并作为异步参数。 对请求与相应进行包裹的目的,在于延长异步处理时的请求参数与相应参数的生命周期,使其在服务回调已经结束的情况下,仍旧有自己的生命周期,只有在服务结束并且异步处理也结束的情况下包裹的对象的生命周期才会结束
  2. Wrapper类使用的是条件变量(std::condition_variable)技术来实现等待, PromiseWrapper使用的是 std::promise技术来实现等待
  3. 两个同步性质的服务处理函数handle/promise_handle,其中handle使用的是Wrapper技术, promise_handle用的是PromiseWrapper技术
  4. 两种方式都有一个共同点, 都可以等待制定的时间后以超时错误结束服务处理流程

应用场景

rosservice的服务程序

  • rosservice的服务程序不能阻塞,一旦阻塞,变回引起调用方连锁阻塞,会使系统进入僵死状态
感谢您对本站的支持.