最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

c++ - how to connect asio blocking io_context with coroutines - Stack Overflow

programmeradmin4浏览0评论

So I am playing around with coroutines and managed to integrate a 3rd party http client, in this particular case "via-http-lib", for learning purposes, and it compiles and works. That lib provides a non-blocking http client powered by Asio under the hood and utilizes callbacks for event handling.

The goal of my learning program was how to integrate those callbacks into coroutines, so I can make async operations appear synchronous and I think I achieved that.

The problem is, I cannot make multiple GET requests after each other, despite co_awaiting them.

The problem is not my coroutine implementation, if I fake the server with a local reply it works. I believe the problem is that asio's io_context is blocking as long as a connection is active.

In that via- lib, a connection is closed when there is no data anymore. That should be the case at some point. However, it never reaches a 2nd co_await HttpClient::GET. More precisely, it never reaches the code directly after io_context.run().


I tried to run co_await req (and thus ultimately io_context) in another thread, which was not enough. Interestingly, I learned that if I add another thread-switch, e.g. wrap the co_await req in between 2 thread-switching co_awaits, it appears I can run as many GET requests as I want, and even if looping 10000 GET requests, I don't have more than 4 threads (according to Windows' task manager). But still it can't be the right solution. I would like to have a single threaded solution. The whole point of async and coroutines.

Unfortunately I have 0 experience with asio and I wonder how to integrate it properly into my use case. I thought the whole point of asio is to enable async work. But the asio examples I found are just procedural programs executed on top level, for example a single one shot call or the main while loop. However I am calling asio deep inside my object oriented program, and fail to translate the usage accordingly...

HttpResponseTask HttpClient::GET(const std::string& uri) const
{
    auto req = HttpRequest(host_, uri);
    co_await continue_on_new_thread();
    auto data = co_await req;
    co_await continue_on_new_thread();
    co_return data;
}

co_await req utilizes my awaitable HttpRequest implementation:

std::coroutine_handle<> HttpRequest::await_suspend(std::coroutine_handle<> h)
{
    std::osyncstream(std::cout) << "    HttpRequest await_suspend" << std::endl;
    impl_ = std::make_unique<HttpClientDetails::HttpClientImpl>(host_);
    HttpClientDetails::HttpResponseReadyCallback cb = [this, h](const std::string& msg)
    {
        this->response.setResponseText(msg);
        std::osyncstream(std::cout) << "    HttpRequest callback trying to resume with response text " << msg << " at thread " << std::this_thread::get_id() <<  std::endl;
        assert(h);
        h.resume();
    };
    impl_->onDataAvailable(cb);
    impl_->startRequest(uri_);
    return std::noop_coroutine();
}

Below is my wrapper of the 3rd party lib that actually makes the request. On the installed connect handler which is not shown here the lib's actual GET request happens. See my comment for asio below which I believe is the problem here:

int HttpClientImpl::startRequest(const std::string &uri)
{
    uri_ = uri;
    if (!http_client->connect(host_name_))
    {
        std::osyncstream(std::cout) << "Error, could not resolve host: " << host_name_ << std::endl;
        return 1;
    }

    print("io_context.run()");
    // Note: the call to io_context.run() will not return until the connection is closed.
    // .md
    io_context.run();
    http_client.reset();
    print("io_context.run() complete, shutdown successful");
    return 0;
}

Here is my basic "continue on new thread" awaitable:

 struct continue_on_new_thread {
    bool await_ready() const noexcept
    {
        std::osyncstream(std::cout) << "    switch_to_new_thread await_ready in thread " << std::this_thread::get_id() << std::endl;
        return false;
    }

    std::coroutine_handle<> await_suspend(std::coroutine_handle<> h)
    {
        std::osyncstream(std::cout) << "    continue_on_new_thread await_suspend" << std::endl;
        std::jthread([this, h]()
            {
                std::osyncstream(std::cout) << "    continue_on_new_thread --- continuing in thread " << std::this_thread::get_id() << std::endl;
                h.resume();
            }).detach();

        return std::noop_coroutine();
    }

    void await_resume()
    {
        std::osyncstream(std::cout) << "    continue_on_new_thread --- resuming in thread " << std::this_thread::get_id() << std::endl;
    }
};

Update: Added full example. You have to get the dependencies into /asio/asio-1.30.2/ and /via-httplib/.

main.cpp

#include <cassert>
#include <chrono>
#include <coroutine>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <syncstream>
#include <thread>

#define ASIO_STANDALONE

// Comment out to switch on a real server on localhost
// #define FAKE_SERVER_CALL

// Comment out to make it work with the thread-switching hack
// #define ACTIVATE_CO_AWAIT_HACK

// Adding the awaiting_coroutine_ as member to the final promise is also a possible implementation detail
// #define AWAITING_COROUTINE_IN_FINAL_PROMISE

#define REQUEST_DELAY_SECONDS 4

#ifndef FAKE_SERVER_CALL
        #include "via/comms/tcp_adaptor.hpp"
        #include "via/http/request_handler.hpp"
        #include "via/http/request_router.hpp"
        #include "via/http_client.hpp"
        #include "via/http_server.hpp"

/// Define an HTTP server using std::string to store message bodies
using http_server_type = via::http_server<via::comms::tcp_adaptor, std::string>;
using http_connection    = http_server_type::http_connection_type;
using http_request       = http_server_type::http_request;
using tx_response            = via::http::tx_response;
using Parameters             = via::http::Parameters;
#endif

#ifdef ACTIVATE_CO_AWAIT_HACK
struct continue_on_new_thread {
        bool await_ready() const noexcept {
                std::osyncstream(std::cout) << "    switch_to_new_thread await_ready in thread "
                                                                        << std::this_thread::get_id() << std::endl;
                return false;
        }

        std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) {
                std::osyncstream(std::cout) << "    switch_to_new_thread await_suspend" << std::endl;
                std::jthread([this, h]() {
                        std::osyncstream(std::cout) << "    switch_to_new_thread --- continuing in thread "
                                                                                << std::this_thread::get_id() << std::endl;
                        h.resume();
                }).detach();

                return std::noop_coroutine();
        }

        void await_resume() {
                std::osyncstream(std::cout) << "    switch_to_new_thread --- resuming in thread "
                                                                        << std::this_thread::get_id() << std::endl;
        }
};
#endif // ACTIVATE_CO_AWAIT_HACK

using namespace std::chrono_literals;

#ifdef FAKE_SERVER_CALL
namespace HttpClientDetails {
        using HttpResponseReadyCallback = std::function<void(std::string const& msg)>;

        class HttpClientImpl {
                std::jthread thread_;
                std::string  host_;

