diff --git a/include/boost/capy/concept/io_awaitable.hpp b/include/boost/capy/concept/io_awaitable.hpp index cb0fc1f4..5ab4e327 100644 --- a/include/boost/capy/concept/io_awaitable.hpp +++ b/include/boost/capy/concept/io_awaitable.hpp @@ -13,6 +13,7 @@ #include #include #include +#include namespace boost { namespace capy { @@ -121,6 +122,19 @@ concept IoAwaitable = template using awaitable_result_t = decltype(std::declval&>().await_resume()); +/** Concept for ranges of I/O awaitables. + + A range satisfies `IoAwaitableRange` if it is a sized input range + whose value type satisfies @ref IoAwaitable. + + @tparam R The range type. +*/ +template +concept IoAwaitableRange = + std::ranges::input_range && + std::ranges::sized_range && + IoAwaitable>; + } // namespace capy } // namespace boost diff --git a/include/boost/capy/when_all.hpp b/include/boost/capy/when_all.hpp index 62680ed0..a55cdb52 100644 --- a/include/boost/capy/when_all.hpp +++ b/include/boost/capy/when_all.hpp @@ -23,10 +23,13 @@ #include #include #include +#include +#include #include #include #include #include +#include namespace boost { namespace capy { @@ -59,32 +62,26 @@ struct result_holder std::monostate get() && { return {}; } }; -/** Shared state for when_all operation. +/** Core shared state for when_all operations. - @tparam Ts The result types of the tasks. + Contains all members and methods common to both heterogeneous (variadic) + and homogeneous (range) when_all implementations. State classes embed + this via composition to avoid CRTP destructor ordering issues. + + @par Thread Safety + Atomic operations protect exception capture and completion count. */ -template -struct when_all_state +struct when_all_core { - static constexpr std::size_t task_count = sizeof...(Ts); - - // Completion tracking - when_all waits for all children std::atomic remaining_count_; - // Result storage in input order - std::tuple...> results_; - - // Runner handles - destroyed in await_resume while allocator is valid - std::array, task_count> runner_handles_{}; - // Exception storage - first error wins, others discarded std::atomic has_exception_{false}; std::exception_ptr first_exception_; - // Stop propagation - on error, request stop for siblings std::stop_source stop_source_; - // Connects parent's stop_token to our stop_source + // Bridges parent's stop token to our stop_source struct stop_callback_fn { std::stop_source* source_; @@ -93,19 +90,15 @@ struct when_all_state using stop_callback_t = std::stop_callback; std::optional parent_stop_callback_; - // Parent resumption std::coroutine_handle<> continuation_; io_env const* caller_env_ = nullptr; - when_all_state() - : remaining_count_(task_count) + explicit when_all_core(std::size_t count) noexcept + : remaining_count_(count) { } - // Runners self-destruct in final_suspend. No destruction needed here. - - /** Capture an exception (first one wins). - */ + /** Capture an exception (first one wins). */ void capture_exception(std::exception_ptr ep) { bool expected = false; @@ -113,25 +106,88 @@ struct when_all_state expected, true, std::memory_order_relaxed)) first_exception_ = ep; } +}; + +/** Shared state for heterogeneous when_all (variadic overload). + + @tparam Ts The result types of the tasks. +*/ +template +struct when_all_state +{ + static constexpr std::size_t task_count = sizeof...(Ts); + + when_all_core core_; + std::tuple...> results_; + std::array, task_count> runner_handles_{}; + when_all_state() + : core_(task_count) + { + } }; -/** Wrapper coroutine that intercepts task completion. +/** Shared state for homogeneous when_all (range overload). + + Stores all results in a vector indexed by task position. - This runner awaits its assigned task and stores the result in - the shared state, or captures the exception and requests stop. + @tparam T The common result type of all tasks. */ -template +template +struct when_all_homogeneous_state +{ + when_all_core core_; + std::vector> results_; + std::vector> runner_handles_; + + explicit when_all_homogeneous_state(std::size_t count) + : core_(count) + , results_(count) + , runner_handles_(count) + { + } + + void set_result(std::size_t index, T value) + { + results_[index].emplace(std::move(value)); + } +}; + +/** Specialization for void tasks (no result storage). */ +template<> +struct when_all_homogeneous_state +{ + when_all_core core_; + std::vector> runner_handles_; + + explicit when_all_homogeneous_state(std::size_t count) + : core_(count) + , runner_handles_(count) + { + } +}; + +/** Wrapper coroutine that intercepts task completion for when_all. + + Parameterized on StateType to work with both heterogeneous (variadic) + and homogeneous (range) state types. All state types expose their + shared members through a `core_` member of type when_all_core. + + @tparam StateType The state type (when_all_state or when_all_homogeneous_state). +*/ +template struct when_all_runner { - struct promise_type // : frame_allocating_base // DISABLED FOR TESTING + struct promise_type { - when_all_state* state_ = nullptr; + StateType* state_ = nullptr; + std::size_t index_ = 0; io_env env_; - when_all_runner get_return_object() + when_all_runner get_return_object() noexcept { - return when_all_runner(std::coroutine_handle::from_promise(*this)); + return when_all_runner( + std::coroutine_handle::from_promise(*this)); } std::suspend_always initial_suspend() noexcept @@ -144,45 +200,32 @@ struct when_all_runner struct awaiter { promise_type* p_; - - bool await_ready() const noexcept - { - return false; - } - + bool await_ready() const noexcept { return false; } auto await_suspend(std::coroutine_handle<> h) noexcept { - // Extract everything needed before self-destruction. - auto* state = p_->state_; - auto* counter = &state->remaining_count_; - auto* caller_env = state->caller_env_; - auto cont = state->continuation_; + auto& core = p_->state_->core_; + auto* counter = &core.remaining_count_; + auto* caller_env = core.caller_env_; + auto cont = core.continuation_; h.destroy(); - // If last runner, dispatch parent for symmetric transfer. auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel); if(remaining == 1) return detail::symmetric_transfer(caller_env->executor.dispatch(cont)); return detail::symmetric_transfer(std::noop_coroutine()); } - - void await_resume() const noexcept - { - } + void await_resume() const noexcept {} }; return awaiter{this}; } - void return_void() - { - } + void return_void() noexcept {} void unhandled_exception() { - state_->capture_exception(std::current_exception()); - // Request stop for sibling tasks - state_->stop_source_.request_stop(); + state_->core_.capture_exception(std::current_exception()); + state_->core_.stop_source_.request_stop(); } template @@ -191,15 +234,8 @@ struct when_all_runner std::decay_t a_; promise_type* p_; - bool await_ready() - { - return a_.await_ready(); - } - - decltype(auto) await_resume() - { - return a_.await_resume(); - } + bool await_ready() { return a_.await_ready(); } + decltype(auto) await_resume() { return a_.await_resume(); } template auto await_suspend(std::coroutine_handle h) @@ -230,15 +266,17 @@ struct when_all_runner std::coroutine_handle h_; - explicit when_all_runner(std::coroutine_handle h) + explicit when_all_runner(std::coroutine_handle h) noexcept : h_(h) { } // Enable move for all clang versions - some versions need it - when_all_runner(when_all_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {} + when_all_runner(when_all_runner&& other) noexcept + : h_(std::exchange(other.h_, nullptr)) + { + } - // Non-copyable when_all_runner(when_all_runner const&) = delete; when_all_runner& operator=(when_all_runner const&) = delete; when_all_runner& operator=(when_all_runner&&) = delete; @@ -249,12 +287,12 @@ struct when_all_runner } }; -/** Create a runner coroutine for a single awaitable. +/** Create a runner coroutine for a single awaitable (variadic path). - Awaitable is passed directly to ensure proper coroutine frame storage. + Uses compile-time index for tuple-based result storage. */ template -when_all_runner, Ts...> +when_all_runner> make_when_all_runner(Awaitable inner, when_all_state* state) { using T = awaitable_result_t; @@ -268,10 +306,32 @@ make_when_all_runner(Awaitable inner, when_all_state* state) } } -/** Internal awaitable that launches all runner coroutines and waits. +/** Create a runner coroutine for a single awaitable (range path). - This awaitable is used inside the when_all coroutine to handle - the concurrent execution of child awaitables. + Uses runtime index for vector-based result storage. +*/ +template +when_all_runner +make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index) +{ + using T = awaitable_result_t; + if constexpr (std::is_void_v) + { + co_await std::move(inner); + } + else + { + state->set_result(index, co_await std::move(inner)); + } +} + +/** Internal awaitable that launches all variadic runner coroutines. + + CRITICAL: If the last task finishes synchronously then the parent + coroutine resumes, destroying its frame, and destroying this object + prior to the completion of await_suspend. Therefore, await_suspend + must ensure `this` cannot be referenced after calling `launch_one` + for the last time. */ template class when_all_launcher @@ -297,37 +357,29 @@ class when_all_launcher std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env) { - state_->continuation_ = continuation; - state_->caller_env_ = caller_env; + state_->core_.continuation_ = continuation; + state_->core_.caller_env_ = caller_env; - // Forward parent's stop requests to children if(caller_env->stop_token.stop_possible()) { - state_->parent_stop_callback_.emplace( + state_->core_.parent_stop_callback_.emplace( caller_env->stop_token, - typename state_type::stop_callback_fn{&state_->stop_source_}); + when_all_core::stop_callback_fn{&state_->core_.stop_source_}); if(caller_env->stop_token.stop_requested()) - state_->stop_source_.request_stop(); + state_->core_.stop_source_.request_stop(); } - // CRITICAL: If the last task finishes synchronously then the parent - // coroutine resumes, destroying its frame, and destroying this object - // prior to the completion of await_suspend. Therefore, await_suspend - // must ensure `this` cannot be referenced after calling `launch_one` - // for the last time. - auto token = state_->stop_source_.get_token(); + auto token = state_->core_.stop_source_.get_token(); [&](std::index_sequence) { (..., launch_one(caller_env->executor, token)); }(std::index_sequence_for{}); - // Let signal_completion() handle resumption return std::noop_coroutine(); } void await_resume() const noexcept { - // Results are extracted by the when_all coroutine from state } private: @@ -339,11 +391,11 @@ class when_all_launcher auto h = runner.release(); h.promise().state_ = state_; - h.promise().env_ = io_env{caller_ex, token, state_->caller_env_->frame_allocator}; + h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->frame_allocator}; std::coroutine_handle<> ch{h}; state_->runner_handles_[I] = ch; - state_->caller_env_->executor.post(ch); + state_->core_.caller_env_->executor.post(ch); } }; @@ -367,6 +419,82 @@ auto extract_results(when_all_state& state) }(std::index_sequence_for{}); } +/** Launches all homogeneous runners concurrently. + + Two-phase approach: create all runners first, then post all. + This avoids lifetime issues if a task completes synchronously. +*/ +template +class when_all_homogeneous_launcher +{ + using Awaitable = std::ranges::range_value_t; + using T = awaitable_result_t; + + Range* range_; + when_all_homogeneous_state* state_; + +public: + when_all_homogeneous_launcher( + Range* range, + when_all_homogeneous_state* state) + : range_(range) + , state_(state) + { + } + + bool await_ready() const noexcept + { + return std::ranges::empty(*range_); + } + + std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env) + { + state_->core_.continuation_ = continuation; + state_->core_.caller_env_ = caller_env; + + if(caller_env->stop_token.stop_possible()) + { + state_->core_.parent_stop_callback_.emplace( + caller_env->stop_token, + when_all_core::stop_callback_fn{&state_->core_.stop_source_}); + + if(caller_env->stop_token.stop_requested()) + state_->core_.stop_source_.request_stop(); + } + + auto token = state_->core_.stop_source_.get_token(); + + // Phase 1: Create all runners without dispatching. + std::size_t index = 0; + for(auto&& a : *range_) + { + auto runner = make_when_all_homogeneous_runner( + std::move(a), state_, index); + + auto h = runner.release(); + h.promise().state_ = state_; + h.promise().index_ = index; + h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator}; + + state_->runner_handles_[index] = std::coroutine_handle<>{h}; + ++index; + } + + // Phase 2: Post all runners. Any may complete synchronously. + // After last post, state_ and this may be destroyed. + std::coroutine_handle<>* handles = state_->runner_handles_.data(); + std::size_t count = state_->runner_handles_.size(); + for(std::size_t i = 0; i < count; ++i) + caller_env->executor.post(handles[i]); + + return std::noop_coroutine(); + } + + void await_resume() const noexcept + { + } +}; + } // namespace detail /** Compute the when_all result tuple type. @@ -447,12 +575,147 @@ template // Safe without explicit acquire: capture_exception() is sequenced-before // signal_completion()'s acq_rel fetch_sub, which synchronizes-with the // last task's decrement that resumes this coroutine. - if(state.first_exception_) - std::rethrow_exception(state.first_exception_); + if(state.core_.first_exception_) + std::rethrow_exception(state.core_.first_exception_); co_return detail::extract_results(state); } +/** Execute a range of awaitables concurrently and collect their results. + + Launches all awaitables in the range simultaneously and waits for all + to complete. Results are collected in a vector preserving input order. + If any awaitable throws, cancellation is requested for siblings and + the first exception is rethrown after all awaitables complete. + + @li All child awaitables run concurrently on the caller's executor + @li Results are returned as a vector in input order + @li First exception wins; subsequent exceptions are discarded + @li Stop is requested for siblings on first error + @li Completes only after all children have finished + + @par Thread Safety + The returned task must be awaited from a single execution context. + Child awaitables execute concurrently but complete through the caller's + executor. + + @param awaitables Range of awaitables to execute concurrently (must + not be empty). Each element must satisfy @ref IoAwaitable and is + consumed (moved-from) when `when_all` is awaited. + + @return A task yielding a vector where each element is the result of + the corresponding awaitable, in input order. + + @throws std::invalid_argument if range is empty (thrown before + coroutine suspends). + @throws Rethrows the first child exception after all children + complete. + + @par Example + @code + task example() + { + std::vector> requests; + for (auto const& url : urls) + requests.push_back(fetch(url)); + + auto responses = co_await when_all(std::move(requests)); + } + @endcode + + @see IoAwaitableRange, when_all +*/ +template + requires (!std::is_void_v>>) +[[nodiscard]] auto when_all(R&& awaitables) + -> task>>> +{ + using Awaitable = std::ranges::range_value_t; + using T = awaitable_result_t; + using OwnedRange = std::remove_cvref_t; + + auto count = std::ranges::size(awaitables); + if(count == 0) + throw std::invalid_argument("when_all requires at least one awaitable"); + + OwnedRange owned_awaitables = std::forward(awaitables); + + detail::when_all_homogeneous_state state(count); + + co_await detail::when_all_homogeneous_launcher( + &owned_awaitables, &state); + + if(state.core_.first_exception_) + std::rethrow_exception(state.core_.first_exception_); + + std::vector results; + results.reserve(count); + for(auto& opt : state.results_) + results.push_back(std::move(*opt)); + + co_return results; +} + +/** Execute a range of void awaitables concurrently. + + Launches all awaitables in the range simultaneously and waits for all + to complete. Since all awaitables return void, no results are collected. + If any awaitable throws, cancellation is requested for siblings and + the first exception is rethrown after all awaitables complete. + + @li All child awaitables run concurrently on the caller's executor + @li First exception wins; subsequent exceptions are discarded + @li Stop is requested for siblings on first error + @li Completes only after all children have finished + + @par Thread Safety + The returned task must be awaited from a single execution context. + Child awaitables execute concurrently but complete through the caller's + executor. + + @param awaitables Range of void awaitables to execute concurrently + (must not be empty). + + @throws std::invalid_argument if range is empty (thrown before + coroutine suspends). + @throws Rethrows the first child exception after all children + complete. + + @par Example + @code + task example() + { + std::vector> jobs; + for (int i = 0; i < n; ++i) + jobs.push_back(process(i)); + + co_await when_all(std::move(jobs)); + } + @endcode + + @see IoAwaitableRange, when_all +*/ +template + requires std::is_void_v>> +[[nodiscard]] auto when_all(R&& awaitables) -> task +{ + using OwnedRange = std::remove_cvref_t; + + auto count = std::ranges::size(awaitables); + if(count == 0) + throw std::invalid_argument("when_all requires at least one awaitable"); + + OwnedRange owned_awaitables = std::forward(awaitables); + + detail::when_all_homogeneous_state state(count); + + co_await detail::when_all_homogeneous_launcher( + &owned_awaitables, &state); + + if(state.core_.first_exception_) + std::rethrow_exception(state.core_.first_exception_); +} + } // namespace capy } // namespace boost diff --git a/include/boost/capy/when_any.hpp b/include/boost/capy/when_any.hpp index 93eb7249..96fdd4c4 100644 --- a/include/boost/capy/when_any.hpp +++ b/include/boost/capy/when_any.hpp @@ -577,43 +577,6 @@ template co_return std::move(*state.result_); } -/** Concept for ranges of full I/O awaitables. - - A range satisfies `IoAwaitableRange` if it is a sized input range - whose value type satisfies @ref IoAwaitable. This enables when_any - to accept any container or view of awaitables, not just std::vector. - - @tparam R The range type. - - @par Requirements - @li `R` must satisfy `std::ranges::input_range` - @li `R` must satisfy `std::ranges::sized_range` - @li `std::ranges::range_value_t` must satisfy @ref IoAwaitable - - @par Syntactic Requirements - Given `r` of type `R`: - @li `std::ranges::begin(r)` is valid - @li `std::ranges::end(r)` is valid - @li `std::ranges::size(r)` returns `std::ranges::range_size_t` - @li `*std::ranges::begin(r)` satisfies @ref IoAwaitable - - @par Example - @code - template - task race_all(R&& awaitables) { - auto winner = co_await when_any(std::forward(awaitables)); - // Process winner... - } - @endcode - - @see when_any, IoAwaitable -*/ -template -concept IoAwaitableRange = - std::ranges::input_range && - std::ranges::sized_range && - IoAwaitable>; - namespace detail { /** Shared state for homogeneous when_any (range overload). diff --git a/test/unit/when_all.cpp b/test/unit/when_all.cpp index 99125f01..d8b0b52e 100644 --- a/test/unit/when_all.cpp +++ b/test/unit/when_all.cpp @@ -1124,5 +1124,333 @@ TEST_SUITE( when_all_strand_test, "boost.capy.when_all_strand"); +//---------------------------------------------------------- +// Range-based when_all tests +//---------------------------------------------------------- + +// Verify IoAwaitableRange concept +static_assert(IoAwaitableRange>>); +static_assert(IoAwaitableRange>>); + +struct when_all_range_test +{ + // Test: Single-element vector + void + testSingleElement() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + + std::vector> tasks; + tasks.push_back(returns_int(42)); + + run_async(ex, + [&](std::vector v) { + completed = true; + BOOST_TEST_EQ(v.size(), 1u); + BOOST_TEST_EQ(v[0], 42); + }, + [](std::exception_ptr) {})( + when_all(std::move(tasks))); + + BOOST_TEST(completed); + } + + // Test: Multiple elements, results in input order + void + testMultipleElements() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + + std::vector> tasks; + tasks.push_back(returns_int(10)); + tasks.push_back(returns_int(20)); + tasks.push_back(returns_int(30)); + + run_async(ex, + [&](std::vector v) { + completed = true; + BOOST_TEST_EQ(v.size(), 3u); + BOOST_TEST_EQ(v[0], 10); + BOOST_TEST_EQ(v[1], 20); + BOOST_TEST_EQ(v[2], 30); + }, + [](std::exception_ptr) {})( + when_all(std::move(tasks))); + + BOOST_TEST(completed); + } + + // Test: Empty range throws std::invalid_argument + void + testEmptyRange() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught = false; + + std::vector> tasks; + + run_async(ex, + [](std::vector) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (std::invalid_argument const&) { + caught = true; + } + })(when_all(std::move(tasks))); + + BOOST_TEST(caught); + } + + // Test: Void range completes successfully + void + testVoidRange() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + + std::vector> tasks; + tasks.push_back(void_task()); + tasks.push_back(void_task()); + tasks.push_back(void_task()); + + run_async(ex, + [&]() { + completed = true; + }, + [](std::exception_ptr) {})( + when_all(std::move(tasks))); + + BOOST_TEST(completed); + } + + // Test: Empty void range throws + void + testEmptyVoidRange() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught = false; + + std::vector> tasks; + + run_async(ex, + []() {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (std::invalid_argument const&) { + caught = true; + } + })(when_all(std::move(tasks))); + + BOOST_TEST(caught); + } + + // Test: Exception in one task propagates + void + testException() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught_exception = false; + std::string error_msg; + + std::vector> tasks; + tasks.push_back(returns_int(1)); + tasks.push_back(throws_exception("range error")); + tasks.push_back(returns_int(3)); + + run_async(ex, + [](std::vector) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_all(std::move(tasks))); + + BOOST_TEST(caught_exception); + BOOST_TEST_EQ(error_msg, "range error"); + } + + // Test: Void range exception + void + testVoidRangeException() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught_exception = false; + + std::vector> tasks; + tasks.push_back(void_task()); + tasks.push_back(void_throws_exception("void range error")); + + run_async(ex, + []() {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const&) { + caught_exception = true; + } + })(when_all(std::move(tasks))); + + BOOST_TEST(caught_exception); + } + + // Test: All tasks complete even after stop is requested + void + testAllTasksCompleteAfterError() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + std::atomic completion_count{0}; + bool caught_exception = false; + + auto counting_task = [&]() -> task { + ++completion_count; + co_return 1; + }; + + auto failing_task = [&]() -> task { + ++completion_count; + throw_test_exception("fail"); + co_return 0; + }; + + std::vector> tasks; + tasks.push_back(counting_task()); + tasks.push_back(failing_task()); + tasks.push_back(counting_task()); + + run_async(ex, + [](std::vector) {}, + [&](std::exception_ptr) { + caught_exception = true; + })(when_all(std::move(tasks))); + + BOOST_TEST(caught_exception); + BOOST_TEST_EQ(completion_count.load(), 3); + } + + // Test: Nested range when_all inside variadic when_all + void + testNestedRangeInVariadic() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + + auto range_task = []() -> task> { + std::vector> tasks; + tasks.push_back(returns_int(1)); + tasks.push_back(returns_int(2)); + tasks.push_back(returns_int(3)); + co_return co_await when_all(std::move(tasks)); + }; + + run_async(ex, + [&](std::tuple, int> t) { + auto& [vec, val] = t; + completed = true; + BOOST_TEST_EQ(vec.size(), 3u); + BOOST_TEST_EQ(vec[0] + vec[1] + vec[2], 6); + BOOST_TEST_EQ(val, 99); + }, + [](std::exception_ptr) {})( + when_all(range_task(), returns_int(99))); + + BOOST_TEST(completed); + } + + // Test: String results (non-trivial type) + void + testStringResults() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + + std::vector> tasks; + tasks.push_back(returns_string("first")); + tasks.push_back(returns_string("second")); + tasks.push_back(returns_string("third")); + + run_async(ex, + [&](std::vector v) { + completed = true; + BOOST_TEST_EQ(v[0], "first"); + BOOST_TEST_EQ(v[1], "second"); + BOOST_TEST_EQ(v[2], "third"); + }, + [](std::exception_ptr) {})( + when_all(std::move(tasks))); + + BOOST_TEST(completed); + } + + // Test: Range when_all on strand executor + void + testStrandRange() + { + thread_pool pool(2); + strand s{pool.get_executor()}; + std::latch done(1); + bool completed = false; + int result = 0; + + auto outer = [&]() -> task> { + std::vector> tasks; + tasks.push_back(returns_int(10)); + tasks.push_back(returns_int(20)); + co_return co_await when_all(std::move(tasks)); + }; + + run_async(s, + [&](std::vector v) { + completed = true; + result = v[0] + v[1]; + done.count_down(); + }, + [&](auto) { + done.count_down(); + } + )(outer()); + + done.wait(); + BOOST_TEST(completed); + BOOST_TEST_EQ(result, 30); + } + + void + run() + { + testSingleElement(); + testMultipleElements(); + testEmptyRange(); + testVoidRange(); + testEmptyVoidRange(); + testException(); + testVoidRangeException(); + testAllTasksCompleteAfterError(); + testNestedRangeInVariadic(); + testStringResults(); + testStrandRange(); + } +}; + +TEST_SUITE( + when_all_range_test, + "boost.capy.when_all_range"); + } // capy } // boost