In this simple implementation, a server listener coroutine is spawned onto an asio::io_context to accept incoming connections, spawning a session for each accepted connection.
At the same time, 10K concurrent clients establish connections to the server.
The server writes 1KB chunks of data to each client in a loop.
The main thread waits until all connections are established, then sleeps for 5 seconds to let the server and clients exchange data.
Then it sets the stop flag to true and cancels the listener, at which point the sessions should return, destroying their underlying sockets, which terminates the connections on the client side.
However, there are always some connections that are never closed.
using asio::ip::tcp;
using Executor = asio::io_context::executor_type;
std::atomic_bool stop = false;
std::atomic_int32_t connected = 0;
int numConnections = 10'000;
namespace Client {
asio::awaitable<void, Executor> client() try {
    const auto ex = co_await asio::this_coro::executor;
    tcp::socket socket{ex};
    tcp::resolver res{ex};
    co_await async_connect(socket, res.resolve("localhost", "12345"), asio::deferred);
    ++connected;
    for (std::vector<char> buf(1024);;)
        co_await async_read(socket, asio::mutable_buffer(buf.data(), buf.size()), asio::deferred);
} catch (asio::system_error const &se) {
    --connected;
}
} // namespace Client
namespace Server {
asio::awaitable<void, Executor> session(tcp::socket socket) try {
    for (static const std::vector payload(1024, 'A'); !stop;) {
        co_await async_write(socket, asio::buffer(payload),
                             asio::as_tuple(asio::deferred));
    }
} catch (asio::system_error &err) {
    log() << err.code().message() << std::endl;
}
asio::awaitable<void, Executor> listener() try {
    const auto executor = co_await asio::this_coro::executor;
    for (tcp::acceptor acceptor{executor, {{}, 12345}};;) {
        co_spawn(executor, session(co_await acceptor.async_accept(asio::deferred)), asio::detached);
    }
} catch (asio::system_error const &se) {
    log() << se.code().message() << std::endl;
}
} // namespace Server
using namespace std::chrono_literals;
using std::this_thread::sleep_for;
#include <ranges>
int main() {
    asio::io_context ctx;
    asio::cancellation_signal signal;
    co_spawn(ctx, Server::listener(), asio::bind_cancellation_slot(signal.slot(), asio::detached));
    std::ranges::for_each(std::views::iota(0, numConnections), [&](int) {
        co_spawn(ctx, Client::client(), asio::detached);
    });
    std::jthread th([&ctx] { ctx.run(); });
    while (connected < numConnections) {
        sleep_for(1s);
        log() << "Clients connected: " << connected << std::endl;
    }
    sleep_for(5s);
    stop = true;
    signal.emit(asio::cancellation_type::terminal);
    while (connected > 0) {
        sleep_for(1s);
        log() << "Clients still connected: " << connected << std::endl;
    }
}
The output I get looks like the following:
main:74 Clients connected: 4097
main:74 Clients connected: 8107
main:74 Clients connected: 8107
main:74 Clients connected: 9174
main:74 Clients connected: 9174
client:25 All clients connected
main:74 Clients connected: 10000
listener:53 Operation aborted.
main:83 Clients still connected: 3047
main:83 Clients still connected: 3047
main:83 Clients still connected: 3047
main:83 Clients still connected: 3047
main:83 Clients still connected: 3047
main:83 Clients still connected: 3047
main:83 Clients still connected: 3047
At 3047 clients, it seems that we are stuck.
Since a connection is only terminated by its server session, I assume the session coroutine is suspended in the async_write call and is never resumed from there.
That would mean there is no buffer space left to write into, which is not clear to me: the clients should keep reading the data, making buffer space available for the server to write to. Therefore, at some point, I would expect the async_write to complete, allowing the session to see the stop flag and exit, terminating the connection.
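To make the assumption behind that expectation concrete, here is a minimal sketch of the send-buffer behavior I have in mind. It deliberately uses a plain POSIX socketpair with non-blocking I/O rather than Asio and TCP, so it only illustrates the expectation and does not reproduce the problem. The names fill_then_drain and Result are hypothetical and are not part of the program above.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <vector>

#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical helper, for illustration only (assumes a POSIX system).
struct Result {
    bool blocked_when_full;     // writer got EAGAIN/EWOULDBLOCK while the peer was not reading
    bool writable_after_drain;  // a 1 KB write succeeded after the peer read everything
    std::size_t bytes_buffered; // bytes the kernel accepted before refusing more
};

Result fill_then_drain() {
    int fds[2];
    const int rc = socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
    assert(rc == 0);
    (void)rc;
    const int writer = fds[0];
    const int reader = fds[1];

    // Non-blocking on both ends, so "would block" shows up as an error code
    // instead of hanging the calling thread.
    fcntl(writer, F_SETFL, fcntl(writer, F_GETFL, 0) | O_NONBLOCK);
    fcntl(reader, F_SETFL, fcntl(reader, F_GETFL, 0) | O_NONBLOCK);

    const std::vector<char> payload(1024, 'A');
    std::vector<char> buf(1024);
    Result r{false, false, 0};

    // 1. The peer is not reading: write 1 KB chunks until the kernel refuses.
    //    A blocking or asynchronous write would stay pending at this point.
    for (;;) {
        const ssize_t n = write(writer, payload.data(), payload.size());
        if (n > 0) {
            r.bytes_buffered += static_cast<std::size_t>(n);
            continue;
        }
        r.blocked_when_full = (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
        break;
    }

    // 2. The peer reads everything that was buffered, freeing the space.
    std::size_t drained = 0;
    while (drained < r.bytes_buffered) {
        const ssize_t n = read(reader, buf.data(), buf.size());
        if (n <= 0) break;
        drained += static_cast<std::size_t>(n);
    }

    // 3. With the buffer drained, the same 1 KB write is accepted again.
    r.writable_after_drain = write(writer, payload.data(), payload.size()) > 0;

    close(writer);
    close(reader);
    return r;
}
```

Calling fill_then_drain() should report both blocked_when_full and writable_after_drain as true: a writer only stays stuck while its peer is not reading. This is why I would expect every session above to leave async_write eventually, provided its client is really still reading.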
Why do we run into this deadlock scenario here?