            public:
                HttpClientImpl(std::string const& host) {
                        std::osyncstream(std::cout) << "        HttpClientImpl() " << this << std::endl;
                }
                ~HttpClientImpl() {
                        std::osyncstream(std::cout) << "        ~HttpClientImpl() " << this << " at thread "
                                                                                << std::this_thread::get_id() << std::endl;
                }
                void startRequest(std::string const& uri) {
                        std::osyncstream(std::cout)
                                << "        start request " + uri + " at thread " << std::this_thread::get_id() << std::endl;
                        thread_ = std::jthread(
                                [this](std::string const& uri, HttpResponseReadyCallback const& cb) {
                                        std::this_thread::sleep_for(std::chrono::seconds(REQUEST_DELAY_SECONDS));
                                        // TODO Make real request...
                                        assert(cb);
                                        cb(uri);
                                },
                                uri, cb_);
                        thread_.detach();
                }
                void onDataAvailable(HttpResponseReadyCallback const& cb) { cb_ = cb; }

            private:
                HttpResponseReadyCallback cb_;
        };

} // namespace HttpClientDetails

#else

namespace HttpServerDetails {
        class HttpServerImpl {

                void request_handler(http_connection::weak_pointer weak_ptr, http_request const& request,
                                                         std::string const& body) {
                        std::cout << "Rx request: " << request.to_string();
                        std::cout << request.headers().to_string();
                        std::cout << "Rx body: " << body << std::endl;

                        http_connection::shared_pointer connection(weak_ptr.lock());
                        if (connection) {
                                // output the request
                                via::http::tx_response response(via::http::response_status::code::OK);
                                response.add_server_header();
                                response.add_date_header();

                                // respond with the client's address
                                std::string response_body("Hello, ");
                                response_body += connection->remote_address();
                                connection->send(std::move(response), std::move(response_body));
                        } else
                                std::cerr << "Failed to lock http_connection::weak_pointer" << std::endl;
                }

                tx_response get_hello_handler(http_request const&, // request,
                                                                            Parameters const&,   // parameters,
                                                                            std::string const&,  // data,
                                                                            std::string& response_body) {
                        response_body += "Hello";
                        return tx_response(via::http::response_status::code::OK);
                }

                tx_response get_world_handler(http_request const&, // request,
                                                                            Parameters const&,   // parameters,
                                                                            std::string const&,  // data,
                                                                            std::string& response_body) {
                        response_body += "World";
                        return tx_response(via::http::response_status::code::OK);
                }

            public:
                ASIO::io_context& io_context;
                HttpServerImpl(ASIO::io_context& c) : io_context{c} {}

                int run() {
                        unsigned short port_number(via::comms::tcp_adaptor::DEFAULT_HTTP_PORT);
                        std::cout << "server listening on port: " << port_number << std::endl;

                        try {
                                // The asio io_context.
                                // ASIO::io_context io_context;

                                // Create the HTTP server, attach the request handler
                                http_server_type http_server(io_context);

                                http_server.request_router().add_method("GET", "/Hello",
                                                                                                                [this](http_request const& r, Parameters const& p,
                                                                                                                             std::string const& s,
                                                                                                                             std::string& response_body) -> tx_response {
                                                                                                                        return get_hello_handler(r, p, s, response_body);
                                                                                                                });
                                http_server.request_router().add_method("GET", "/World",
                                                                                                                [this](http_request const& r, Parameters const& p,
                                                                                                                             std::string const& s,
                                                                                                                             std::string& response_body) -> tx_response {
                                                                                                                        return get_world_handler(r, p, s, response_body);
                                                                                                                });

                                // Accept IPV4 connections on the default port (80)
                                ASIO_ERROR_CODE error(http_server.accept_connections());
                                if (error) {
                                        std::cerr << "Error: " << error.message() << std::endl;
                                        return 1;
                                }

                                // Start the server
                                io_context.run();
                                std::osyncstream(std::cout) << "    After server io_context.run()" << std::endl;

                        } catch (std::exception& e) {
                                std::cerr << "Exception:" << e.what() << std::endl;
                                return 1;
                        }

                        return 0;
                }
        };

} // namespace HttpServerDetails

namespace HttpClientDetails {

        using HttpResponseReadyCallback = std::function<void(std::string const&)>;

        // This is basically the via-httplib example wrapped in a class,
        // and extended by:
        // - member variable onDataAvailableCallback_
        // - member variable responseText_
        class HttpClientImpl {
                ASIO::io_context io_context;
                std::string          host_name_;

                using http_client_type = via::http_client<via::comms::tcp_adaptor, std::string>;
                using http_response      = http_client_type::http_response;
                using http_chunk_type    = http_client_type::chunk_type;

                http_client_type::shared_pointer http_client;

                std::string uri_;

                void connected_handler() {
                        via::http::tx_request request(via::http::request_method::id::GET, uri_);
                        http_client->send(std::move(request));
                }

                void response_handler(http_response const& response, std::string const& body) {
                        std::osyncstream(std::cout)
                                << "Rx response: " << response.to_string() << response.headers().to_string();
                        std::osyncstream(std::cout) << "Rx body: " << body << std::endl;

                        if (!response.is_chunked()) {
                                http_client->disconnect();
                                if (onDataAvailableCallback_) {
                                        responseText_ += body;
                                        onDataAvailableCallback_(body);
                                }
                        }
                }

                void chunk_handler(http_chunk_type const& chunk, std::string const& data) {
                        if (chunk.is_last()) {
                                std::osyncstream(std::cout) << "Rx chunk is last, extension: " << chunk.extension()
                                                                                        << " trailers: " << chunk.trailers().to_string() << std::endl;
                                http_client->disconnect();

                                if (onDataAvailableCallback_) {
                                        responseText_ += data;
                                        onDataAvailableCallback_(data);
                                }
                        } else {
                                std::osyncstream(std::cout)
                                        << "Rx chunk, size: " << chunk.size() << " data: " << data << std::endl;
                        }
                }

                void disconnected_handler() { std::osyncstream(std::cout) << "Socket disconnected" << std::endl; }

                HttpResponseReadyCallback onDataAvailableCallback_;
                std::string                             responseText_;

            public:
                std::string getResponseText() const { return responseText_; }

                HttpClientImpl(std::string const& host) : host_name_(host) {
                        std::osyncstream(std::cout) << "        HttpClientImpl() " << this << std::endl;

                        http_client = http_client_type::create(
                                io_context,
                                [this](http_response const& res, std::string const& st) { response_handler(res, st); },
                                [this](http_chunk_type const& chunk, std::string const& data) {
                                        chunk_handler(chunk, data);
                                });

                        http_client->connected_event([this]() { connected_handler(); });

                        http_client->disconnected_event([this]() { disconnected_handler(); });
                }

                ~HttpClientImpl() {
                        std::osyncstream(std::cout) << "        ~HttpClientImpl() " << this << " at thread "
                                                                                << std::this_thread::get_id() << std::endl;
                }

