Thread safe async loop implementation with ASIO
In the one of my previous post (ASIO based thread implementation thoughts) I was thinking about ASIO based thread like implementation athread
. I’ve spend some time experimenting with the design and realized that what we need there is more like asynchronous loop allows thread safe communication via posting function objects than thread like structure implementation.
In my current design athread
becomes basic_async_loop<Executor>
template with an Executor
template parameter and asio_executor
as its implementation so resulting async_loop
is defined as
using async_loop = basic_async_loop<asio_executor>;
Executor implementation si strightforward and need implement stop()
, run()
, post()
and post_with_result()
functions. In case asio_executor
implementation looks this way
class asio_executor
{
public:
asio_executor() : _idle{_io} {}
void stop() {_io.stop();}
void run() {_io.run();}
template <typename F>
void post(F && f) {
asio::post(_io, f);
}
template<typename F>
auto post_with_result(F && f) -> future<decltype(f())> {
using result_type = decltype(f());
auto p = promise<result_type>{};
future<result_type> result = p.get_future();
asio::post(_io, [result = move(p), func = move(f)]() mutable {
result.set_value(func());
});
return result;
}
private:
asio::io_context _io;
asio::io_context::work _idle;
};
where there is nothing new there, post()
ans post_with_resupt()
are implemented the same way as before.
Used basic_async_loop<>
design allows replace whole ASIO library with for example concurrent queue implementation.
Implementation basic_async_loop<>
looks this way
template <typename Executor>
class basic_async_loop
{
public:
template <typename F>
void post(F && f) {
_io.post(move(f));
}
void async_run() {
_th = jthread{[this](stop_token st){
stop_callback stop_cb{st, [this]{
_io.stop();
}};
_io.run();
}};
}
template<typename F>
auto post_with_result(F && f) -> future<decltype(f())> {return _io.post_with_result(move(f));}
void request_stop() {_th.request_stop();}
[[nodiscard]] stop_token get_stop_token() const noexcept {return _th.get_stop_token();}
void join() {
if (_th.joinable())
_th.join();
}
private:
jthread _th;
Executor _io;
};
where we borrow request_stop()
and join()
functions from std::jthread
interface. In addition basic_async_loop<>
defines async_run()
function to run loop in a new thread.
Last time we developed a little brain/worker sample where with a async_loop
there is no any change there so it still looks this way
class brain
{
public:
brain() {_loop.async_run();}
async_loop & loop() {return _loop;}
bool need_more() {return true;}
void work_done() {cout << "*"; flush(cout);}
private:
async_loop _loop;
};
class worker
{
public:
worker(brain & b) : _b{&b} {}
void operator()()
{
// work hard ..., then notify
std::this_thread::sleep_for(100ms);
_b->loop().post([b = this->_b]{b->work_done();});
}
private:
brain * _b;
};
class robo_worker
{
public:
robo_worker(brain & b) : _b{&b} {}
void operator()(stop_token st)
{
future<bool> more;
do
{
// work hard ..., then notify
std::this_thread::sleep_for(75ms);
_b->loop().post([b = this->_b]{b->work_done();});
// need more?
more = _b->loop().post_with_result([b = this->_b]{return b->need_more();});
}
while (!st.stop_requested() && more.get());
}
private:
brain * _b;
};
int main(int argc, char * argv[])
{
brain b;
vector<future<void>> results;
for (int i = 0; i < 3; ++i)
results.push_back(async(worker{b}));
auto r = async(worker{b});
worker w1{b}, w2{b};
w1();
w2();
auto u = async(robo_worker{b}, b.loop().get_stop_token());
for (auto & r : results)
r.get();
std::this_thread::sleep_for(500ms);
b.loop().request_stop();
u.get(); // wait for robo_worker
cout << "\ndone!\n";
return 0;
}
full blown sample code can be found as
async_loop.cpp
As I had more time to spend with ASIO implementation, I’ve found that post()
based communication comes with a price. To store handler in a type agnostic way there is memory allocation each time post()
is called, which can be pretty expensive in some cases.