Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions include/boost/capy/ex/detail/timer_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ class BOOST_CAPY_DECL

explicit timer_service(execution_context& ctx);

// Calls shutdown() to join the background thread.
// Handles the discard path in use_service_impl where
// a duplicate service is deleted without shutdown().
~timer_service();

/** Schedule a callback to fire after a duration.

The callback is invoked on the timer service's background
Expand Down Expand Up @@ -79,6 +84,7 @@ class BOOST_CAPY_DECL
void shutdown() override;

private:
void stop_and_join();
struct entry
{
std::chrono::steady_clock::time_point deadline;
Expand Down
15 changes: 14 additions & 1 deletion src/ex/detail/timer_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ timer_service(execution_context& ctx)
(void)ctx;
}

timer_service::
~timer_service()
{
stop_and_join();
}

timer_service::timer_id
timer_service::
schedule_at(
Expand Down Expand Up @@ -54,7 +60,7 @@ cancel(timer_id id)

void
timer_service::
shutdown()
stop_and_join()
{
{
std::lock_guard lock(mutex_);
Expand All @@ -65,6 +71,13 @@ shutdown()
thread_.join();
}

void
timer_service::
shutdown()
{
stop_and_join();
}

void
timer_service::
run()
Expand Down
28 changes: 28 additions & 0 deletions test/unit/delay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,33 @@ struct delay_test
#endif
}

// Test: concurrent delays on a multi-threaded pool
// exercises use_service race and shared timer_service
void
testConcurrentDelays()
{
constexpr int N = 10;
thread_pool pool(4);
std::latch done(N);

auto delay_task = [](int i) -> task<void>
{
co_await delay(10ms * i);
};

for(int i = 0; i < N; ++i)
{
run_async(pool.get_executor(),
[&]() { done.count_down(); },
[&](std::exception_ptr) {
done.count_down();
})(delay_task(i));
}

done.wait();
BOOST_TEST(true);
}

void
run()
{
Expand All @@ -251,6 +278,7 @@ struct delay_test
testZeroDuration();
testSequentialDelays();
testDestroyWhileSuspended();
testConcurrentDelays();
}
};

Expand Down
Loading