                int startRequest(std::string const& uri) {
                        std::osyncstream(std::cout)
                                << "        start request " + uri + " at thread " << std::this_thread::get_id() << std::endl;
                        uri_ = uri;
                        // attempt to connect to the host on the standard http port (80)
                        if (!http_client->connect(host_name_)) {
                                std::osyncstream(std::cout) << "Error, could not resolve host: " << host_name_ << std::endl;
                                return 1;
                        }

                        std::osyncstream(std::cout) << "        io_context.run()" << std::endl;

                        // Note: the call to io_context.run() will not return until the connection is closed.
                        // .md
                        io_context.run();
                        http_client.reset();
                        std::osyncstream(std::cout)
                                << "        io_context.run() complete, shutdown successful" << std::endl;
                        return 0;
                }

                void onDataAvailable(HttpResponseReadyCallback const& cb) { onDataAvailableCallback_ = cb; }
        };

} // namespace HttpClientDetails

#endif // ifdef FAKE_SERVER_CALL

struct HttpResponse {
        std::string body = "Default message";
        void                setResponseText(std::string const& msg) { body = msg; }
};

std::ostream& operator<<(std::ostream& os, HttpResponse const& res) {
        os << "response body: " << res.body;
        return os;
}

template <typename T> struct BasicTask;

template <typename T> struct promise_type_t {
        friend class promise_final_awaitable;

    public:
        // Keep a coroutine handle referring to the parent coroutine if any. That is, if we
        // co_await a coroutine within another coroutine, this handle will be used to continue
        // working from where we left off.
        std::coroutine_handle<> awaiting_coroutine_;

        // value to be computed
        T value_;

        promise_type_t() {
                globalInstanceCounter_++;
                instanceId_ = globalInstanceCounter_;
                std::osyncstream(std::cout) << "    promise_type() " << instanceId_ << std::endl;
        }

        ~promise_type_t() { std::osyncstream(std::cout) << "    ~promise_type() " << instanceId_ << std::endl; }

        BasicTask<T> get_return_object() noexcept {
                return {std::coroutine_handle<promise_type_t>::from_promise(*this)};
        }

        std::suspend_never initial_suspend() noexcept {
                std::osyncstream(std::cout) << "    promise_type " << instanceId_ << " initial_suspend" << std::endl;
                return {};
        }

        void unhandled_exception() { std::rethrow_exception(std::current_exception()); }

        struct promise_final_awaitable {
#ifdef AWAITING_COROUTINE_IN_FINAL_PROMISE
                std::coroutine_handle<> awaiting_coroutine;
                promise_final_awaitable(std::coroutine_handle<> h)
                        : awaiting_coroutine(std::move(h))
#else
                promise_final_awaitable()
#endif
                {
                        globalInstanceCounter_++;
                        instanceId_ = globalInstanceCounter_;
                }

                bool await_ready() const noexcept {
                        std::osyncstream(std::cout)
                                << "    promise_final_awaitable " << instanceId_ << " await_ready" << std::endl;
                        return false;
                }
                // 
                std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type_t> handle) const noexcept {
                        std::osyncstream(std::cout)
                                << "    promise_final_awaitable " << instanceId_ << " await_suspend" << std::endl;
#ifdef AWAITING_COROUTINE_IN_FINAL_PROMISE
                        if (awaiting_coroutine)
                                return awaiting_coroutine;
#endif
                        if (handle.promise().awaiting_coroutine_) {
                                return handle.promise().awaiting_coroutine_;
                        }
                        return std::noop_coroutine();
                }
                void await_resume() const noexcept {
                        std::osyncstream(std::cout)
                                << "    promise_final_awaitable " << instanceId_ << " await_resume" << std::endl;
                }

            private:
                int                             instanceId_                      = 0;
                inline static int globalInstanceCounter_ = 0;
        };

        auto final_suspend() noexcept {
                std::osyncstream(std::cout) << "    promise_type " << instanceId_ << " final_suspend" << std::endl;
#ifdef AWAITING_COROUTINE_IN_FINAL_PROMISE
                return promise_final_awaitable{awaiting_coroutine_};
#else
                return promise_final_awaitable{};
#endif
                // return std::suspend_always();
        }

        void return_value(T value) noexcept {
                std::cout << "    promise_type " << instanceId_ << " setting return value: " << value << std::endl;
                value_ = std::move(value);
        }

    private:
        int                             instanceId_                      = 0;
        inline static int globalInstanceCounter_ = 0;
};

struct HttpRequest {
        HttpRequest(HttpRequest&& other)            = delete;
        HttpRequest(HttpRequest const& other) = delete;
        HttpRequest(std::string const& host, std::string const& uri) : host_(host), uri_(uri) {
                std::osyncstream(std::cout) << "    HttpRequest()" << std::endl;
        }
        ~HttpRequest() {
                std::osyncstream(std::cout) << "    ~HttpRequest() at thread " << std::this_thread::get_id()
                                                                        << std::endl;
        }
        bool await_ready() const noexcept {
                std::osyncstream(std::cout) << "    HttpRequest await_ready" << std::endl;
                return false;
        }

        std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) {
                std::osyncstream(std::cout) << "    HttpRequest await_suspend" << std::endl;
                impl_ = std::make_unique<HttpClientDetails::HttpClientImpl>(host_);
                HttpClientDetails::HttpResponseReadyCallback cb = [this, h](std::string const& msg) {
                        this->response.setResponseText(msg);
                        std::osyncstream(std::cout) << "    HttpRequest callback trying to resume with response text "
                                                                                << msg << " at thread " << std::this_thread::get_id() << std::endl;
                        assert(h);
                        h.resume();
                };
                impl_->onDataAvailable(cb);
                impl_->startRequest(uri_);
                return std::noop_coroutine();
        }

        HttpResponse await_resume() {
                std::osyncstream(std::cout) << "    HttpRequest await_resume" << std::endl;
                return std::move(response);
        }

    private:
        std::string host_;
        std::string uri_;
#ifdef INFINITE_IMPL_LIFETIME
        HttpClientDetails::HttpClientImpl* impl_ = nullptr;
#else
        std::unique_ptr<HttpClientDetails::HttpClientImpl> impl_;
#endif
        HttpResponse response;
};

using HttpResponseTask = BasicTask<HttpResponse>;

template <typename T> struct BasicTask {
        using promise_type = ::promise_type_t<T>;
        std::coroutine_handle<promise_type> handle_;

        BasicTask(std::coroutine_handle<promise_type> p) : handle_(std::move(p)) {
                globalInstanceCounter_++;
                instanceId_ = globalInstanceCounter_;
                std::osyncstream(std::cout) << "    BasicTask() " << instanceId_ << std::endl;
        }

        ~BasicTask() {
                std::osyncstream(std::cout) << "    ~BasicTask() " << instanceId_ << std::endl;
                if (handle_) {
                        handle_.destroy();
                }
        }

#if 1
        struct TaskAwaiter {
                BasicTask<T>& task_;
                bool                    await_ready() const noexcept {
                         std::osyncstream(std::cout)
                                 << "    BasicTask " << task_.instanceId_ << " await_ready" << std::endl;
                         return false;
                }

                // Jeremyong uses symmetric transfer in the final_suspend of the promise,
                // not in this Tasks' await suspend method. He also uses a void type, but we can also return a
                // noop_coroutine.
                // /
                std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type> h) const noexcept {
                        std::osyncstream(std::cout)
                                << "    BasicTask " << task_.instanceId_ << " await_suspend " << std::endl;
                        task_.handle_.promise().awaiting_coroutine_ = h;
                        return std::noop_coroutine();
                }

                T await_resume() const noexcept {
                        std::osyncstream(std::cout)
                                << "    BasicTask " << task_.instanceId_
                                << " await_resume, value: " << task_.handle_.promise().value_ << std::endl;
                        return std::move(task_.handle_.promise().value_);
                }
        };

        TaskAwaiter operator co_await() { return TaskAwaiter{*this}; }

