I would like to use ASIO to communicate with a device through TCP. More precisely, there are multiple ports on the device that are used for communication. The communication is done in a request-response manner. On the other side of my application is a user interface (say gRPC) which can be used to request information from the connected device. These requests can come at any time.
I have tried to replicate the scenario in an example:
Live On Coliru
#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <iostream>

namespace asio = boost::asio;
using namespace asio::experimental::awaitable_operators;
using namespace std::chrono_literals;
using std::this_thread::sleep_for;

static thread_local int const t_id = [] {
    static std::atomic_int gen = 0;
    return ++gen;
}();

#include <syncstream>
static auto stamp() {
    static const auto start = std::chrono::steady_clock::now();
    return (std::chrono::steady_clock::now() - start) / 1ms;
}

#define TRACE() \
    std::osyncstream(std::cout) << std::setw(3) << t_id << " | " << std::setw(6) << stamp() << "ms | " \
                                << std::setw(20) << __FUNCTION__ << " | " << std::setw(3) << __LINE__ \
                                << " | "

#define UTRACE(U) TRACE() << "user id:" << user_id << " | "

asio::awaitable<int> readFromRemote(asio::ip::tcp::socket& socket, int socket_id, int user_id) {
    UTRACE() << "sending request to socket " << socket_id << std::endl;
    {
        // Placeholder for co_await socket.async_write(...);
        asio::steady_timer timer(socket.get_executor(), 5s);
        co_await timer.async_wait();
    }
    UTRACE() << "sending request to socket " << socket_id << " finished." << std::endl;
    UTRACE() << "reading response from socket " << socket_id << std::endl;
    {
        // Placeholder for co_await socket.async_read(...);
        asio::steady_timer timer(socket.get_executor(), 5s);
        co_await timer.async_wait();
    }
    UTRACE() << "reading response from socket " << socket_id << " finished." << std::endl;
    co_return 2;
}

class Client {
  public:
    Client(asio::io_context& ctx) : ctx_{ctx}, socket_1_{ctx}, socket_2_{ctx} {
        asio::ip::tcp::endpoint //
            ep1(asio::ip::make_address_v4("1.1.1.1"), 10),
            ep2(asio::ip::make_address_v4("1.1.1.1"), 20);
        co_spawn(ctx,
                 (socket_1_.async_connect(ep1, asio::as_tuple(asio::use_awaitable)) &&
                  socket_2_.async_connect(ep2, asio::as_tuple(asio::use_awaitable))),
                 asio::detached);
    }

    int request1(int user_id) {
        std::lock_guard lock(mtx_socket_1);
        return co_spawn(ctx_, readFromRemote(socket_1_, 1, user_id), asio::use_future).get();
    }

    int request12(int user_id) {
        std::lock_guard lock(mtx_socket_1);
        return co_spawn(ctx_, readFromRemote(socket_1_, 1, user_id), asio::use_future).get();
    }

    int request2(int user_id) {
        std::lock_guard lock(mtx_socket_2);
        return co_spawn(ctx_, readFromRemote(socket_2_, 2, user_id), asio::use_future).get();
    }

  private:
    asio::io_context& ctx_;
    asio::ip::tcp::socket socket_1_;
    asio::ip::tcp::socket socket_2_;
    std::mutex mtx_socket_1;
    std::mutex mtx_socket_2;
};

void user(Client& client, int user_id) {
    while (true) {
        UTRACE() << "user thread " << user_id << std::endl;
        sleep_for(2s);
        client.request1(user_id);
        sleep_for(2s);
        client.request12(user_id);
        sleep_for(2s);
        client.request2(user_id);
    }
}

void user2(Client& client, int user_id) {
    while (true) {
        UTRACE() << "user thread " << user_id << std::endl;
        sleep_for(2s);
        client.request2(user_id);
    }
}

asio::awaitable<void> keep_alive() {
    auto ex = co_await asio::this_coro::executor;
    while (true) {
        co_await asio::steady_timer(ex, 2s).async_wait();
        TRACE() << "keep alive" << std::endl;
    }
}

#include <thread>
int main() {
    asio::io_context ctx;
    co_spawn(ctx, keep_alive, asio::detached);
    std::jthread async_thread([&ctx] { ctx.run(); });

    Client client(ctx);

    std::vector<std::jthread> users;
    users.emplace_back([&client] { user(client, 1); });
    users.emplace_back([&client] { user(client, 2); });
    users.emplace_back([&client] { user2(client, 3); });
}
With output like
1 | 0ms | user | 84 | user id:1 | user thread 1
2 | 0ms | user | 84 | user id:2 | user thread 2
3 | 0ms | user2 | 96 | user id:3 | user thread 3
4 | 1999ms | keep_alive | 106 | keep alive
4 | 2000ms | readFromRemote | 27 | user id:1 | sending request to socket 1
4 | 2000ms | readFromRemote | 27 | user id:3 | sending request to socket 2
4 | 4000ms | keep_alive | 106 | keep alive
4 | 6000ms | keep_alive | 106 | keep alive
4 | 7000ms | readFromRemote | 33 | user id:1 | sending request to socket 1 finished.
4 | 7000ms | readFromRemote | 35 | user id:1 | reading response from socket 1
4 | 7000ms | readFromRemote | 33 | user id:3 | sending request to socket 2 finished.
4 | 7000ms | readFromRemote | 35 | user id:3 | reading response from socket 2
4 | 8000ms | keep_alive | 106 | keep alive
4 | 10000ms | keep_alive | 106 | keep alive
4 | 12000ms | readFromRemote | 41 | user id:1 | reading response from socket 1 finished.
4 | 12000ms | readFromRemote | 41 | user id:3 | reading response from socket 2 finished.
4 | 12000ms | keep_alive | 106 | keep alive
3 | 12000ms | user2 | 96 | user id:3 | user thread 3
4 | 12000ms | readFromRemote | 27 | user id:2 | sending request to socket 1
4 | 14000ms | keep_alive | 106 | keep alive
4 | 14001ms | readFromRemote | 27 | user id:3 | sending request to socket 2
4 | 16001ms | keep_alive | 106 | keep alive
4 | 17001ms | readFromRemote | 33 | user id:2 | sending request to socket 1 finished.
4 | 17001ms | readFromRemote | 35 | user id:2 | reading response from socket 1
4 | 18001ms | keep_alive | 106 | keep alive
4 | 19001ms | readFromRemote | 33 | user id:3 | sending request to socket 2 finished.
4 | 19001ms | readFromRemote | 35 | user id:3 | reading response from socket 2
As far as I understand, for the request-response scheme I need to make sure that only one request gets access to a given socket at any time, to avoid multiple requests being sent interleaved. If I implemented it without the mutexes, it wouldn't work, because another coroutine could sneak in another async_write while the previous one is still executing (the same with the read operation).
I've read in the documentation that strands are a means to avoid explicit locking (https://live.boost.org/doc/libs/1_87_0/doc/html/boost_asio/overview/core/strands.html). However, I've found that often it doesn't seem to be necessary if the io_context is single-threaded (which I think is the case in my example). Can I still use it here to get rid of the mutexes?
My idea would be to create a strand for each socket and then use it as the context for all operations concerning this socket, but I didn't manage to do this. I was able to instantiate the strands, but then I didn't know what to do with them (roughly sketched below). Do I need to replace the co_awaits with calls to strand.post?
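For illustration, this is about as far as I got (a sketch only, it doesn't actually serialize anything yet):

// inside Client: one strand per socket... but how do I use them?
asio::strand<asio::io_context::executor_type> strand_1_{asio::make_strand(ctx_)};
asio::strand<asio::io_context::executor_type> strand_2_{asio::make_strand(ctx_)};
// ...now what? Replace the co_awaits with strand_1_.post(...)? co_spawn on strand_1_?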
Thanks!
2 Answers
The mutexes do very little at all. The only things guarded by them are the initiations. That ONLY prevents a data race on the socket object instances.
Much more importantly, you are using a future to block on the result of the prior request. Because you block while holding that mutex, that is what prevents overlapping IO operations.
To leverage strands, you should not be using the ctx, but typically dispatch() the initiation to the strand of the socket.
Note that strands, like mutexes, protect shared state, not "code events". In your example you have abstracted away a lot of the shared state.
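In a nutshell, the pattern looks like this (a minimal sketch; the full listing below adds strand naming and tracing on top):

auto strand1 = asio::make_strand(ctx);   // executor that serializes all its handlers
tcp::socket socket_1{strand1};           // completions for socket_1 now run on strand1
// spawn any work touching socket_1 on the socket's own executor:
co_spawn(socket_1.get_executor(), readFromRemote(socket_1, 1), asio::detached);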
However, I've found that often it doesn't seem to be necessary if the io_context is single-threaded (which I think is the case in my example).
Your example isn't single-threaded, because your users are on separate threads.
Can I still use it here to get rid of the mutexes?
Yes. Here's a start:
Live On Coliru
#include <boost/asio.hpp>
#include <iostream>
namespace asio = boost::asio;
using namespace std::chrono_literals;
using asio::ip::tcp;

#include <syncstream>
namespace {
    using std::this_thread::sleep_for;

    static thread_local int const t_id = [] {
        static std::atomic_int gen = 0;
        return ++gen;
    }();

    static thread_local std::atomic_int t_uid = 0;

    static auto stamp() {
        static auto const start = std::chrono::steady_clock::now();
        return (std::chrono::steady_clock::now() - start) / 1ms;
    }

    std::mutex s_mtx;
    std::vector<std::pair<asio::strand<asio::io_context::executor_type>, std::string>> s_strand_names;

    auto named_strand(auto ex, auto name) {
        auto s = make_strand(ex);
        std::lock_guard lk(s_mtx);
        s_strand_names.emplace_back(s, std::move(name));
        return s;
    }

    auto trace_context() {
        std::ostringstream logical, ctx;
        logical << (t_uid ? "user:" + std::to_string(t_uid) : "I/O");
        for (std::lock_guard lk(s_mtx); auto& [ex, name] : s_strand_names)
            if (ex.running_in_this_thread())
                logical << " [" << name << "]";
        ctx << std::setw(4) << "T:" << t_id << std::setw(10) << std::move(logical).str();
        return ctx.str();
    }

#define TRACE(U) \
    std::osyncstream(std::cout) << trace_context() << " | " << std::setw(6) << stamp() \
                                << "ms | " /* << std::setw(20) << __FUNCTION__ << " | " */ \
                                << std::setw(3) << __LINE__ << " | "
} // namespace

asio::awaitable<int> readFromRemote(tcp::socket& s, int id) {
    TRACE() << "begin request socket " << id << std::endl;
    co_await asio::steady_timer(s.get_executor(), rand() % 512 * 1ms).async_wait();
    TRACE() << "completed request socket " << id << " finished." << std::endl;
    co_return 2;
}

struct Client {
    Client(auto ex) : s1_{named_strand(ex, "S1")}, s2_{named_strand(ex, "S2")} {
#if 0
        auto host = asio::ip::make_address_v4("1.1.1.1");
        { std::lock_guard lk(m1_); s1_.connect({host, 10}); }
        { std::lock_guard lk(m2_); s2_.connect({host, 20}); }
#endif
    }

    int request1a() { return co_spawn(s1_.get_executor(), readFromRemote(s1_, 1), asio::use_future).get(); }
    int request1b() { return co_spawn(s1_.get_executor(), readFromRemote(s1_, 1), asio::use_future).get(); }
    int request2()  { return co_spawn(s2_.get_executor(), readFromRemote(s2_, 2), asio::use_future).get(); }

  private:
    tcp::socket s1_, s2_;
};

void user(Client& client, int user_id) {
    for (t_uid = user_id;;) {
        TRACE() << "user thread" << std::endl;
        sleep_for(2s); client.request1a();
        sleep_for(2s); client.request1b();
        sleep_for(2s); client.request2();
    }
}

void user2(Client& client, int user_id) {
    for (t_uid = user_id;;) {
        TRACE() << "user thread, type 2" << std::endl;
        sleep_for(2s); client.request2();
    }
}

#include <thread>
int main() {
    TRACE() << "main thread" << std::endl;
    asio::io_context ctx;
    Client client(ctx.get_executor());

    std::vector<std::jthread> users;
    users.emplace_back([&client] { user(client, 1); });
    users.emplace_back([&client] { user(client, 2); });
    users.emplace_back([&client] { user2(client, 3); });

    TRACE() << "main thread run" << std::endl;
    auto keep_alive = make_work_guard(ctx);
    ctx.run();
    TRACE() << "main thread finished" << std::endl;
}
Printing
g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp
./a.out& sleep 20; kill %1
T:1 I/O | 0ms | 90 | main thread
T:2 user:1 | 0ms | 74 | user thread
T:3 user:2 | 0ms | 74 | user thread
T:1 I/O | 0ms | 100 | main thread run
T:4 user:3 | 0ms | 83 | user thread, type 2
T:1 I/O [S1] | 2000ms | 50 | begin request socket 1
T:1 I/O [S2] | 2000ms | 50 | begin request socket 2
T:1 I/O [S1] | 2000ms | 50 | begin request socket 1
T:1 I/O [S1] | 2105ms | 52 | completed request socket 1 finished.
T:1 I/O [S1] | 2359ms | 52 | completed request socket 1 finished.
T:1 I/O [S2] | 2454ms | 52 | completed request socket 2 finished.
T:4 user:3 | 2454ms | 83 | user thread, type 2
T:1 I/O [S1] | 4106ms | 50 | begin request socket 1
T:1 I/O [S1] | 4221ms | 52 | completed request socket 1 finished.
T:1 I/O [S1] | 4360ms | 50 | begin request socket 1
T:1 I/O [S1] | 4441ms | 52 | completed request socket 1 finished.
T:1 I/O [S2] | 4455ms | 50 | begin request socket 2
T:1 I/O [S2] | 4710ms | 52 | completed request socket 2 finished.
T:4 user:3 | 4710ms | 83 | user thread, type 2
T:1 I/O [S2] | 6221ms | 50 | begin request socket 2
T:1 I/O [S2] | 6295ms | 52 | completed request socket 2 finished.
T:3 user:2 | 6295ms | 74 | user thread
T:1 I/O [S2] | 6441ms | 50 | begin request socket 2
T:1 I/O [S2] | 6677ms | 52 | completed request socket 2 finished.
T:2 user:1 | 6677ms | 74 | user thread
T:1 I/O [S2] | 6710ms | 50 | begin request socket 2
T:1 I/O [S2] | 7007ms | 52 | completed request socket 2 finished.
T:4 user:3 | 7007ms | 83 | user thread, type 2
T:1 I/O [S1] | 8295ms | 50 | begin request socket 1
T:1 I/O [S1] | 8501ms | 52 | completed request socket 1 finished.
T:1 I/O [S1] | 8677ms | 50 | begin request socket 1
T:1 I/O [S1] | 8864ms | 52 | completed request socket 1 finished.
T:1 I/O [S2] | 9026ms | 50 | begin request socket 2
T:1 I/O [S2] | 9453ms | 52 | completed request socket 2 finished.
T:4 user:3 | 9454ms | 83 | user thread, type 2
T:1 I/O [S1] | 10501ms | 50 | begin request socket 1
T:1 I/O [S1] | 10864ms | 50 | begin request socket 1
T:1 I/O [S1] | 10999ms | 52 | completed request socket 1 finished.
T:1 I/O [S1] | 11115ms | 52 | completed request socket 1 finished.
T:1 I/O [S2] | 11454ms | 50 | begin request socket 2
T:1 I/O [S2] | 11937ms | 52 | completed request socket 2 finished.
T:4 user:3 | 11937ms | 83 | user thread, type 2
T:1 I/O [S2] | 12999ms | 50 | begin request socket 2
T:1 I/O [S2] | 13115ms | 50 | begin request socket 2
T:1 I/O [S2] | 13239ms | 52 | completed request socket 2 finished.
T:2 user:1 | 13239ms | 74 | user thread
T:1 I/O [S2] | 13325ms | 52 | completed request socket 2 finished.
T:3 user:2 | 13325ms | 74 | user thread
T:1 I/O [S2] | 13937ms | 50 | begin request socket 2
T:1 I/O [S2] | 14131ms | 52 | completed request socket 2 finished.
T:4 user:3 | 14131ms | 83 | user thread, type 2
T:1 I/O [S1] | 15240ms | 50 | begin request socket 1
T:1 I/O [S1] | 15324ms | 52 | completed request socket 1 finished.
T:1 I/O [S1] | 15326ms | 50 | begin request socket 1
T:1 I/O [S1] | 15830ms | 52 | completed request socket 1 finished.
T:1 I/O [S2] | 16132ms | 50 | begin request socket 2
T:1 I/O [S2] | 16415ms | 52 | completed request socket 2 finished.
T:4 user:3 | 16415ms | 83 | user thread, type 2
T:1 I/O [S1] | 17324ms | 50 | begin request socket 1
T:1 I/O [S1] | 17812ms | 52 | completed request socket 1 finished.
T:1 I/O [S1] | 17830ms | 50 | begin request socket 1
T:1 I/O [S1] | 18318ms | 52 | completed request socket 1 finished.
T:1 I/O [S2] | 18415ms | 50 | begin request socket 2
T:1 I/O [S2] | 18812ms | 52 | completed request socket 2 finished.
T:4 user:3 | 18812ms | 83 | user thread, type 2
T:1 I/O [S2] | 19813ms | 50 | begin request socket 2
Generalize
I'd prefer it when you can "just compose" things. In that vein, consider replacing the coro by composed initiation functions:
template <asio::completion_token_for<void(int)> Token = asio::deferred_t>
auto asyncReadFromRemote(tcp::socket& s, int id, Token&& token = {}) {
    return async_initiate<Token, void(int)>(
        asio::co_composed(
            [](auto state, int id, tcp::socket& s) -> void {
                auto tstrand = bind_executor(s.get_executor(), asio::deferred);
                co_await asio::dispatch(tstrand);
                TRACE() << "begin request socket " << id << std::endl;
                co_await asio::steady_timer(s.get_executor(), rand() % 512 * 1ms).async_wait(tstrand);
                TRACE() << "completed request socket " << id << " finished." << std::endl;
                co_yield state.complete(2);
            },
            s),
        token, id, std::ref(s));
}
Now you can still "just" use them like before: Live On Coliru
int request1a() { return asyncReadFromRemote(s1_, 1, asio::use_future).get(); }
int request1b() { return asyncReadFromRemote(s1_, 1, asio::use_future).get(); }
int request2() { return asyncReadFromRemote(s2_, 2, asio::use_future).get(); }
But I'd actually prefer moving that decision to the caller:
auto asyncRequest1a(auto&& token) {
    return asyncReadFromRemote(s1_, 1, std::forward<decltype(token)>(token));
}
auto asyncRequest1b(auto&& token) {
    return asyncReadFromRemote(s1_, 1, std::forward<decltype(token)>(token));
}
auto asyncRequest2(auto&& token) {
    return asyncReadFromRemote(s2_, 2, std::forward<decltype(token)>(token));
}
And then either: Live On Coliru
void user(Client& client, int user_id) {
    for (t_uid = user_id;;) {
        TRACE() << "user thread" << std::endl;
        sleep_for(2s);
        client.asyncRequest1a(asio::use_future).get();
        sleep_for(2s);
        client.asyncRequest1b(asio::use_future).get();
        sleep_for(2s);
        client.asyncRequest2(asio::use_future).get();
    }
}

void user2(Client& client, int user_id) {
    for (t_uid = user_id;;) {
        TRACE() << "user thread, type 2" << std::endl;
        sleep_for(2s);
        client.asyncRequest2(asio::use_future).get();
    }
}
Or, indeed, keeping it TRULY single-threaded:
Live On Coliru
#include <boost/asio.hpp>
#include <iostream>
namespace asio = boost::asio;
using namespace std::chrono_literals;
using asio::ip::tcp;

#include <syncstream>
namespace {
    static auto stamp() {
        static auto const start = std::chrono::steady_clock::now();
        return (std::chrono::steady_clock::now() - start) / 1ms;
    }

    std::mutex s_mtx;
    std::vector<std::pair<asio::strand<asio::io_context::executor_type>, std::string>> s_strand_names;

    auto named_strand(auto ex, auto name) {
        auto s = make_strand(ex);
        std::lock_guard lk(s_mtx);
        s_strand_names.emplace_back(s, std::move(name));
        return s;
    }

    auto trace_context() {
        std::ostringstream oss;
        for (std::lock_guard lk(s_mtx); auto& [ex, name] : s_strand_names)
            if (ex.running_in_this_thread())
                oss << "[" << name << "]";
        return oss.str();
    }

#define TRACE(U) \
    std::osyncstream(std::cout) << std::setw(6) << stamp() << "ms | " << std::setw(3) << __LINE__ << " |" \
                                << std::setw(5) << trace_context() << " | "
} // namespace

template <asio::completion_token_for<void(int)> Token = asio::deferred_t>
auto asyncReadFromRemote(tcp::socket& s, int id, Token&& token = {}) {
    return async_initiate<Token, void(int)>(
        asio::co_composed(
            [](auto state, int id, tcp::socket& s) -> void {
                auto tstrand = bind_executor(s.get_executor(), asio::deferred);
                co_await asio::dispatch(tstrand);
                TRACE() << "begin request socket " << id << std::endl;
                co_await asio::steady_timer(s.get_executor(), rand() % 512 * 1ms).async_wait(tstrand);
                TRACE() << "completed request socket " << id << " finished." << std::endl;
                co_yield state.complete(2);
            },
            s),
        token, id, std::ref(s));
}

struct Client {
    Client(auto ex) : s1_{named_strand(ex, "S1")}, s2_{named_strand(ex, "S2")} {
#if 0
        auto host = asio::ip::make_address_v4("1.1.1.1");
        { std::lock_guard lk(m1_); s1_.connect({host, 10}); }
        { std::lock_guard lk(m2_); s2_.connect({host, 20}); }
#endif
    }

    template <typename Token = asio::deferred_t> auto asyncRequest1a(Token&& token = {}) {
        return asyncReadFromRemote(s1_, 1, std::forward<decltype(token)>(token));
    }
    template <typename Token = asio::deferred_t> auto asyncRequest1b(Token&& token = {}) {
        return asyncReadFromRemote(s1_, 1, std::forward<decltype(token)>(token));
    }
    template <typename Token = asio::deferred_t> auto asyncRequest2(Token&& token = {}) {
        return asyncReadFromRemote(s2_, 2, std::forward<decltype(token)>(token));
    }

  private:
    tcp::socket s1_, s2_;
};

asio::awaitable<void> delay_for(auto duration) {
    co_await asio::steady_timer(co_await asio::this_coro::executor, duration).async_wait();
}

asio::awaitable<void> user(Client& client, int user_id) {
    for (;;) {
        TRACE() << "user thread, user_id:" << user_id << std::endl;
        co_await delay_for(2s);
        co_await client.asyncRequest1a();
        co_await delay_for(2s);
        co_await client.asyncRequest1b();
        co_await delay_for(2s);
        co_await client.asyncRequest2();
    }
}

asio::awaitable<void> user2(Client& client, int user_id) {
    for (;;) {
        TRACE() << "user thread, type 2, user_id:" << user_id << std::endl;
        co_await delay_for(2s);
        co_await client.asyncRequest2();
    }
}

#include <boost/asio/experimental/awaitable_operators.hpp>
using namespace boost::asio::experimental::awaitable_operators;

int main() {
    asio::io_context ctx(BOOST_ASIO_CONCURRENCY_HINT_1);
    Client client(ctx.get_executor());
    co_spawn(ctx, user(client, 1) && user(client, 2) && user2(client, 3), asio::detached);

    auto keep_alive = make_work_guard(ctx);
    ctx.run();
    TRACE() << "done" << std::endl;
}
Printing e.g.
g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp
./a.out& sleep 20; kill %1
0ms | 85 | | user thread, user_id:1
0ms | 85 | | user thread, user_id:2
0ms | 97 | | user thread, type 2, user_id:3
2000ms | 47 | [S1] | begin request socket 1
2000ms | 47 | [S1] | begin request socket 1
2000ms | 47 | [S2] | begin request socket 2
2105ms | 49 | [S2] | completed request socket 2 finished.
2105ms | 97 | [S2] | user thread, type 2, user_id:3
2359ms | 49 | [S1] | completed request socket 1 finished.
2454ms | 49 | [S1] | completed request socket 1 finished.
4105ms | 47 | [S2] | begin request socket 2
4220ms | 49 | [S2] | completed request socket 2 finished.
4220ms | 97 | [S2] | user thread, type 2, user_id:3
4359ms | 47 | [S1] | begin request socket 1
4440ms | 49 | [S1] | completed request socket 1 finished.
4454ms | 47 | [S1] | begin request socket 1
4709ms | 49 | [S1] | completed request socket 1 finished.
6220ms | 47 | [S2] | begin request socket 2
6295ms | 49 | [S2] | completed request socket 2 finished.
6295ms | 97 | [S2] | user thread, type 2, user_id:3
6440ms | 47 | [S2] | begin request socket 2
6676ms | 49 | [S2] | completed request socket 2 finished.
6677ms | 85 | [S2] | user thread, user_id:1
6709ms | 47 | [S2] | begin request socket 2
7006ms | 49 | [S2] | completed request socket 2 finished.
7006ms | 85 | [S2] | user thread, user_id:2
8295ms | 47 | [S2] | begin request socket 2
8500ms | 49 | [S2] | completed request socket 2 finished.
8500ms | 97 | [S2] | user thread, type 2, user_id:3
8677ms | 47 | [S1] | begin request socket 1
8863ms | 49 | [S1] | completed request socket 1 finished.
9007ms | 47 | [S1] | begin request socket 1
9434ms | 49 | [S1] | completed request socket 1 finished.
10500ms | 47 | [S2] | begin request socket 2
10863ms | 47 | [S1] | begin request socket 1
10998ms | 49 | [S2] | completed request socket 2 finished.
10998ms | 97 | [S2] | user thread, type 2, user_id:3
11114ms | 49 | [S1] | completed request socket 1 finished.
11434ms | 47 | [S1] | begin request socket 1
11917ms | 49 | [S1] | completed request socket 1 finished.
12999ms | 47 | [S2] | begin request socket 2
13114ms | 47 | [S2] | begin request socket 2
13238ms | 49 | [S2] | completed request socket 2 finished.
13238ms | 85 | [S2] | user thread, user_id:1
13325ms | 49 | [S2] | completed request socket 2 finished.
13325ms | 97 | [S2] | user thread, type 2, user_id:3
13917ms | 47 | [S2] | begin request socket 2
14112ms | 49 | [S2] | completed request socket 2 finished.
14112ms | 85 | [S2] | user thread, user_id:2
15238ms | 47 | [S1] | begin request socket 1
15323ms | 49 | [S1] | completed request socket 1 finished.
15325ms | 47 | [S2] | begin request socket 2
15829ms | 49 | [S2] | completed request socket 2 finished.
15829ms | 97 | [S2] | user thread, type 2, user_id:3
16112ms | 47 | [S1] | begin request socket 1
16395ms | 49 | [S1] | completed request socket 1 finished.
17323ms | 47 | [S1] | begin request socket 1
17811ms | 49 | [S1] | completed request socket 1 finished.
17829ms | 47 | [S2] | begin request socket 2
18316ms | 49 | [S2] | completed request socket 2 finished.
18316ms | 97 | [S2] | user thread, type 2, user_id:3
18395ms | 47 | [S1] | begin request socket 1
18792ms | 49 | [S1] | completed request socket 1 finished.
19811ms | 47 | [S2] | begin request socket 2
24h Update
Posting as a separate answer because of [SO] post length limitations...
Because I couldn't leave you hanging with the - rightful - comments, here's a version that fills in the gaps:
- doesn't abstract away buffer and communication
- fixes Client into an "interface" class (API) that composes two actual Clients. This was one reason why your design was going to run into complexity limits: each of the endpoints was going to have repeated logic. It's now all inside this Client class:
  - It waits until a connection has been established before actually trying to process any requests
  - It adds reconnect logic and tries to be smart about retrying a failed message if it can tell the server did not receive one
- still completely single-threaded
As such, API is now as simple as:
struct API {
    API(auto ex, tcp::endpoint ep1, [[maybe_unused]] tcp::endpoint ep2)
        : c1_{make_strand(ex), ep1}, c2_{make_strand(ex), ep2} {}

    template <typename Token = asio::deferred_t> auto asyncRequest1(Request req, Token&& token = {}) {
        return c1_.asyncRequest(std::move(req), std::forward<decltype(token)>(token));
    }
    template <typename Token = asio::deferred_t> auto asyncRequest2(Request req, Token&& token = {}) {
        return c2_.asyncRequest(std::move(req), std::forward<decltype(token)>(token));
    }

  private:
    Client c1_, c2_;
};
And it defers all the per-endpoint logic to Client:
struct Client {
    using Request    = std::string;
    using Response   = std::string;
    using Completion = void(error_code const&, Response);
    using Handler    = asio::any_completion_handler<Completion>;
    using Queue      = std::deque<std::pair<Request, Handler>>;

    Client(asio::any_io_executor ex, tcp::endpoint ep) : ex_{ex}, ep_{ep} {
        co_spawn(ex, connect(), asio::detached);
    }

    template <typename Token = asio::deferred_t> auto asyncRequest(Request req, Token&& token = {}) {
        return asio::async_initiate<Token, void(error_code, Response)>(
            [this](auto handler, Request req) { enqueue(std::move(req), std::move(handler)); }, //
            token, std::move(req));
    }

  private:
    // type erased to ease handler storage
    void enqueue(Request req, Handler handler);

    void startRequestLoop() { asio::co_spawn(ex_, requestLoop(), asio::detached); }

    // all coros below are on the strand
    asio::awaitable<void> connect();
    asio::awaitable<void> reconnect();
    asio::awaitable<void> requestLoop();

    asio::any_io_executor ex_;
    tcp::endpoint         ep_;
    Queue                 queue_;

    struct Control {
        tcp::socket stream_;
        std::string buffer_;
        // Not strictly needed, but as an example, since if the queue is empty,
        // Client would not otherwise represent work:
        asio::executor_work_guard<asio::any_io_executor> work_;
    };
    std::atomic_bool         connected_{false};
    std::shared_ptr<Control> ctl_;
};
Obviously, that needs a demo, so here goes:
Live On Coliru
#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <deque>
#include <iostream>
namespace asio = boost::asio;
using namespace boost::asio::experimental::awaitable_operators;
using namespace std::chrono_literals;
using asio::ip::tcp;
using error_code = boost::system::error_code;

#include <syncstream>
namespace {
    static auto stamp() {
        static auto const start = std::chrono::steady_clock::now();
        return (std::chrono::steady_clock::now() - start) / 1ms;
    }

    std::mutex s_mtx;
    std::vector<std::pair<asio::strand<asio::io_context::executor_type>, std::string>> s_strand_names;

    auto named_strand(auto ex, auto name) {
        auto s = make_strand(ex);
        std::lock_guard lk(s_mtx);
        s_strand_names.emplace_back(s, std::move(name));
        return s;
    }

    auto strand_context() {
        std::ostringstream oss;
        for (std::lock_guard lk(s_mtx); auto& [ex, name] : s_strand_names)
            if (ex.running_in_this_thread())
                oss << "[" << name << "]";
        return oss.str();
    }

#define TRACE(U) \
    std::osyncstream(std::cout) << std::setw(6) << stamp() << "ms | " << std::setw(3) << __LINE__ << " |" \
                                << std::setw(5) << strand_context() << " | " << std::setw(13) \
                                << __FUNCTION__ << " | "
} // namespace

static asio::awaitable<void> delay_for(auto duration) {
    co_await asio::steady_timer(co_await asio::this_coro::executor, duration).async_wait();
}

struct Client {
#define CTRACE() TRACE() << std::setw(12) << this->ep_ << " | "
    using Request    = std::string;
    using Response   = std::string;
    using Completion = void(error_code const&, Response);
    using Handler    = asio::any_completion_handler<Completion>;
    using Queue      = std::deque<std::pair<Request, Handler>>;

    Client(asio::any_io_executor ex, tcp::endpoint ep) : ex_{ex}, ep_{ep} {
        co_spawn(ex, connect(), asio::detached);
    }

    template <typename Token = asio::deferred_t> auto asyncRequest(Request req, Token&& token = {}) {
        return asio::async_initiate<Token, void(error_code, Response)>(
            [this](auto handler, Request req) { enqueue(std::move(req), std::move(handler)); }, //
            token, std::move(req));
    }

  private:
    // type erased to ease handler storage
    void enqueue(Request req, Handler handler) {
        asio::dispatch( //
            ex_,
            [this, req = std::move(req), handler = std::move(handler)] //
            () mutable {
                CTRACE() << "queuing request #" << queue_.size() << " " << quoted(req) << std::endl;
                queue_.emplace_back(std::move(req), std::move(handler));
                if (connected_ && queue_.size() == 1) // need to (re)launch loop?
                    startRequestLoop();
            });
    }

    void startRequestLoop() { asio::co_spawn(ex_, requestLoop(), asio::detached); }

    asio::awaitable<void> connect() { // on strand
        for (;; co_await delay_for(3s)) {
            CTRACE() << "(Re)connecting" << std::endl;
            tcp::socket attempt(ex_);
            if (auto [ec] = co_await attempt.async_connect(ep_, asio::as_tuple); ec) {
                CTRACE() << "Connection failed: " << ec.message() << std::endl;
            } else {
                // in case requests were queued before the connection was established
                ctl_ = std::make_shared<Control>(std::move(attempt), "", make_work_guard(ex_));
                CTRACE() << "Connected" << std::endl;
                if (!connected_.exchange(true))
                    startRequestLoop(); // check outbox
                break;
            }
        }
    }

    asio::awaitable<void> reconnect() {
        if (connected_.exchange(false)) {
            CTRACE() << "Disconnected" << std::endl;
            ctl_.reset();
        }
        co_await connect();
    }

    asio::awaitable<void> requestLoop() {
        namespace ae = asio::experimental;
        bool should_reconnect = false;
        // CTRACE() << "Request loop started" << std::endl;
        // copy of ctl is for convenience, so we don't have to check between resumes
        for (auto ctl = ctl_; ctl && !queue_.empty();) {
            auto& [s_, buffer_, _] = *ctl;
            auto& [req, handler]   = queue_.front();
            Response res;

            if (!req.ends_with('\n'))
                req += '\n';

            // grouping with the read to detect when the server hung up before we tried to send
            auto exchange = ae::make_parallel_group( //
                async_read_until(s_, asio::dynamic_buffer(buffer_), '\n'),
                async_write(s_, asio::buffer(req)));

            auto [order, read_ec, received, write_ec, sent] =
                co_await exchange.async_wait(ae::wait_for_all{}, asio::deferred);

            if (read_ec)
                CTRACE() << "Read error: " << read_ec.message() << std::endl;
            if (write_ec)
                CTRACE() << "Write error: " << write_ec.message() << std::endl;

            if (auto& combined_ec = read_ec ? read_ec : write_ec; combined_ec) {
                should_reconnect =                               //
                    read_ec != asio::error::operation_aborted && //
                    write_ec != asio::error::operation_aborted;

                if (order == std::array{0ul, 1ul}) { // read completed before write, retry message
                    CTRACE() << "Retrying send of this message later" << std::endl;
                } else {
                    // server might have processed it
                    CTRACE() << "Message considered lost, return failure" << std::endl;
                    // complete with error
                    std::move(handler)(combined_ec, std::move(res));
                    queue_.pop_front();
                }
                break;
            }

            if (received) {
                res.assign(buffer_.data(), received - 1); // strips newline
                buffer_.erase(0, received); // in case the server sends part of the next response...
            }
            std::move(handler)(error_code{}, std::move(res));
            queue_.pop_front();
        }
        // CTRACE() << "Request loop exited" << std::endl;

        if (should_reconnect)
            co_return co_await reconnect();
    }

    asio::any_io_executor ex_;
    tcp::endpoint         ep_;
    Queue                 queue_;

    struct Control {
        tcp::socket stream_;
        std::string buffer_;
        // Not strictly needed, but as an example, since if the queue is empty,
        // Client would not otherwise represent work:
        asio::executor_work_guard<asio::any_io_executor> work_;
    };
    std::atomic_bool         connected_{false};
    std::shared_ptr<Control> ctl_;
};

struct API {
    API(auto ex, tcp::endpoint ep1, [[maybe_unused]] tcp::endpoint ep2)
        : c1_{named_strand(ex, "S1"), ep1}
        , c2_{named_strand(ex, "S2"), ep2} //
    {}

    using Request = Client::Request;

    template <typename Token = asio::deferred_t> auto asyncRequest1(Request req, Token&& token = {}) {
        return c1_.asyncRequest(std::move(req), std::forward<decltype(token)>(token));
    }

  private:
    Client c1_;

  public:
    template <typename Token = asio::deferred_t> auto asyncRequest2(Request req, Token&& token = {}) {
        return c2_.asyncRequest(std::move(req), std::forward<decltype(token)>(token));
    }

  private:
    Client c2_;
};

#define UTRACE() TRACE() << std::setw(12) << ("id:" + std::to_string(user_id)) << " | "

asio::awaitable<void> user(API& client, int user_id) {
    for (unsigned i = 0;; ++i) {
        try {
            UTRACE() << " ===== user thread" << std::endl;
            co_await delay_for(2s);
            auto r1a = co_await client.asyncRequest1("Request 1a#" + std::to_string(i));
            UTRACE() << "response 1a -> " << quoted(r1a) << std::endl;
            co_await delay_for(2s);
            auto r1b = co_await client.asyncRequest1("Request 1b#" + std::to_string(i));
            UTRACE() << "response 1b -> " << quoted(r1b) << std::endl;
            co_await delay_for(2s);
            auto r2 = co_await client.asyncRequest2("Request 2#" + std::to_string(i));
            UTRACE() << "response 2 -> " << quoted(r2) << std::endl;
        } catch (boost::system::system_error const& se) {
            UTRACE() << "Exception: " << se.code().message() << std::endl;
            // if (se.code().has_location())
            //     UTRACE() << " from " << se.code().location().function_name() << std::endl;
        }
    }
}

asio::awaitable<void> user2([[maybe_unused]] API& client, int user_id) {
    for ([[maybe_unused]] unsigned i = 0;; ++i) {
        try {
            UTRACE() << " ===== user2 thread" << std::endl;
            co_await delay_for(2s);
            auto r2 = co_await client.asyncRequest2("Request 2#" + std::to_string(i));
            UTRACE() << "response 2 -> " << quoted(r2) << std::endl;
        } catch (boost::system::system_error const& se) {
            UTRACE() << "Exception: " << se.code().message() << std::endl;
            // if (se.code().has_location())
            //     UTRACE() << " from " << se.code().location().function_name() << std::endl;
        }
    }
}

int main() {
    asio::io_context ctx(BOOST_ASIO_CONCURRENCY_HINT_1);
    API client(ctx.get_executor(), {{}, 7878}, {{}, 7879});
    co_spawn(ctx, user(client, 1) && user(client, 2) && user2(client, 3), asio::detached);
    ctx.run();
    TRACE() << "done" << std::endl;
}