最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

c++ - Execute and wait for unknown number of coroutines with asio - Stack Overflow

programmeradmin5浏览0评论

I'm trying to execute a number of C++20 coroutines (number provided at runtime), in parallel, and wait for them all.

My current design is to post N times to the pool a task that co_spawn a coroutine, push the result of this into a container and use make_parallel_group. Not sure it makes entirely sense, feedback welcomed.

Here's my attempt (not compiling)

#include <asio/co_spawn.hpp>
#include <asio/detached.hpp>
#include <asio/experimental/awaitable_operators.hpp>
#include <asio/experimental/parallel_group.hpp>
#include <asio/io_context.hpp>
#include <asio/steady_timer.hpp>
#include <asio/thread_pool.hpp>
#include <asio/use_awaitable.hpp>
#include <fmt/core.h>
#include <fmt/ranges.h>
#include <iostream>
#include <list>

int main(int, char* argv[])
{
    auto task = [&]() -> asio::awaitable<int> {
        auto ex = co_await asio::this_coro::executor;
        auto& io = *ex.target<asio::io_context>();

        // do coroutine stuff using io ...

        co_return 1; // int is pointless, just an attempt to see if it was making a difference
    };
    asio::thread_pool pool{ 4 };

    auto end_way = asio::use_awaitable; // using asio::deffered gives a different error
    using op_type = decltype(asio::co_spawn(pool, task(), end_way));
    std::list<op_type> ops;
    std::mutex m;

    // 5 is hard coded for the moment
    for(auto i = 0; i < 5; ++i)
    {
        asio::post(pool, [&ops, &task, &pool, &m, &end_way]() {
            std::lock_guard<std::mutex> l{m};
            ops.push_back(asio::co_spawn(pool, task(), end_way));
        });
    }
    asio::experimental::make_parallel_group(ops)
        .async_wait(
            asio::experimental::wait_for_all(),
            [](std::vector<std::size_t> completion_order, auto&&...) {
                for(std::size_t i = 0; i < completion_order.size(); ++i)
                {
                    std::cout << "task " << completion_order[i] << " finished: ";
                }
            }
        );
    pool.join();

    return 0;
}

I'm trying to execute a number of C++20 coroutines (number provided at runtime), in parallel, and wait for them all.

My current design is to post N times to the pool a task that co_spawn a coroutine, push the result of this into a container and use make_parallel_group. Not sure it makes entirely sense, feedback welcomed.

Here's my attempt (not compiling)

#include <asio/co_spawn.hpp>
#include <asio/detached.hpp>
#include <asio/experimental/awaitable_operators.hpp>
#include <asio/experimental/parallel_group.hpp>
#include <asio/io_context.hpp>
#include <asio/steady_timer.hpp>
#include <asio/thread_pool.hpp>
#include <asio/use_awaitable.hpp>
#include <fmt/core.h>
#include <fmt/ranges.h>
#include <iostream>
#include <list>

int main(int, char* argv[])
{
    auto task = [&]() -> asio::awaitable<int> {
        auto ex = co_await asio::this_coro::executor;
        auto& io = *ex.target<asio::io_context>();

        // do coroutine stuff using io ...

        co_return 1; // int is pointless, just an attempt to see if it was making a difference
    };
    asio::thread_pool pool{ 4 };

    auto end_way = asio::use_awaitable; // using asio::deffered gives a different error
    using op_type = decltype(asio::co_spawn(pool, task(), end_way));
    std::list<op_type> ops;
    std::mutex m;

    // 5 is hard coded for the moment
    for(auto i = 0; i < 5; ++i)
    {
        asio::post(pool, [&ops, &task, &pool, &m, &end_way]() {
            std::lock_guard<std::mutex> l{m};
            ops.push_back(asio::co_spawn(pool, task(), end_way));
        });
    }
    asio::experimental::make_parallel_group(ops)
        .async_wait(
            asio::experimental::wait_for_all(),
            [](std::vector<std::size_t> completion_order, auto&&...) {
                for(std::size_t i = 0; i < completion_order.size(); ++i)
                {
                    std::cout << "task " << completion_order[i] << " finished: ";
                }
            }
        );
    pool.join();

    return 0;
}

Share Improve this question asked Mar 25 at 13:21 DavidbrczDavidbrcz 2,43920 silver badges28 bronze badges 1
  • Posting via post does not add anything. The mutex is therefore redundant here. Of course your real program might be more complicated. – sehe Commented Mar 25 at 14:12
Add a comment  | 

1 Answer 1

Reset to default 3

Deferred async operations are move-only. You need to move the range into the parallel group:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <fmt/ranges.h>
#include <iostream>
#include <vector>
namespace asio = boost::asio;

asio::awaitable<int> task() {
    auto ex = co_await asio::this_coro::executor;
    co_return 1;
}

int main() {
    asio::thread_pool pool{4};

    static constexpr auto token = asio::deferred;

    using op_type = decltype(asio::co_spawn(pool, task, token));
    std::vector<op_type> ops;

    unsigned n = 5;
    for (unsigned i = 0; i < n; ++i)
        ops.push_back(asio::co_spawn(pool, task, token));

    auto g = asio::experimental::make_parallel_group(std::move(ops));
    g.async_wait(asio::experimental::wait_for_all(),
                 [](std::vector<std::size_t> completion_order, auto&&...) {
                     std::cout << __PRETTY_FUNCTION__ << std::endl;
                     for (auto index : completion_order) {
                         std::cout << "task " << index << " finished: " << std::endl;
                     }
                 });

    pool.join();
}

Printing e.g.

task 0 finished: 
task 4 finished: 
task 3 finished: 
task 1 finished: 
task 2 finished:

Demo

Demonstrating actual output, random task load, error handling and reporting and simplifying relying on recent Boost making deferred the default completion token:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <fmt/ranges.h>
#include <iostream>
#include <random>
#include <vector>
namespace asio = boost::asio;

asio::awaitable<int> task(int input) {
    co_return input % 7 ? 10 * input : throw std::runtime_error("seven is not a lucky number");
}

int main() {
    asio::thread_pool pool{4};

    std::vector<decltype(co_spawn(pool, task(1)))> ops;

    unsigned n = 10 + std::random_device{}() % 5;
    for (unsigned i = 1; i < n; ++i)
        ops.push_back(co_spawn(pool, task(i)));

    auto g = asio::experimental::make_parallel_group(std::move(ops));
    std::move(g).async_wait( //
        asio::experimental::wait_for_all(),
        [](auto const& order, auto&& exceptions, auto&& rvals) {
            for (size_t i = 0; i < order.size(); ++i)
                try {
                    std::cout << "task " << std::setw(2) << order.at(i) << " finished: ";
                    if (exceptions.at(i))
                        std::rethrow_exception(exceptions.at(i));

                    std::cout << rvals.at(i) << std::endl;
                } catch (std::exception const& e) {
                    std::cout << "ERROR " << std::quoted(e.what()) << std::endl;
                }
        });

    pool.join();
}

With local demos

发布评论

评论列表(0)

  1. 暂无评论