#else
        bool await_ready() const noexcept {
                std::osyncstream(std::cout) << "    BasicTask " << instanceId_ << " await_ready" << std::endl;
                return false;
        }

        // Jeremyong uses symmetric transfer in the final_suspend of the promise,
        // not in this Tasks' await suspend method. He also uses a void type, but we can also return a
        // noop_coroutine. /
        std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type> h) const noexcept {
                std::osyncstream(std::cout) << "    BasicTask " << instanceId_ << " await_suspend " << std::endl;
                handle_.promise().awaiting_coroutine_ = h;
                return std::noop_coroutine();
        }

        T await_resume() const noexcept {
                std::osyncstream(std::cout) << "    BasicTask " << instanceId_
                                                                        << " await_resume, value: " << handle_.promise().value_ << std::endl;
                return std::move(handle_.promise().value_);
        }
#endif

        // void resume() {
        //     if (handle_)
        //         handle_.resume();
        // }

    private:
        int                             instanceId_                      = 0;
        inline static int globalInstanceCounter_ = 0;
};

class HttpClient {
        std::string host_;

    public:
        explicit HttpClient(std::string const& serverUrl) : host_(serverUrl) {
                std::osyncstream(std::cout) << "    HttpClient()" << std::endl;
        }

        ~HttpClient() { std::osyncstream(std::cout) << "    ~HttpClient()" << std::endl; }

        HttpResponseTask GET(std::string const& uri) const {
                std::osyncstream(std::cout) << "    Before HttpClient::GET" << std::endl;
                auto req = HttpRequest(host_, uri);
#ifdef ACTIVATE_CO_AWAIT_HACK
                co_await continue_on_new_thread();
#endif
                auto data = co_await req;
#ifdef ACTIVATE_CO_AWAIT_HACK
                co_await continue_on_new_thread();
#endif
                std::osyncstream(std::cout) << "    After HttpClient::GET " << data.body << std::endl;
                co_return data;
        }
};

// This works!
#ifdef RUN_EXAMPLE_1
HttpResponseTask async_http_call(HttpClient& client) {
        std::osyncstream(std::cout) << "    Coroutine creation" << std::endl;

        // pending_response contains the handle to the coroutine frame of HttpClient::GET
        HttpResponseTask pending_response_1 = client.GET("/Hello");

        std::osyncstream(std::cout) << "    Coroutine about to suspend" << std::endl;
        HttpResponse response_1 = co_await pending_response_1;

        std::osyncstream(std::cout) << "    Coroutine 1 resumed:" << std::endl;
        std::osyncstream(std::cout) << "    -> result retrieved via promise object function: " << response_1.body
                                                                << std::endl;

        HttpResponseTask pending_response_2 = client.GET("/World");
        std::osyncstream(std::cout) << "    Coroutine about to suspend again" << std::endl;
        HttpResponse response_2 = co_await pending_response_2;

        std::osyncstream(std::cout) << "    Coroutine 2 resumed:" << std::endl;
        std::osyncstream(std::cout) << "    -> result retrieved via promise object function: " << response_2.body
                                                                << std::endl;

        #ifndef FAKE_SERVER_CALL
        for (int i = 0; i < 1000; i++) {
                co_await client.GET("/Hello");
        }
        #endif

        // TODO How can I start another async operation, that does not return a string, but an int?
        // Assuming we want to calculate the number of words in the retrieved document body.

        co_return {response_1.body + " " + response_2.body};
}
#endif

#ifndef FAKE_SERVER_CALL
class ServerWrapper {
        ASIO::io_context io_context;
        std::jthread         serverThread_;

    public:
        ServerWrapper() = default;
        void startInAnotherThread() {
                serverThread_ = std::jthread([&]() {
                        HttpServerDetails::HttpServerImpl server(io_context);
                        // Blocking call
                        int ec = server.run();
                        std::osyncstream(std::cout) << "    After server.run()" << std::endl;
                        return ec;
                });
        }
        void stop() {
                std::osyncstream(std::cout) << "    Request server to stop" << std::endl;
                // .20.0/doc/asio/reference/io_context/stop.html
                io_context.stop();
        }
};
#endif // ifdef FAKE_SERVER_CALL

int main(int argc, char* argv[]) {
        ServerWrapper server;
        server.startInAnotherThread();
        std::this_thread::sleep_for(200ms);

        HttpClient client("localhost");

        std::osyncstream(std::cout) << "Starting async call..." << std::endl;

        auto task = async_http_call(client);
        // If the initial_suspend of the promise_type is std::suspend_always, we have to resume here in order to
        // continue:
        // task.resume();

        std::osyncstream(std::cout) << "Main continuing..." << std::endl;

        // Run the user interface...
        std::this_thread::sleep_for(std::chrono::seconds(3 * REQUEST_DELAY_SECONDS));

        server.stop();

        return 0;
}

So I am playing around with coroutines and managed to integrate a 3rd party http client, in this particular case "via-http-lib", for learning purposes, and it compiles and works. That lib provides a non-blocking http client powered by Asio under the hood and utilizes callbacks for event handling.

The goal of my learning program was how to integrate those callbacks into coroutines, so I can make async operations appear synchronous and I think I achieved that.

The problem is, I cannot make multiple GET requests after each other, despite co_awaiting them.

The problem is not my coroutine implementation, if I fake the server with a local reply it works. I believe the problem is that asio's io_context is blocking as long as a connection is active.

In that via- lib, a connection is closed when there is no data anymore. That should be the case at some point. However, it never reaches a 2nd co_await HttpClient::GET. More precisely, it never reaches the code directly after io_context.run().


I tried to run co_await req (and thus ultimately io_context) in another thread, which was not enough. Interestingly, I learned that if I add another thread-switch, e.g. wrap the co_await req in between 2 thread-switching co_awaits, it appears I can run as many GET requests as I want, and even if looping 10000 GET requests, I don't have more than 4 threads (according to Windows' task manager). But still it can't be the right solution. I would like to have a single threaded solution. The whole point of async and coroutines.

