异步+超时等待的C++实现与应用
问题描述
在C++开发过程中,我们最常用的编程模式还是同步编程。原因很简单:
- 流程简介明了,能明确知道程序的步骤流程
- 作用域简单,能够更好的控制作用域内对象的生命周期
- 历史原因, C++11才有的lambda表达式,是的异步编程的复杂度降低, 在此之前都比较复杂
有了C++11以后,异步形式的接口越来越多,最基本就是 std::thread, std::async之类的接口
如果我们需要在某个同步流程中去处理比较耗时的操作时, 我们的同步流程就会被堵住以至于后续的流程无法继续。如果我们在编写同步流程时,能控制这段同步代码的最大耗时, 那将是一个很有效的编程模式。
于是 异步 + 超时等待的模式营运而生
思考
可选方案
使用std::async
1 2 3 4 5 6 7 8 9
| auto future = std::async(std::lauch::async, [=] { return result; }); if (future.wait_for(10s) == std::future_status::timeout) { }
|
注意:
std::async名字虽然带有async
但能给人很大的误导, 使用std::async的时候, 如果没有获取返回值的future对象,那该异步函数便会成为同步函数。原因是: std::async返回的future对象由于是右值,但没有左值对象来接管该对象的生命周期, 因此在调用完std::async后,该future对象会自动析构, 析构时,会发生等待操作,一直等待异步操作的返回值,因此看起来就像是同步一样(见示例代码1)
如果获取了std::async的返回值future对象, 那std::async才能叫真正的异步调用。 这样可以在发起异步操作后立马去执行其他操作,执行完了其他必要操作再使用future对象来获取异步结果(也有可能获取不到仍需等待,见示例代码2)
使用std::async的时候, 如果异步操作的时间大于等待超时的时间,那么等待超时以后return也会阻塞到异步操作完成才会完全退出作用(见示例代码3)
示例代码1:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| #include <iostream> #include <future> using namespace std::literals::chrono_literals; int main(int, char**) { std::async(std::launch::async, [] { std::cout << "这里是异步操作" << std::endl; std::this_thread::sleep_for(1s); std::cout << "异步操作结束" << std::endl; }); std::cout << "这里是异步操作发起后的同步操作" << std::endl; return 0; }
|
示例代码1输出:
1 2 3 4
| 这里是异步操作 异步操作结束 这里是异步操作发起后的同步操作 Program ended with exit code: 0
|
示例代码2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| #include <iostream> #include <future>
using namespace std::literals::chrono_literals;
int main(int, char**) { auto future = std::async(std::launch::async, [] { std::cout << "这里是异步操作" << std::endl; std::this_thread::sleep_for(1s); std::cout << "异步操作结束" << std::endl; return 1000; }); std::cout << "这里是异步操作发起后的同步操作" << std::endl; std::this_thread::sleep_for(500ms); auto result = future.get(); std::cout << "异步操作结果为: " << result << std::endl; return 0; }
|
示例代码2输出:
1 2 3 4 5
| 这里是异步操作发起后的同步操作 这里是异步操作 异步操作结束 异步操作结果为: 1000 Program ended with exit code: 0
|
示例代码3:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| #include <iostream> #include <future>
using namespace std::literals::chrono_literals;
double now() { return std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::system_clock::now().time_since_epoch()).count(); }
void do_something_with_future() { auto future = std::async(std::launch::async, [] { std::cout << "这里是异步操作" << std::endl; std::this_thread::sleep_for(5s); std::cout << "异步操作结束" << std::endl; return 1000; }); std::cout << "这里是异步操作发起后的同步操作" << std::endl; std::this_thread::sleep_for(500ms); if (future.wait_for(1s) == std::future_status::timeout) { std::cout << "等待异步结果超时" << std::endl; return; } auto result = future.get(); std::cout << "异步操作结果为: " << result << std::endl; }
int main(int, char**) { auto n = now(); std::cout << "还没有使用future" << std::endl; do_something_with_future(); std::cout << "使用完future, 用时" << now() - n << "s" << std::endl; return 0; }
|
示例代码3输出:
1 2 3 4 5 6 7
| 还没有使用future 这里是异步操作发起后的同步操作 这里是异步操作 等待异步结果超时 异步操作结束 使用完future, 用时5.00282s Program ended with exit code: 0
|
说明:
这段代码乍一看,以为耗时是500ms,其实不然, 输出的总使用耗时是5s左右。
使用std::promise
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| #include <iostream> #include <future>
using namespace std::literals::chrono_literals;
double now() { return std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::system_clock::now().time_since_epoch()).count(); }
void do_something_with_promise() { std::promise<int> promise; std::cout << "发起异步操作:" << std::endl; std::thread([&] { std::this_thread::sleep_for(5s); promise.set_value(1000); }).detach(); auto future = promise.get_future(); if (future.wait_for(1s) == std::future_status::timeout) { throw std::runtime_error("等待异步结果超时"); } auto result = future.get(); std::cout << "异步操作结果为: " << result << std::endl; }
int main(int, char**) { auto n = now(); std::cout << "开始使用promise" << std::endl; try { do_something_with_promise(); } catch(const std::exception& e) { std::cout << e.what() << std::endl; } std::cout << "使用完promise, 用时: " << now() - n << "s" << std::endl; return 0; }
|
输出:
1 2 3 4 5
| 开始使用promise 发起异步操作: 等待异步结果超时 使用完promise, 用时: 1.00624s Program ended with exit code: 0
|
代码说明:
与std::async不同的是,std::promise拿到的future,在promise还没设置值得时候, future的析构不会被阻塞。因此如果希望等待超时以后能立马退出作用域,更推荐使用 std::promise
使用std::packaged_task
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| #include <iostream> #include <future>
using namespace std::literals::chrono_literals;
double now() { return std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::system_clock::now().time_since_epoch()).count(); }
void do_something_with_packaged_task() { std::packaged_task<int()> task([] { std::this_thread::sleep_for(5s); return 1000; }); std::cout << "发起异步操作:" << std::endl; std::thread([&] { task(); }).detach(); auto future = task.get_future(); if (future.wait_for(1s) == std::future_status::timeout) { throw std::runtime_error("等待异步结果超时"); } auto result = future.get(); std::cout << "异步操作结果为: " << result << std::endl; }
int main(int, char**) { auto n = now(); std::cout << "开始使用packaged_task" << std::endl; try { do_something_with_packaged_task(); } catch(const std::exception& e) { std::cout << e.what() << std::endl; } std::cout << "使用完packaged_task, 用时: " << now() - n << "s" << std::endl; return 0; }
|
输出:
1 2 3 4 5
| 开始使用packaged_task 发起异步操作: 等待异步结果超时 使用完packaged_task, 用时: 1.00114s Program ended with exit code: 0
|
代码说明:
其实std::packaged_task可以看成是std::function与std::promise的结合体, 可以理解为把一个 返回值为T类型的std::function的返回值存到内部的std::promise后,可以通过future获取到返回值
使用std::condition_variable
使用std::condition_variable的思路是:
将异步操作的参数类型做一份拷贝,并和结果类型放在一起进行包装,创建一份包裹对象, 并在包裹对象内部使用 std::mutex和std::condition_variable对结果的访问加上条件限制。在包裹对象设置了结果值时,解除条件限制。因此可以利用 std::condition_variable的wait系列接口来限时等待或者无限等待结果的功能
综合使用示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
| #include <string> #include <iostream> #include <condition_variable>
#include <thread> #include <future>
namespace std_srvs { struct SetIntegerRequest { int64_t data; };
struct SetIntegerResponse { bool success; std::string message; };
struct SetInteger { using Request = SetIntegerRequest; using Response = SetIntegerResponse; Request request; Response response; };
}
template<typename MREQ, typename MRES> struct Wrapper { using Request = MREQ; using Response = MRES; Wrapper(const Request& req) : m_request(req) , m_response() { } const Request& request() const { return m_request; } const Response& response() const { return m_response; } Response& response() { return m_response; } void wait() { std::unique_lock<std::mutex> lock(m_reponse_lock); m_response_ready.wait(lock); } template<typename Rep, typename Period> std::cv_status wait_for(const std::chrono::duration<Rep, Period>& duration) { std::unique_lock<std::mutex> lock(m_reponse_lock); return m_response_ready.wait_for(lock, duration); } template<typename Clock, typename Duration> std::cv_status wait_until(const std::chrono::time_point<Clock, Duration>& time_point) { std::unique_lock<std::mutex> lock(m_reponse_lock); return m_response_ready.wait_until(lock, time_point); } void accept() { std::unique_lock<std::mutex> lock(m_reponse_lock); m_response_ready.notify_all(); } private: Request m_request; Response m_response; std::mutex m_reponse_lock; std::condition_variable m_response_ready; };
template<typename MREQ, typename MRES> struct PromiseWrapper { using Request = MREQ; using Response = MRES; PromiseWrapper(const Request& req): m_request(req) { } std::future<Response> get_future() { return m_response.get_future(); } const Request& request() const { return m_request; } void set_response(Response response) { m_response.set_value(std::move(response)); } private: Request m_request; std::promise<Response> m_response; };
bool handle(const std_srvs::SetInteger::Request& request, std_srvs::SetInteger::Response& response) { auto wrapper = std::make_shared<Wrapper<std_srvs::SetInteger::Request, std_srvs::SetInteger::Response>>(request); std::thread([=] { std::this_thread::sleep_for(std::chrono::seconds(5)); wrapper->response().success = true; wrapper->response().message = "success" + std::to_string(wrapper->request().data); wrapper->accept(); }).detach(); if (wrapper->wait_for(std::chrono::seconds(1)) == std::cv_status::no_timeout) { response = wrapper->response(); } else { response.success = false; response.message = "timeout"; } return true; }
bool promise_handle(const std_srvs::SetInteger::Request& request, std_srvs::SetInteger::Response& response) { auto wrapper = std::make_shared<PromiseWrapper<std_srvs::SetInteger::Request, std_srvs::SetInteger::Response>>(request); std::thread([=] { std::this_thread::sleep_for(std::chrono::seconds(5)); std_srvs::SetInteger::Response r; r.success = true; r.message = "success" + std::to_string(wrapper->request().data); wrapper->set_response(std::move(r)); }).detach(); auto future = wrapper->get_future(); if (future.wait_for(std::chrono::seconds(1)) == std::future_status::timeout) { response.success = false; response.message = "promise timeout"; } else { response = future.get(); } return true; }
void handle_test() { std_srvs::SetInteger srv; std::cout << "start: " << std::chrono::system_clock::now().time_since_epoch().count() << std::endl; handle(srv.request, srv.response); std::cout << "end : " << std::chrono::system_clock::now().time_since_epoch().count() << std::endl; if (srv.response.success) { std::cout << srv.response.message << std::endl; } else { std::cerr << srv.response.message << std::endl; } }
void promise_handle_test() { std_srvs::SetInteger srv; std::cout << "start: " << std::chrono::system_clock::now().time_since_epoch().count() << std::endl; promise_handle(srv.request, srv.response); std::cout << "end : " << std::chrono::system_clock::now().time_since_epoch().count() << std::endl; if (srv.response.success) { std::cout << srv.response.message << std::endl; } else { std::cerr << srv.response.message << std::endl; } }
int main(int, char**) {
handle_test(); promise_handle_test(); return 0; }
|
代码说明
- 首先是Wrapper/PromiseWrapper类, 用于将同步处理中的请求参数进行包裹并作为异步参数。 对请求与相应进行包裹的目的,在于延长异步处理时的请求参数与相应参数的生命周期,使其在服务回调已经结束的情况下,仍旧有自己的生命周期,只有在服务结束并且异步处理也结束的情况下包裹的对象的生命周期才会结束
- Wrapper类使用的是
条件变量(std::condition_variable)
技术来实现等待, PromiseWrapper使用的是 std::promise
技术来实现等待
- 两个同步性质的服务处理函数handle/promise_handle,其中handle使用的是Wrapper技术, promise_handle用的是PromiseWrapper技术
- 两种方式都有一个共同点, 都可以等待制定的时间后以超时错误结束服务处理流程
应用场景
rosservice的服务程序
- rosservice的服务程序不能阻塞,一旦阻塞,变回引起调用方连锁阻塞,会使系统进入僵死状态