From 89a16a4afa19d903c4cb40b58ca4e19aed21fd25 Mon Sep 17 00:00:00 2001 From: owent Date: Wed, 22 Jun 2022 21:15:44 +0800 Subject: [PATCH] Fix compatibility --- include/libcopp/coroutine/algorithm_common.h | 203 ------------------- include/libcopp/coroutine/callable_promise.h | 200 ++++++++++++++++++ include/libcotask/task_promise.h | 4 +- test/case/task_promise_test.cpp | 2 +- 4 files changed, 203 insertions(+), 206 deletions(-) diff --git a/include/libcopp/coroutine/algorithm_common.h b/include/libcopp/coroutine/algorithm_common.h index 7df2838..77dff17 100644 --- a/include/libcopp/coroutine/algorithm_common.h +++ b/include/libcopp/coroutine/algorithm_common.h @@ -23,9 +23,6 @@ LIBCOPP_COPP_NAMESPACE_BEGIN -template -class LIBCOPP_COPP_API_HEAD_ONLY callable_future; - template struct LIBCOPP_COPP_API_HEAD_ONLY some_ready { using element_type = gsl::not_null; @@ -94,206 +91,6 @@ struct LIBCOPP_COPP_API_HEAD_ONLY pick_some_reference { inline static const TELEMENT& unwrap(const TELEMENT& input) noexcept { return input; } }; -// some delegate -template -struct LIBCOPP_COPP_API_HEAD_ONLY some_delegate_context { - using future_type = TFUTURE; - using ready_output_type = typename some_ready::type; - - std::list pending; - ready_output_type ready; - size_t ready_bound = 0; - size_t scan_bound = 0; - promise_status status = promise_status::kCreated; - promise_caller_manager::handle_delegate caller_handle = promise_caller_manager::handle_delegate(nullptr); -}; - -template -class LIBCOPP_COPP_API_HEAD_ONLY some_delegate_base { - public: - using future_type = TFUTURE; - using value_type = future_type::value_type; - using context_type = some_delegate_context; - using ready_output_type = typename context_type::ready_output_type; - using delegate_action_type = TDELEGATE_ACTION; - - private: - static void force_resume_all(context_type& context) { - for (auto& pending_future : context.pending) { - delegate_action_type::resume_future(context.caller_handle, *pending_future); - } - - if (context.status < promise_status::kDone && nullptr != context.caller_handle.promise) { - context.status = context.caller_handle.promise->get_status(); - } - - context.caller_handle = nullptr; - if (context.status < promise_status::kDone) { - context.status = promise_status::kKilled; - } - } - - static void scan_ready(context_type& context) { - auto iter = context.pending.begin(); - - while (iter != context.pending.end()) { - if (delegate_action_type::is_pending(**iter)) { - ++iter; - continue; - } - future_type& future = **iter; - context.ready.push_back(gsl::make_not_null(&future)); - iter = context.pending.erase(iter); - - delegate_action_type::resume_future(context.caller_handle, future); - } - } - - public: - class awaitable_type : public awaitable_base_type { - public: - awaitable_type(context_type* context) : context_(context) {} - - inline bool await_ready() noexcept { - if (nullptr == context_) { - return true; - } - - if (context_->status >= promise_status::kDone) { - return true; - } - - return context_->pending.empty(); - } - -# if defined(LIBCOPP_MACRO_ENABLE_CONCEPTS) && LIBCOPP_MACRO_ENABLE_CONCEPTS - template -# else - template ::value>> -# endif - inline void await_suspend(LIBCOPP_MACRO_STD_COROUTINE_NAMESPACE coroutine_handle caller) noexcept { - if (nullptr == context_ || caller.promise().get_status() >= promise_status::kDone) { - // Already done and can not suspend again - caller.resume(); - return; - } - - set_caller(caller); - - // Allow kill resume to forward error information - caller.promise().set_flag(promise_flag::kInternalWaitting, true); - - // set caller for all futures - if (!context_->caller_handle) { - context_->caller_handle = caller; - // Copy pending here, the callback may call resume and will change the pending list - std::list copy_pending = context_->pending; - for (auto& pending_future : copy_pending) { - delegate_action_type::suspend_future(context_->caller_handle, *pending_future); - } - } - } - - void await_resume() { - // caller maybe null if the callable is already ready when co_await - auto caller = get_caller(); - if (caller) { - if (nullptr != caller.promise) { - caller.promise->set_flag(promise_flag::kInternalWaitting, false); - } - set_caller(nullptr); - } - - if (nullptr == context_) { - return; - } - - ++context_->scan_bound; - if (context_->scan_bound >= context_->ready_bound) { - scan_ready(*context_); - context_->scan_bound = context_->ready.size(); - - if (context_->scan_bound >= context_->ready_bound && context_->status < promise_status::kDone) { - context_->status = promise_status::kDone; - } - } - } - - private: - context_type* context_; - }; - - public: - struct promise_type { - context_type* context_; - - promise_type(context_type* context) : context_(context) {} - promise_type(const promise_type&) = delete; - promise_type(promise_type&&) = delete; - promise_type& operator=(const promise_type&) = delete; - promise_type& operator=(promise_type&&) = delete; - ~promise_type() { - COPP_LIKELY_IF (nullptr != context_ && !!context_->caller_handle) { - force_resume_all(*context_); - } - } - - inline awaitable_type operator co_await() & { return awaitable_type{context_}; } - }; - - template - static callable_future run(ready_output_type& ready_futures, size_t ready_count, - TCONTAINER* futures) { - using container_type = typename std::decay::type>::type; - context_type context; - context.ready.reserve(gsl::size(*futures)); - - for (auto& future_object : *futures) { - auto& future_ref = - pick_some_reference::type>::unwrap(future_object); - if (delegate_action_type::is_pending(future_ref)) { - context.pending.push_back(&future_ref); - } else { - context.ready.push_back(gsl::make_not_null(&future_ref)); - } - } - - if (context.ready.size() >= ready_count) { - context.ready.swap(ready_futures); - co_return promise_status::kDone; - } - - if (ready_count >= context.pending.size() + ready_futures.size()) { - ready_count = context.pending.size() + ready_futures.size(); - } - context.ready_bound = ready_count; - context.scan_bound = context.ready.size(); - context.status = promise_status::kRunning; - - { - promise_type some_promise{&context}; - while (context.status < promise_status::kDone) { - // Killed by caller - auto current_status = co_yield callable_future::yield_status(); - if (current_status >= promise_status::kDone) { - context.status = current_status; - break; - } - - co_await some_promise; - } - - // destroy promise object and detach handles - } - - context.ready.swap(ready_futures); - co_return context.status; - } - - private: - std::shared_ptr context_; -}; - LIBCOPP_COPP_NAMESPACE_END #endif diff --git a/include/libcopp/coroutine/callable_promise.h b/include/libcopp/coroutine/callable_promise.h index c47418c..2f55da6 100644 --- a/include/libcopp/coroutine/callable_promise.h +++ b/include/libcopp/coroutine/callable_promise.h @@ -428,6 +428,206 @@ class LIBCOPP_COPP_API_HEAD_ONLY callable_future { handle_type current_handle_; }; +// some delegate +template +struct LIBCOPP_COPP_API_HEAD_ONLY some_delegate_context { + using future_type = TFUTURE; + using ready_output_type = typename some_ready::type; + + std::list pending; + ready_output_type ready; + size_t ready_bound = 0; + size_t scan_bound = 0; + promise_status status = promise_status::kCreated; + promise_caller_manager::handle_delegate caller_handle = promise_caller_manager::handle_delegate(nullptr); +}; + +template +class LIBCOPP_COPP_API_HEAD_ONLY some_delegate_base { + public: + using future_type = TFUTURE; + using value_type = typename future_type::value_type; + using context_type = some_delegate_context; + using ready_output_type = typename context_type::ready_output_type; + using delegate_action_type = TDELEGATE_ACTION; + + private: + static void force_resume_all(context_type& context) { + for (auto& pending_future : context.pending) { + delegate_action_type::resume_future(context.caller_handle, *pending_future); + } + + if (context.status < promise_status::kDone && nullptr != context.caller_handle.promise) { + context.status = context.caller_handle.promise->get_status(); + } + + context.caller_handle = nullptr; + if (context.status < promise_status::kDone) { + context.status = promise_status::kKilled; + } + } + + static void scan_ready(context_type& context) { + auto iter = context.pending.begin(); + + while (iter != context.pending.end()) { + if (delegate_action_type::is_pending(**iter)) { + ++iter; + continue; + } + future_type& future = **iter; + context.ready.push_back(gsl::make_not_null(&future)); + iter = context.pending.erase(iter); + + delegate_action_type::resume_future(context.caller_handle, future); + } + } + + public: + class awaitable_type : public awaitable_base_type { + public: + awaitable_type(context_type* context) : context_(context) {} + + inline bool await_ready() noexcept { + if (nullptr == context_) { + return true; + } + + if (context_->status >= promise_status::kDone) { + return true; + } + + return context_->pending.empty(); + } + +# if defined(LIBCOPP_MACRO_ENABLE_CONCEPTS) && LIBCOPP_MACRO_ENABLE_CONCEPTS + template +# else + template ::value>> +# endif + inline void await_suspend(LIBCOPP_MACRO_STD_COROUTINE_NAMESPACE coroutine_handle caller) noexcept { + if (nullptr == context_ || caller.promise().get_status() >= promise_status::kDone) { + // Already done and can not suspend again + caller.resume(); + return; + } + + set_caller(caller); + + // Allow kill resume to forward error information + caller.promise().set_flag(promise_flag::kInternalWaitting, true); + + // set caller for all futures + if (!context_->caller_handle) { + context_->caller_handle = caller; + // Copy pending here, the callback may call resume and will change the pending list + std::list copy_pending = context_->pending; + for (auto& pending_future : copy_pending) { + delegate_action_type::suspend_future(context_->caller_handle, *pending_future); + } + } + } + + void await_resume() { + // caller maybe null if the callable is already ready when co_await + auto caller = get_caller(); + if (caller) { + if (nullptr != caller.promise) { + caller.promise->set_flag(promise_flag::kInternalWaitting, false); + } + set_caller(nullptr); + } + + if (nullptr == context_) { + return; + } + + ++context_->scan_bound; + if (context_->scan_bound >= context_->ready_bound) { + scan_ready(*context_); + context_->scan_bound = context_->ready.size(); + + if (context_->scan_bound >= context_->ready_bound && context_->status < promise_status::kDone) { + context_->status = promise_status::kDone; + } + } + } + + private: + context_type* context_; + }; + + public: + struct promise_type { + context_type* context_; + + promise_type(context_type* context) : context_(context) {} + promise_type(const promise_type&) = delete; + promise_type(promise_type&&) = delete; + promise_type& operator=(const promise_type&) = delete; + promise_type& operator=(promise_type&&) = delete; + ~promise_type() { + COPP_LIKELY_IF (nullptr != context_ && !!context_->caller_handle) { + force_resume_all(*context_); + } + } + + inline awaitable_type operator co_await() & { return awaitable_type{context_}; } + }; + + template + static callable_future run(ready_output_type& ready_futures, size_t ready_count, + TCONTAINER* futures) { + using container_type = typename std::decay::type>::type; + context_type context; + context.ready.reserve(gsl::size(*futures)); + + for (auto& future_object : *futures) { + auto& future_ref = + pick_some_reference::type>::unwrap(future_object); + if (delegate_action_type::is_pending(future_ref)) { + context.pending.push_back(&future_ref); + } else { + context.ready.push_back(gsl::make_not_null(&future_ref)); + } + } + + if (context.ready.size() >= ready_count) { + context.ready.swap(ready_futures); + co_return promise_status::kDone; + } + + if (ready_count >= context.pending.size() + ready_futures.size()) { + ready_count = context.pending.size() + ready_futures.size(); + } + context.ready_bound = ready_count; + context.scan_bound = context.ready.size(); + context.status = promise_status::kRunning; + + { + promise_type some_promise{&context}; + while (context.status < promise_status::kDone) { + // Killed by caller + auto current_status = co_yield callable_future::yield_status(); + if (current_status >= promise_status::kDone) { + context.status = current_status; + break; + } + + co_await some_promise; + } + + // destroy promise object and detach handles + } + + context.ready.swap(ready_futures); + co_return context.status; + } + + private: + std::shared_ptr context_; +}; + // some template struct LIBCOPP_COPP_API_HEAD_ONLY some_delegate_callable_action { diff --git a/include/libcotask/task_promise.h b/include/libcotask/task_promise.h index e17d954..01edcf9 100644 --- a/include/libcotask/task_promise.h +++ b/include/libcotask/task_promise.h @@ -219,7 +219,7 @@ class LIBCOPP_COPP_API_HEAD_ONLY task_context_delegate : public ta template friend class LIBCOPP_COPP_API_HEAD_ONLY task_awaitable_base; - template + template friend struct LIBCOPP_COPP_API_HEAD_ONLY some_delegate_task_action; using base_type::add_caller; @@ -276,7 +276,7 @@ class LIBCOPP_COPP_API_HEAD_ONLY task_context_delegate : public t template friend class LIBCOPP_COPP_API_HEAD_ONLY task_awaitable_base; - template + template friend struct LIBCOPP_COPP_API_HEAD_ONLY some_delegate_task_action; using base_type::add_caller; diff --git a/test/case/task_promise_test.cpp b/test/case/task_promise_test.cpp index 6701bde..5e2b5f7 100644 --- a/test/case/task_promise_test.cpp +++ b/test/case/task_promise_test.cpp @@ -1363,7 +1363,7 @@ static cotask::task_future task_func_some_any_all_callable_suspend() ++g_task_future_suspend_generator_count; g_task_future_pending_int_contexts.push_back(ctx); }; - auto resume_callback = [](const generator_future_int_type::context_type &ctx) { + auto resume_callback = [](const generator_future_int_type::context_type &) { ++g_task_future_resume_generator_count; };