Unfortunately I have 0 experience with asio and I wonder how to integrate it properly into my use case. I thought the whole point of asio is to enable async work. But the asio examples I found are just procedural programs executed on top level, for example a single one shot call or the main while loop. However I am calling asio deep inside my object oriented program, and fail to translate the usage accordingly...

HttpResponseTask HttpClient::GET(const std::string& uri) const
{
    auto req = HttpRequest(host_, uri);
    co_await continue_on_new_thread();
    auto data = co_await req;
    co_await continue_on_new_thread();
    co_return data;
}

co_await req utilizes my awaitable HttpRequest implementation:

std::coroutine_handle<> HttpRequest::await_suspend(std::coroutine_handle<> h)
{
    std::osyncstream(std::cout) << "    HttpRequest await_suspend" << std::endl;
    impl_ = std::make_unique<HttpClientDetails::HttpClientImpl>(host_);
    HttpClientDetails::HttpResponseReadyCallback cb = [this, h](const std::string& msg)
    {
        this->response.setResponseText(msg);
        std::osyncstream(std::cout) << "    HttpRequest callback trying to resume with response text " << msg << " at thread " << std::this_thread::get_id() <<  std::endl;
        assert(h);
        h.resume();
    };
    impl_->onDataAvailable(cb);
    impl_->startRequest(uri_);
    return std::noop_coroutine();
}

Below is my wrapper of the 3rd party lib that actually makes the request. On the installed connect handler which is not shown here the lib's actual GET request happens. See my comment for asio below which I believe is the problem here:

int HttpClientImpl::startRequest(const std::string &uri)
{
    uri_ = uri;
    if (!http_client->connect(host_name_))
    {
        std::osyncstream(std::cout) << "Error, could not resolve host: " << host_name_ << std::endl;
        return 1;
    }

    print("io_context.run()");
    // Note: the call to io_context.run() will not return until the connection is closed.
    // https://github/kenba/via-httplib/blob/main/docs/Client.md
    io_context.run();
    http_client.reset();
    print("io_context.run() complete, shutdown successful");
    return 0;
}

Here is my basic "continue on new thread" awaitable:

 struct continue_on_new_thread {
    bool await_ready() const noexcept
    {
        std::osyncstream(std::cout) << "    switch_to_new_thread await_ready in thread " << std::this_thread::get_id() << std::endl;
        return false;
    }

    std::coroutine_handle<> await_suspend(std::coroutine_handle<> h)
    {
        std::osyncstream(std::cout) << "    continue_on_new_thread await_suspend" << std::endl;
        std::jthread([this, h]()
            {
                std::osyncstream(std::cout) << "    continue_on_new_thread --- continuing in thread " << std::this_thread::get_id() << std::endl;
                h.resume();
            }).detach();

        return std::noop_coroutine();
    }

    void await_resume()
    {
        std::osyncstream(std::cout) << "    continue_on_new_thread --- resuming in thread " << std::this_thread::get_id() << std::endl;
    }
};

Update: Added full example. You have to get the dependencies into /asio/asio-1.30.2/ and /via-httplib/.

main.cpp

#include <cassert>
#include <chrono>
#include <coroutine>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <syncstream>
#include <thread>

#define ASIO_STANDALONE

// Comment out to switch on a real server on localhost
// #define FAKE_SERVER_CALL

// Comment out to make it work with the thread-switching hack
// #define ACTIVATE_CO_AWAIT_HACK

// Adding the awaiting_coroutine_ as member to the final promise is also a possible implementation detail
// #define AWAITING_COROUTINE_IN_FINAL_PROMISE

#define REQUEST_DELAY_SECONDS 4

#ifndef FAKE_SERVER_CALL
        #include "via/comms/tcp_adaptor.hpp"
        #include "via/http/request_handler.hpp"
        #include "via/http/request_router.hpp"
        #include "via/http_client.hpp"
        #include "via/http_server.hpp"

/// Define an HTTP server using std::string to store message bodies
using http_server_type = via::http_server<via::comms::tcp_adaptor, std::string>;
using http_connection    = http_server_type::http_connection_type;
using http_request       = http_server_type::http_request;
using tx_response            = via::http::tx_response;
using Parameters             = via::http::Parameters;
#endif

#ifdef ACTIVATE_CO_AWAIT_HACK
struct continue_on_new_thread {
        bool await_ready() const noexcept {
                std::osyncstream(std::cout) << "    switch_to_new_thread await_ready in thread "
                                                                        << std::this_thread::get_id() << std::endl;
                return false;
        }

        std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) {
                std::osyncstream(std::cout) << "    switch_to_new_thread await_suspend" << std::endl;
                std::jthread([this, h]() {
                        std::osyncstream(std::cout) << "    switch_to_new_thread --- continuing in thread "
                                                                                << std::this_thread::get_id() << std::endl;
                        h.resume();
                }).detach();

                return std::noop_coroutine();
        }

        void await_resume() {
                std::osyncstream(std::cout) << "    switch_to_new_thread --- resuming in thread "
                                                                        << std::this_thread::get_id() << std::endl;
        }
};
#endif // ACTIVATE_CO_AWAIT_HACK

using namespace std::chrono_literals;

#ifdef FAKE_SERVER_CALL
namespace HttpClientDetails {
        using HttpResponseReadyCallback = std::function<void(std::string const& msg)>;

        class HttpClientImpl {
                std::jthread thread_;
                std::string  host_;

            public:
                HttpClientImpl(std::string const& host) {
                        std::osyncstream(std::cout) << "        HttpClientImpl() " << this << std::endl;
                }
                ~HttpClientImpl() {
                        std::osyncstream(std::cout) << "        ~HttpClientImpl() " << this << " at thread "
                                                                                << std::this_thread::get_id() << std::endl;
                }
                void startRequest(std::string const& uri) {
                        std::osyncstream(std::cout)
                                << "        start request " + uri + " at thread " << std::this_thread::get_id() << std::endl;
                        thread_ = std::jthread(
                                [this](std::string const& uri, HttpResponseReadyCallback const& cb) {
                                        std::this_thread::sleep_for(std::chrono::seconds(REQUEST_DELAY_SECONDS));
                                        // TODO Make real request...
                                        assert(cb);
                                        cb(uri);
                                },
                                uri, cb_);
                        thread_.detach();
                }
                void onDataAvailable(HttpResponseReadyCallback const& cb) { cb_ = cb; }

            private:
                HttpResponseReadyCallback cb_;
        };

} // namespace HttpClientDetails

#else

namespace HttpServerDetails {
        class HttpServerImpl {

                void request_handler(http_connection::weak_pointer weak_ptr, http_request const& request,
                                                         std::string const& body) {
                        std::cout << "Rx request: " << request.to_string();
                        std::cout << request.headers().to_string();
                        std::cout << "Rx body: " << body << std::endl;

                        http_connection::shared_pointer connection(weak_ptr.lock());
                        if (connection) {
                                // output the request
                                via::http::tx_response response(via::http::response_status::code::OK);
                                response.add_server_header();
                                response.add_date_header();

                                // respond with the client's address
                                std::string response_body("Hello, ");
                                response_body += connection->remote_address();
                                connection->send(std::move(response), std::move(response_body));
                        } else
                                std::cerr << "Failed to lock http_connection::weak_pointer" << std::endl;
                }

                tx_response get_hello_handler(http_request const&, // request,
                                                                            Parameters const&,   // parameters,
                                                                            std::string const&,  // data,
                                                                            std::string& response_body) {
                        response_body += "Hello";
                        return tx_response(via::http::response_status::code::OK);
                }

                tx_response get_world_handler(http_request const&, // request,
                                                                            Parameters const&,   // parameters,
                                                                            std::string const&,  // data,
                                                                            std::string& response_body) {
                        response_body += "World";
                        return tx_response(via::http::response_status::code::OK);
                }

            public:
                ASIO::io_context& io_context;
                HttpServerImpl(ASIO::io_context& c) : io_context{c} {}

                int run() {
                        unsigned short port_number(via::comms::tcp_adaptor::DEFAULT_HTTP_PORT);
                        std::cout << "server listening on port: " << port_number << std::endl;

                        try {
                                // The asio io_context.
                                // ASIO::io_context io_context;

                                // Create the HTTP server, attach the request handler
                                http_server_type http_server(io_context);

                                http_server.request_router().add_method("GET", "/Hello",
                                                                                                                [this](http_request const& r, Parameters const& p,
                                                                                                                             std::string const& s,
                                                                                                                             std::string& response_body) -> tx_response {
                                                                                                                        return get_hello_handler(r, p, s, response_body);
                                                                                                                });
                                http_server.request_router().add_method("GET", "/World",
                                                                                                                [this](http_request const& r, Parameters const& p,
                                                                                                                             std::string const& s,
                                                                                                                             std::string& response_body) -> tx_response {
                                                                                                                        return get_world_handler(r, p, s, response_body);
                                                                                                                });

                                // Accept IPV4 connections on the default port (80)
                                ASIO_ERROR_CODE error(http_server.accept_connections());
                                if (error) {
                                        std::cerr << "Error: " << error.message() << std::endl;
                                        return 1;
                                }

                                // Start the server
                                io_context.run();
                                std::osyncstream(std::cout) << "    After server io_context.run()" << std::endl;

                        } catch (std::exception& e) {
                                std::cerr << "Exception:" << e.what() << std::endl;
                                return 1;
                        }

                        return 0;
                }
        };

} // namespace HttpServerDetails

namespace HttpClientDetails {

        using HttpResponseReadyCallback = std::function<void(std::string const&)>;

        // This is basically the via-httplib example wrapped in a class,
        // and extended by:
        // - member variable onDataAvailableCallback_
        // - member variable responseText_
        class HttpClientImpl {
                ASIO::io_context io_context;
                std::string          host_name_;

                using http_client_type = via::http_client<via::comms::tcp_adaptor, std::string>;
                using http_response      = http_client_type::http_response;
                using http_chunk_type    = http_client_type::chunk_type;

                http_client_type::shared_pointer http_client;

                std::string uri_;

                void connected_handler() {
                        via::http::tx_request request(via::http::request_method::id::GET, uri_);
                        http_client->send(std::move(request));
                }

                void response_handler(http_response const& response, std::string const& body) {
                        std::osyncstream(std::cout)
                                << "Rx response: " << response.to_string() << response.headers().to_string();
                        std::osyncstream(std::cout) << "Rx body: " << body << std::endl;

                        if (!response.is_chunked()) {
                                http_client->disconnect();
                                if (onDataAvailableCallback_) {
                                        responseText_ += body;
                                        onDataAvailableCallback_(body);
                                }
                        }
                }

                void chunk_handler(http_chunk_type const& chunk, std::string const& data) {
                        if (chunk.is_last()) {
                                std::osyncstream(std::cout) << "Rx chunk is last, extension: " << chunk.extension()
                                                                                        << " trailers: " << chunk.trailers().to_string() << std::endl;
                                http_client->disconnect();

                                if (onDataAvailableCallback_) {
                                        responseText_ += data;
                                        onDataAvailableCallback_(data);
                                }
                        } else {
                                std::osyncstream(std::cout)
                                        << "Rx chunk, size: " << chunk.size() << " data: " << data << std::endl;
                        }
                }

                void disconnected_handler() { std::osyncstream(std::cout) << "Socket disconnected" << std::endl; }

                HttpResponseReadyCallback onDataAvailableCallback_;
                std::string                             responseText_;

            public:
                std::string getResponseText() const { return responseText_; }

                HttpClientImpl(std::string const& host) : host_name_(host) {
                        std::osyncstream(std::cout) << "        HttpClientImpl() " << this << std::endl;

                        http_client = http_client_type::create(
                                io_context,
                                [this](http_response const& res, std::string const& st) { response_handler(res, st); },
                                [this](http_chunk_type const& chunk, std::string const& data) {
                                        chunk_handler(chunk, data);
                                });

                        http_client->connected_event([this]() { connected_handler(); });

                        http_client->disconnected_event([this]() { disconnected_handler(); });
                }

                ~HttpClientImpl() {
                        std::osyncstream(std::cout) << "        ~HttpClientImpl() " << this << " at thread "
                                                                                << std::this_thread::get_id() << std::endl;
                }

                int startRequest(std::string const& uri) {
                        std::osyncstream(std::cout)
                                << "        start request " + uri + " at thread " << std::this_thread::get_id() << std::endl;
                        uri_ = uri;
                        // attempt to connect to the host on the standard http port (80)
                        if (!http_client->connect(host_name_)) {
                                std::osyncstream(std::cout) << "Error, could not resolve host: " << host_name_ << std::endl;
                                return 1;
                        }

                        std::osyncstream(std::cout) << "        io_context.run()" << std::endl;

                        // Note: the call to io_context.run() will not return until the connection is closed.
                        // https://github/kenba/via-httplib/blob/main/docs/Client.md
                        io_context.run();
                        http_client.reset();
                        std::osyncstream(std::cout)
                                << "        io_context.run() complete, shutdown successful" << std::endl;
                        return 0;
                }

                void onDataAvailable(HttpResponseReadyCallback const& cb) { onDataAvailableCallback_ = cb; }
        };

} // namespace HttpClientDetails

#endif // ifdef FAKE_SERVER_CALL

struct HttpResponse {
        std::string body = "Default message";
        void                setResponseText(std::string const& msg) { body = msg; }
};

std::ostream& operator<<(std::ostream& os, HttpResponse const& res) {
        os << "response body: " << res.body;
        return os;
}

template <typename T> struct BasicTask;

template <typename T> struct promise_type_t {
        friend class promise_final_awaitable;

    public:
        // Keep a coroutine handle referring to the parent coroutine if any. That is, if we
        // co_await a coroutine within another coroutine, this handle will be used to continue
        // working from where we left off.
        std::coroutine_handle<> awaiting_coroutine_;

        // value to be computed
        T value_;

        promise_type_t() {
                globalInstanceCounter_++;
                instanceId_ = globalInstanceCounter_;
                std::osyncstream(std::cout) << "    promise_type() " << instanceId_ << std::endl;
        }

        ~promise_type_t() { std::osyncstream(std::cout) << "    ~promise_type() " << instanceId_ << std::endl; }

        BasicTask<T> get_return_object() noexcept {
                return {std::coroutine_handle<promise_type_t>::from_promise(*this)};
        }

        std::suspend_never initial_suspend() noexcept {
                std::osyncstream(std::cout) << "    promise_type " << instanceId_ << " initial_suspend" << std::endl;
                return {};
        }

        void unhandled_exception() { std::rethrow_exception(std::current_exception()); }

        struct promise_final_awaitable {
#ifdef AWAITING_COROUTINE_IN_FINAL_PROMISE
                std::coroutine_handle<> awaiting_coroutine;
                promise_final_awaitable(std::coroutine_handle<> h)
                        : awaiting_coroutine(std::move(h))
#else
                promise_final_awaitable()
#endif
                {
                        globalInstanceCounter_++;
                        instanceId_ = globalInstanceCounter_;
                }

                bool await_ready() const noexcept {
                        std::osyncstream(std::cout)
                                << "    promise_final_awaitable " << instanceId_ << " await_ready" << std::endl;
                        return false;
                }
                // https://stackoverflow/a/67944896
                std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type_t> handle) const noexcept {
                        std::osyncstream(std::cout)
                                << "    promise_final_awaitable " << instanceId_ << " await_suspend" << std::endl;
#ifdef AWAITING_COROUTINE_IN_FINAL_PROMISE
                        if (awaiting_coroutine)
                                return awaiting_coroutine;
#endif
                        if (handle.promise().awaiting_coroutine_) {
                                return handle.promise().awaiting_coroutine_;
                        }
                        return std::noop_coroutine();
                }
                void await_resume() const noexcept {
                        std::osyncstream(std::cout)
                                << "    promise_final_awaitable " << instanceId_ << " await_resume" << std::endl;
                }

            private:
                int                             instanceId_                      = 0;
                inline static int globalInstanceCounter_ = 0;
        };

        auto final_suspend() noexcept {
                std::osyncstream(std::cout) << "    promise_type " << instanceId_ << " final_suspend" << std::endl;
#ifdef AWAITING_COROUTINE_IN_FINAL_PROMISE
                return promise_final_awaitable{awaiting_coroutine_};
#else
                return promise_final_awaitable{};
#endif
                // return std::suspend_always();
        }

        void return_value(T value) noexcept {
                std::cout << "    promise_type " << instanceId_ << " setting return value: " << value << std::endl;
                value_ = std::move(value);
        }

    private:
        int                             instanceId_                      = 0;
        inline static int globalInstanceCounter_ = 0;
};

struct HttpRequest {
        HttpRequest(HttpRequest&& other)            = delete;
        HttpRequest(HttpRequest const& other) = delete;
        HttpRequest(std::string const& host, std::string const& uri) : host_(host), uri_(uri) {
                std::osyncstream(std::cout) << "    HttpRequest()" << std::endl;
        }
        ~HttpRequest() {
                std::osyncstream(std::cout) << "    ~HttpRequest() at thread " << std::this_thread::get_id()
                                                                        << std::endl;
        }
        bool await_ready() const noexcept {
                std::osyncstream(std::cout) << "    HttpRequest await_ready" << std::endl;
                return false;
        }

        std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) {
                std::osyncstream(std::cout) << "    HttpRequest await_suspend" << std::endl;
                impl_ = std::make_unique<HttpClientDetails::HttpClientImpl>(host_);
                HttpClientDetails::HttpResponseReadyCallback cb = [this, h](std::string const& msg) {
                        this->response.setResponseText(msg);
                        std::osyncstream(std::cout) << "    HttpRequest callback trying to resume with response text "
                                                                                << msg << " at thread " << std::this_thread::get_id() << std::endl;
                        assert(h);
                        h.resume();
                };
                impl_->onDataAvailable(cb);
                impl_->startRequest(uri_);
                return std::noop_coroutine();
        }

        HttpResponse await_resume() {
                std::osyncstream(std::cout) << "    HttpRequest await_resume" << std::endl;
                return std::move(response);
        }

    private:
        std::string host_;
        std::string uri_;
#ifdef INFINITE_IMPL_LIFETIME
        HttpClientDetails::HttpClientImpl* impl_ = nullptr;
#else
        std::unique_ptr<HttpClientDetails::HttpClientImpl> impl_;
#endif
        HttpResponse response;
};

using HttpResponseTask = BasicTask<HttpResponse>;

template <typename T> struct BasicTask {
        using promise_type = ::promise_type_t<T>;
        std::coroutine_handle<promise_type> handle_;

        BasicTask(std::coroutine_handle<promise_type> p) : handle_(std::move(p)) {
                globalInstanceCounter_++;
                instanceId_ = globalInstanceCounter_;
                std::osyncstream(std::cout) << "    BasicTask() " << instanceId_ << std::endl;
        }

        ~BasicTask() {
                std::osyncstream(std::cout) << "    ~BasicTask() " << instanceId_ << std::endl;
                if (handle_) {
                        handle_.destroy();
                }
        }

#if 1
        struct TaskAwaiter {
                BasicTask<T>& task_;
                bool                    await_ready() const noexcept {
                         std::osyncstream(std::cout)
                                 << "    BasicTask " << task_.instanceId_ << " await_ready" << std::endl;
                         return false;
                }

                // Jeremyong uses symmetric transfer in the final_suspend of the promise,
                // not in this Tasks' await suspend method. He also uses a void type, but we can also return a
                // noop_coroutine.
                // https://www.jeremyong/cpp/2021/01/04/cpp20-coroutines-a-minimal-async-framework/
                std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type> h) const noexcept {
                        std::osyncstream(std::cout)
                                << "    BasicTask " << task_.instanceId_ << " await_suspend " << std::endl;
                        task_.handle_.promise().awaiting_coroutine_ = h;
                        return std::noop_coroutine();
                }

                T await_resume() const noexcept {
                        std::osyncstream(std::cout)
                                << "    BasicTask " << task_.instanceId_
                                << " await_resume, value: " << task_.handle_.promise().value_ << std::endl;
                        return std::move(task_.handle_.promise().value_);
                }
        };

        TaskAwaiter operator co_await() { return TaskAwaiter{*this}; }

#else
        bool await_ready() const noexcept {
                std::osyncstream(std::cout) << "    BasicTask " << instanceId_ << " await_ready" << std::endl;
                return false;
        }

        // Jeremyong uses symmetric transfer in the final_suspend of the promise,
        // not in this Tasks' await suspend method. He also uses a void type, but we can also return a
        // noop_coroutine. https://www.jeremyong/cpp/2021/01/04/cpp20-coroutines-a-minimal-async-framework/
        std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type> h) const noexcept {
                std::osyncstream(std::cout) << "    BasicTask " << instanceId_ << " await_suspend " << std::endl;
                handle_.promise().awaiting_coroutine_ = h;
                return std::noop_coroutine();
        }

        T await_resume() const noexcept {
                std::osyncstream(std::cout) << "    BasicTask " << instanceId_
                                                                        << " await_resume, value: " << handle_.promise().value_ << std::endl;
                return std::move(handle_.promise().value_);
        }
#endif

        // void resume() {
        //     if (handle_)
        //         handle_.resume();
        // }

    private:
        int                             instanceId_                      = 0;
        inline static int globalInstanceCounter_ = 0;
};

class HttpClient {
        std::string host_;

    public:
        explicit HttpClient(std::string const& serverUrl) : host_(serverUrl) {
                std::osyncstream(std::cout) << "    HttpClient()" << std::endl;
        }

        ~HttpClient() { std::osyncstream(std::cout) << "    ~HttpClient()" << std::endl; }

        HttpResponseTask GET(std::string const& uri) const {
                std::osyncstream(std::cout) << "    Before HttpClient::GET" << std::endl;
                auto req = HttpRequest(host_, uri);
#ifdef ACTIVATE_CO_AWAIT_HACK
                co_await continue_on_new_thread();
#endif
                auto data = co_await req;
#ifdef ACTIVATE_CO_AWAIT_HACK
                co_await continue_on_new_thread();
#endif
                std::osyncstream(std::cout) << "    After HttpClient::GET " << data.body << std::endl;
                co_return data;
        }
};

// This works!
#ifdef RUN_EXAMPLE_1
HttpResponseTask async_http_call(HttpClient& client) {
        std::osyncstream(std::cout) << "    Coroutine creation" << std::endl;

        // pending_response contains the handle to the coroutine frame of HttpClient::GET
        HttpResponseTask pending_response_1 = client.GET("/Hello");

        std::osyncstream(std::cout) << "    Coroutine about to suspend" << std::endl;
        HttpResponse response_1 = co_await pending_response_1;

        std::osyncstream(std::cout) << "    Coroutine 1 resumed:" << std::endl;
        std::osyncstream(std::cout) << "    -> result retrieved via promise object function: " << response_1.body
                                                                << std::endl;

        HttpResponseTask pending_response_2 = client.GET("/World");
        std::osyncstream(std::cout) << "    Coroutine about to suspend again" << std::endl;
        HttpResponse response_2 = co_await pending_response_2;

        std::osyncstream(std::cout) << "    Coroutine 2 resumed:" << std::endl;
        std::osyncstream(std::cout) << "    -> result retrieved via promise object function: " << response_2.body
                                                                << std::endl;

        #ifndef FAKE_SERVER_CALL
        for (int i = 0; i < 1000; i++) {
                co_await client.GET("/Hello");
        }
        #endif

        // TODO How can I start another async operation, that does not return a string, but an int?
        // Assuming we want to calculate the number of words in the retrieved document body.

        co_return {response_1.body + " " + response_2.body};
}
#endif

#ifndef FAKE_SERVER_CALL
class ServerWrapper {
        ASIO::io_context io_context;
        std::jthread         serverThread_;

    public:
        ServerWrapper() = default;
        void startInAnotherThread() {
                serverThread_ = std::jthread([&]() {
                        HttpServerDetails::HttpServerImpl server(io_context);
                        // Blocking call
                        int ec = server.run();
                        std::osyncstream(std::cout) << "    After server.run()" << std::endl;
                        return ec;
                });
        }
        void stop() {
                std::osyncstream(std::cout) << "    Request server to stop" << std::endl;
                // https://think-async/Asio/asio-1.20.0/doc/asio/reference/io_context/stop.html
                io_context.stop();
        }
};
#endif // ifdef FAKE_SERVER_CALL

int main(int argc, char* argv[]) {
        ServerWrapper server;
        server.startInAnotherThread();
        std::this_thread::sleep_for(200ms);

        HttpClient client("localhost");

        std::osyncstream(std::cout) << "Starting async call..." << std::endl;

        auto task = async_http_call(client);
        // If the initial_suspend of the promise_type is std::suspend_always, we have to resume here in order to
        // continue:
        // task.resume();

        std::osyncstream(std::cout) << "Main continuing..." << std::endl;

        // Run the user interface...
        std::this_thread::sleep_for(std::chrono::seconds(3 * REQUEST_DELAY_SECONDS));

        server.stop();

        return 0;
}
Share Improve this question edited Apr 1 at 21:24 sehe 395k47 gold badges472 silver badges664 bronze badges asked Mar 31 at 22:12 user2366975user2366975 4,7529 gold badges54 silver badges96 bronze badges 3
  • I'd love to see a self-contained working example here. If you start coming up with custom promise types, it really depends on how you tie it all together – sehe Commented Mar 31 at 22:23
  • I added a full example. Here is the CMakeLists.txt because I could not paste it due to character limit. cmake_minimum_required(VERSION 3.5) project(coroutine-chaining LANGUAGES CXX) set(CMAKE_CXX_STANDARD 20) set(CMAKE_CXX_STANDARD_REQUIRED ON) add_executable(coroutine-chaining main.cpp) set(ENV{Asio_DIR} "/asio/asio-1.30.2") include_directories(${CMAKE_SOURCE_DIR}$ENV{Asio_DIR}/include) add_subdirectory(via-httplib) include_directories(via-httplib/include) – user2366975 Commented Apr 1 at 18:04
  • Sorry I had to reformat your sample because after my text edits it exceeded the post length limits... Never mind, I'll still answer – sehe Commented Apr 1 at 21:25
Add a comment  | 

1 Answer 1

Reset to default 1

Urg. I fail to see the appeal of via-http-lib for the client here. It really appears to be an a-typical async client.

It does inversion-of-control on the continuation-passing-style that naturally fits asynchronous APIs. To add insult to injury, it also flips ownership (callbacks cannot contain the client - that would be a cyclic reference during construction) so you have to have a wrapping type with dynamic lifetime.

It also hard-couples to io_context& instead of using executors as god intended. This may be showing its age.

This is a problem.

Besides all that, I see your code explicitly http_client->disconnect() in response_handler() - so it should be little surprise that you cannot do multiple requests.

Given the limitations, I'd suggest "domesticating" the Via model into a proper async interface. I gave it considerable effort (wayyyyy too long) and kept running into unpleasant corners. I'm sad to inform you that my time budget to learn ViaHttplib has been depleted before I could make it work.

Whatever the solution, I would not have been using custom promise types or custom transformables. I'd stick to the highlevel interfaces of asio::awaitable or perhaps consider Boost Cobalt to glue together different coroutines.

发布评论

评论列表(0)

  1. 暂无评论