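// NOTE: the next three lines are web-page notices (a Gitee fork-sync dialog) that were captured with the source; they are not part of the program.
// "Code pull finished; the page will refresh automatically."
// "The sync will be forced from 魏兆华/StudyCpp; it overwrites every change made since the fork and cannot be undone!"
// "After you confirm, the sync runs in the background and the page refreshes when it is done; please wait."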
//Rewrite from https://github.com/David-Haim/concurrencpp
//Test successfully on VS2019 16.11.3
#include <cassert>
#include <chrono>
#include <coroutine>
#include <deque>
#include <iostream>
#include <list>
#include <mutex>
#include <semaphore>
#include <set>
#include <span>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
#include <Windows.h> //for thread support on OS
namespace concurrencpp
{
namespace errors // Exception types used by the library; every type simply inherits its base class constructors.
{
    // Root of the "object is empty" family.
    struct empty_object : public std::runtime_error
    {
        using std::runtime_error::runtime_error;
    };
    // The "empty" errors below all derive from empty_object, so one handler can catch any of them.
    struct empty_result : public empty_object
    {
        using empty_object::empty_object;
    };
    struct empty_result_promise : public empty_object
    {
        using empty_object::empty_object;
    };
    struct empty_awaitable : public empty_object
    {
        using empty_object::empty_object;
    };
    struct empty_timer : public empty_object
    {
        using empty_object::empty_object;
    };
    // Independent errors that derive directly from std::runtime_error.
    struct broken_task : public std::runtime_error
    {
        using std::runtime_error::runtime_error;
    };
    struct result_already_retrieved : public std::runtime_error
    {
        using std::runtime_error::runtime_error;
    };
    struct runtime_shutdown : public std::runtime_error
    {
        using std::runtime_error::runtime_error;
    };
} //namespace errors
namespace details
{
class await_context // Holds a suspended caller coroutine plus an optional interrupt error (original note: mainly used by resume_on_awaitable)
{
private:
    std::coroutine_handle<void> m_caller_handle; // coroutine that will be resumed
    std::exception_ptr m_interrupt_exception;    // error to rethrow after resumption, if one was set
public:
    // Stores the coroutine to resume later. May only be called once, with a valid, unfinished handle.
    void set_coro_handle(std::coroutine_handle<void> coro_handle) noexcept
    {
        assert(!static_cast<bool>(m_caller_handle));
        assert(static_cast<bool>(coro_handle));
        assert(!coro_handle.done());
        m_caller_handle = coro_handle;
    }
    // Records the interrupt error. May only be called once, with a non-null exception.
    void set_interrupt(const std::exception_ptr& interrupt) noexcept
    {
        assert(m_interrupt_exception == nullptr);
        assert(static_cast<bool>(interrupt));
        m_interrupt_exception = interrupt;
    }
    // Resumes the stored coroutine; it must be valid and not yet finished.
    void resume() noexcept
    {
        assert(static_cast<bool>(m_caller_handle));
        assert(!m_caller_handle.done());
        m_caller_handle.resume();
    }
    // Rethrows the recorded interrupt, or does nothing when none was set.
    void throw_if_interrupted() const
    {
        if (m_interrupt_exception == nullptr)
            return;
        std::rethrow_exception(m_interrupt_exception);
    }
}; //class await_context
class wait_context // One-shot "ready" signal built from a mutex and a condition variable; used for blocking waits on a result
{
private:
    std::mutex m_lock;
    std::condition_variable m_condition;
    bool m_ready = false; // guarded by m_lock
public:
    // Blocks until notify() has been called.
    void wait() noexcept
    {
        std::unique_lock<std::mutex> guard(m_lock);
        while (!m_ready)
            m_condition.wait(guard);
    }
    // Blocks until notify() has been called or the timeout expires.
    // Returns true when the ready flag is set, false on timeout.
    bool wait_for(size_t milliseconds) noexcept
    {
        const auto is_ready = [this] { return m_ready; };
        std::unique_lock<std::mutex> guard(m_lock);
        const auto timeout = std::chrono::milliseconds(milliseconds);
        return m_condition.wait_for(guard, timeout, is_ready);
    }
    // Sets the ready flag under the lock, then wakes every waiting thread.
    void notify() noexcept
    {
        {
            std::unique_lock<std::mutex> guard(m_lock);
            m_ready = true;
        }
        m_condition.notify_all();
    }
}; //class wait_context
class result_state_base;
class when_any_context // Shared by several results: the first one to complete records itself and resumes the waiting coroutine
{
private:
    std::atomic_bool m_fulfilled = false;              // set by the first try_resume call
    result_state_base* m_completed_result = nullptr;   // the result that won
    std::coroutine_handle<void> m_coro_handle;         // coroutine to resume
public:
    when_any_context(std::coroutine_handle<void> coro_handle) noexcept : m_coro_handle(coro_handle) {}
    // True once some result has claimed this context.
    bool fulfilled() const noexcept
    {
        return m_fulfilled.load(std::memory_order_acquire);
    }
    // The winning result; only valid after a successful try_resume.
    result_state_base* completed_result() const noexcept
    {
        assert(m_completed_result != nullptr);
        return m_completed_result;
    }
    // Claims the context for completed_result and resumes the coroutine.
    // Only the first caller does anything; later calls return immediately.
    void try_resume(result_state_base* completed_result) noexcept
    {
        assert(completed_result != nullptr);
        if (m_fulfilled.exchange(true, std::memory_order_acq_rel))
            return; // somebody else already resumed the coroutine
        assert(m_completed_result == nullptr);
        m_completed_result = completed_result;
        assert(static_cast<bool>(m_coro_handle));
        m_coro_handle.resume();
    }
}; //class when_any_context
enum class result_status { idle, value, exception }; // what a producer_context currently holds
template<class type>
class producer_context // Holds either a value of <type> or an exception, constructed in place; read it back with get/get_ref
{
    union storage
    {
        type object;
        std::exception_ptr exception;
        storage() noexcept {}  // members are constructed manually with placement new
        ~storage() noexcept {} // and destroyed manually by the owner
    };
private:
    storage m_storage;
    result_status m_status = result_status::idle; // tells which union member (if any) is alive
public:
    // Destroys whichever union member is alive.
    ~producer_context() noexcept
    {
        if (m_status == result_status::value) {
            m_storage.object.~type(); // explicit destructor call through the template parameter name
            return;
        }
        if (m_status == result_status::exception) {
            m_storage.exception.~exception_ptr();
            return;
        }
        assert(m_status == result_status::idle);
    }
    // Takes over the payload of rhs and leaves rhs idle. The target must be idle.
    producer_context& operator=(producer_context&& rhs) noexcept
    {
        assert(m_status == result_status::idle);
        m_status = std::exchange(rhs.m_status, result_status::idle);
        if (m_status == result_status::value) {
            auto& source = rhs.m_storage.object;
            new (std::addressof(m_storage.object)) type(std::move(source)); // construct in our storage
            source.~type();                                                  // then end the source's lifetime
        } else if (m_status == result_status::exception) {
            auto& source = rhs.m_storage.exception;
            new (std::addressof(m_storage.exception)) std::exception_ptr(source);
            source.~exception_ptr();
        } else {
            assert(m_status == result_status::idle);
        }
        return *this;
    }
    // Constructs the value in place from the arguments; noexcept exactly when that constructor is.
    template<class... argument_types>
    void build_result(argument_types&&... arguments) noexcept(noexcept(type(std::forward<argument_types>(arguments)...)))
    {
        assert(m_status == result_status::idle);
        new (std::addressof(m_storage.object)) type(std::forward<argument_types>(arguments)...);
        m_status = result_status::value;
    }
    // Stores an exception instead of a value.
    void build_exception(const std::exception_ptr& exception) noexcept
    {
        assert(m_status == result_status::idle);
        new (std::addressof(m_storage.exception)) std::exception_ptr(exception);
        m_status = result_status::exception;
    }
    result_status status() const noexcept { return m_status; }
    // Returns the value by moving it out of the storage, or rethrows the stored exception.
    type get() { return std::move(get_ref()); }
    // Returns a reference to the stored value, or rethrows the stored exception.
    type& get_ref()
    {
        assert(m_status != result_status::idle);
        if (m_status != result_status::value) {
            assert(m_status == result_status::exception);
            assert(static_cast<bool>(m_storage.exception));
            std::rethrow_exception(m_storage.exception);
        }
        return m_storage.object;
    }
};
template<>
class producer_context<void> // void specialisation: a "value" carries no data, so only the status or an exception is stored
{
    union storage
    {
        std::exception_ptr exception{};
        storage() noexcept {}
        ~storage() noexcept {}
    };
private:
    storage m_storage;
    result_status m_status = result_status::idle;
public:
    // Only the exception member ever needs explicit destruction.
    ~producer_context() noexcept
    {
        if (m_status != result_status::exception)
            return;
        m_storage.exception.~exception_ptr();
    }
    // Takes over the state of rhs and leaves rhs idle. The target must be idle.
    producer_context& operator=(producer_context&& rhs) noexcept
    {
        assert(m_status == result_status::idle);
        m_status = std::exchange(rhs.m_status, result_status::idle);
        if (m_status != result_status::exception)
            return *this;
        auto& source = rhs.m_storage.exception;
        new (std::addressof(m_storage.exception)) std::exception_ptr(source);
        source.~exception_ptr();
        return *this;
    }
    // Marks the (data-less) result as produced.
    void build_result() noexcept
    {
        assert(m_status == result_status::idle);
        m_status = result_status::value;
    }
    // Stores an exception instead of a result.
    void build_exception(const std::exception_ptr& exception) noexcept
    {
        assert(m_status == result_status::idle);
        new (std::addressof(m_storage.exception)) std::exception_ptr(exception);
        m_status = result_status::exception;
    }
    result_status status() const noexcept { return m_status; }
    // Returns normally for a value, rethrows for an exception.
    void get() const { get_ref(); }
    void get_ref() const
    {
        assert(m_status != result_status::idle);
        if (m_status != result_status::exception)
            return;
        assert(static_cast<bool>(m_storage.exception));
        std::rethrow_exception(m_storage.exception);
    }
};
template<class type>
class producer_context<type&> // reference specialisation: keeps a raw pointer to the referenced object, or an exception
{
    union storage
    {
        type* pointer; // address of the referenced object
        std::exception_ptr exception;
        storage() noexcept {}
        ~storage() noexcept {}
    };
private:
    storage m_storage;
    result_status m_status = result_status::idle;
public:
    // A raw pointer needs no cleanup; only the exception member is destroyed explicitly.
    ~producer_context() noexcept
    {
        if (m_status != result_status::exception)
            return;
        m_storage.exception.~exception_ptr();
    }
    // Takes over the state of rhs and leaves rhs idle. The target must be idle.
    producer_context& operator=(producer_context&& rhs) noexcept
    {
        assert(m_status == result_status::idle);
        m_status = std::exchange(rhs.m_status, result_status::idle);
        if (m_status == result_status::value) {
            m_storage.pointer = rhs.m_storage.pointer;
        } else if (m_status == result_status::exception) {
            auto& source = rhs.m_storage.exception;
            new (std::addressof(m_storage.exception)) std::exception_ptr(source);
            source.~exception_ptr();
        } else {
            assert(m_status == result_status::idle);
        }
        return *this;
    }
    // Remembers the address of the referenced object.
    void build_result(type& reference) noexcept
    {
        assert(m_status == result_status::idle);
        type* const target = std::addressof(reference);
        assert(target != nullptr);
        assert(reinterpret_cast<size_t>(target) % alignof(type) == 0); // the address must be properly aligned
        m_storage.pointer = target;
        m_status = result_status::value;
    }
    // Stores an exception instead of a reference.
    void build_exception(const std::exception_ptr& exception) noexcept
    {
        assert(m_status == result_status::idle);
        new (std::addressof(m_storage.exception)) std::exception_ptr(exception);
        m_status = result_status::exception;
    }
    result_status status() const noexcept { return m_status; }
    type& get() const { return get_ref(); }
    // Returns the referenced object, or rethrows the stored exception.
    type& get_ref() const
    {
        assert(m_status != result_status::idle);
        if (m_status != result_status::value) {
            assert(m_status == result_status::exception);
            assert(static_cast<bool>(m_storage.exception));
            std::rethrow_exception(m_storage.exception);
        }
        assert(m_storage.pointer != nullptr);
        assert(reinterpret_cast<size_t>(m_storage.pointer) % alignof(type) == 0);
        return *m_storage.pointer;
    }
}; //class producer_context
class consumer_context // Records how the consumer is waiting for a result (coroutine, blocking wait, or when_any) and wakes it accordingly
{
private:
    enum class consumer_status { idle, await, wait, when_any }; // selects the live member of storage
    union storage
    {
        std::coroutine_handle<void> caller_handle{};     // awaiting coroutine, resumed directly
        std::shared_ptr<wait_context> wait_ctx;          // blocking waiter, notified
        std::shared_ptr<when_any_context> when_any_ctx;  // when_any group, offered the completed result
        // Placement-constructs one union member from the forwarded arguments.
        // Being generic, it serves all three member types without a switch.
        template<class type, class... argument_type>
        static void build(type& o, argument_type&&... arguments) noexcept
        {
            new (std::addressof(o)) type(std::forward<argument_type>(arguments)...);
        }
        // Ends the lifetime of one union member.
        template<class type>
        static void destroy(type& o) noexcept
        {
            o.~type();
        }
        storage() noexcept {}
        ~storage() noexcept {}
    };
private:
    consumer_status m_status = consumer_status::idle;
    storage m_storage;
public:
    ~consumer_context() noexcept { clear(); }
    // Destroys the live union member (if any) and returns to idle.
    void clear() noexcept
    {
        switch (std::exchange(m_status, consumer_status::idle)) {
        case consumer_status::idle: break;
        case consumer_status::await: storage::destroy(m_storage.caller_handle); break;
        case consumer_status::wait: storage::destroy(m_storage.wait_ctx); break;
        case consumer_status::when_any: storage::destroy(m_storage.when_any_ctx); break;
        default: assert(false);
        }
    }
    // Wakes the registered consumer in the way that matches how it is waiting; no-op when idle.
    // self is passed on to the when_any context as the result that completed.
    void resume_consumer(result_state_base* self) const noexcept
    {
        const auto status = m_status;
        if (status == consumer_status::idle)
            return;
        if (status == consumer_status::await) {
            auto handle = m_storage.caller_handle;
            assert(static_cast<bool>(handle));
            assert(!handle.done());
            handle.resume();
            return;
        }
        if (status == consumer_status::wait) {
            const auto local_ctx = m_storage.wait_ctx; // local copy shares ownership for the duration of the call
            assert(static_cast<bool>(local_ctx));
            local_ctx->notify();
            return;
        }
        if (status == consumer_status::when_any) {
            const auto local_ctx = m_storage.when_any_ctx; // local copy, as above
            local_ctx->try_resume(self);
            return;
        }
        assert(false);
    }
    // Registers an awaiting coroutine. The context must be idle.
    void set_await_handle(std::coroutine_handle<void> caller_handle) noexcept
    {
        assert(m_status == consumer_status::idle);
        m_status = consumer_status::await;
        storage::build(m_storage.caller_handle, caller_handle);
    }
    // Registers a blocking waiter (the shared_ptr is copied into the union). The context must be idle.
    void set_wait_context(const std::shared_ptr<wait_context>& wait_ctx) noexcept
    {
        assert(m_status == consumer_status::idle);
        m_status = consumer_status::wait;
        storage::build(m_storage.wait_ctx, wait_ctx);
    }
    // Registers a when_any group (the shared_ptr is copied into the union). The context must be idle.
    void set_when_any_context(const std::shared_ptr<when_any_context>& when_any_ctx) noexcept
    {
        assert(m_status == consumer_status::idle);
        m_status = consumer_status::when_any;
        storage::build(m_storage.when_any_ctx, when_any_ctx);
    }
}; //class consumer_context
class result_state_base // Type-independent part of a result state: the atomic producer/consumer handshake
{
public:
    // idle          - nobody is waiting and no result has been produced
    // consumer_set  - a consumer registered itself in m_consumer and waits for the producer
    // consumer_done - the consumer went away before the producer finished
    // producer_done - the result (value or exception) is available
    enum class pc_state { idle, consumer_set, consumer_done, producer_done };
protected:
    std::atomic<pc_state> m_pc_state { pc_state::idle };
    consumer_context m_consumer;               // how to wake the consumer
    std::coroutine_handle<void> m_done_handle; // written by complete_producer, read by complete_consumer (both in result_state)
    void assert_done() const noexcept { assert(m_pc_state.load(std::memory_order_relaxed) == pc_state::producer_done); }
public:
    // The three registration functions below (plus result_state::wait_for) share one pattern:
    // return early if the result exists, otherwise register in m_consumer and try idle -> consumer_set.
    // A failed transition means the producer finished in the meantime.

    // Blocks the calling thread until the result has been produced.
    // Not noexcept: allocating the wait_context may throw.
    void wait()
    {
        const auto state = m_pc_state.load(std::memory_order_acquire);
        if (state == pc_state::producer_done) return; // already produced, nothing to wait for
        auto wait_ctx = std::make_shared<wait_context>(); // the only statement here that can throw
        m_consumer.set_wait_context(wait_ctx);
        auto expected_state = pc_state::idle;
        const auto idle = m_pc_state.compare_exchange_strong(expected_state, pc_state::consumer_set, std::memory_order_acq_rel);
        if (!idle) { assert_done(); return; } // the producer finished first; do not block
        wait_ctx->wait(); // released by the producer through resume_consumer
        assert_done();
    }
    // Registers a coroutine as the consumer.
    // Returns true when the coroutine must suspend (no result yet), false when the result is already available.
    bool await(std::coroutine_handle<void> caller_handle) noexcept
    {
        const auto state = m_pc_state.load(std::memory_order_acquire);
        if (state == pc_state::producer_done) return false;
        m_consumer.set_await_handle(caller_handle); // kept so the producer can resume it later
        auto expected_state = pc_state::idle;
        const auto idle = m_pc_state.compare_exchange_strong(expected_state, pc_state::consumer_set, std::memory_order_acq_rel);
        if (!idle) assert_done();
        return idle;
    }
    // Registers a when_any context as the consumer.
    // Returns producer_done when the result is already available (either on entry, or because the
    // producer finished while we were registering); otherwise returns the state observed on entry.
    pc_state when_any(const std::shared_ptr<when_any_context>& when_any_state) noexcept
    {
        const auto state = m_pc_state.load(std::memory_order_acquire);
        if (state == pc_state::producer_done) return state;
        m_consumer.set_when_any_context(when_any_state);
        auto expected_state = pc_state::idle;
        const auto idle = m_pc_state.compare_exchange_strong(expected_state, pc_state::consumer_set, std::memory_order_acq_rel);
        if (idle) return state;
        // The producer completed between the load and the exchange. It saw "idle", so it will not
        // wake this consumer; report the value the failed exchange observed (producer_done)
        // instead of the stale entry state, so the caller does not wait for a wake-up that never comes.
        assert_done();
        return expected_state;
    }
    // Tries to move consumer_set back to idle and drops the registered consumer
    // (original note: called from when_any_awaitable::await_resume). Does nothing in any other state.
    void try_rewind_consumer() noexcept
    {
        const auto current_state = m_pc_state.load(std::memory_order_acquire);
        if (current_state != pc_state::consumer_set) return;
        auto expected_consumer_state = pc_state::consumer_set;
        const auto consumer = m_pc_state.compare_exchange_strong(expected_consumer_state, pc_state::idle, std::memory_order_acq_rel);
        if (!consumer) { assert_done(); return; } // the producer finished first; leave everything as is
        m_consumer.clear(); // back to idle: the producer will no longer look at the consumer
    }
};
template<class type>
class result_state : public result_state_base
{
private:
producer_context<type> m_producer; //the produced value or exception; the consumer side lives in the base class
//Self-deletion helper used by complete_producer/complete_consumer once both sides are finished:
//a non-null done_handle is destroyed (it must refer to a finished coroutine), otherwise state is deleted.
static void delete_self(std::coroutine_handle<void> done_handle, result_state<type>* state) noexcept
{
if (static_cast<bool>(done_handle)) { assert(done_handle.done()); return done_handle.destroy(); }
delete state; //callers in this class pass m_done_handle/done_handle (empty by default) and this
}
//Tag-dispatched helpers for from_callable: run the callable, then store its result (nothing to store for void).
template<class callable_type>
void from_callable(std::true_type /*is_void_type*/, callable_type&& callable) { callable(); set_result(); }
template<class callable_type>
void from_callable(std::false_type /*is_void_type*/, callable_type&& callable) { set_result(callable()); }
public:
//Constructs the value in place inside the producer context; noexcept exactly when type's constructor is.
template<class... argument_types>
void set_result(argument_types&&... arguments) noexcept(noexcept(type(std::forward<argument_types>(arguments)...)))
{
m_producer.build_result(std::forward<argument_types>(arguments)...);
}
//Stores a (non-null) exception instead of a value.
void set_exception(const std::exception_ptr& error) noexcept
{
assert(error != nullptr);
m_producer.build_exception(error);
}
//Returns idle while the state is still idle, otherwise whatever the producer context reports (used by wait_until).
result_status status() const noexcept
{
const auto state = m_pc_state.load(std::memory_order_acquire);
assert(state != pc_state::consumer_set); //must not be called while a consumer is registered (the original author left the reason as an open question)
if (state == pc_state::idle) return result_status::idle;
return m_producer.status(); //idle, value or exception
}
//Blocks until the result is produced or the duration elapses.
//Returns the producer status when a result exists, result_status::idle on timeout.
//May throw if allocating the wait_context fails.
template<class duration_unit, class ratio>
result_status wait_for(std::chrono::duration<duration_unit, ratio> duration)
{
const auto state_0 = m_pc_state.load(std::memory_order_acquire);
if (state_0 == pc_state::producer_done) return m_producer.status();
auto wait_ctx = std::make_shared<wait_context>();
m_consumer.set_wait_context(wait_ctx);
auto expected_idle_state = pc_state::idle; //idle -> consumer_set announces that a consumer is waiting
const auto idle_0 = m_pc_state.compare_exchange_strong(expected_idle_state, pc_state::consumer_set, std::memory_order_acq_rel);
if (!idle_0) { assert_done(); return m_producer.status(); }
const auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
if (wait_ctx->wait_for(static_cast<size_t>(ms + 1))) { assert_done(); return m_producer.status(); } //finished before the timeout; the +1 compensates for truncation in duration_cast
//Timed out: the state should still be consumer_set, so try to put it back to idle.
//If that fails the producer has published a result in the meantime; otherwise stop waiting and drop the consumer.
auto expected_consumer_state = pc_state::consumer_set;
const auto idle_1 = m_pc_state.compare_exchange_strong(expected_consumer_state, pc_state::idle, std::memory_order_acq_rel);
if (!idle_1) { assert_done(); return m_producer.status(); }
m_consumer.clear(); //the producer will no longer touch the consumer, so it is safe to clear it
return result_status::idle;
}
//Like wait_for, with an absolute deadline; a deadline that already passed just reports the current status.
template<class clock, class duration>
result_status wait_until(const std::chrono::time_point<clock, duration>& timeout_time)
{
const auto now = clock::now();
if (timeout_time <= now) return status();
const auto diff = timeout_time - now;
return wait_for(diff);
}
type get() { assert_done(); return m_producer.get(); } //callers wait first, so a result must already exist; returns the value or rethrows the stored exception
//Moves this state's produced payload into producer_ctx (which must be idle).
void initialize_producer_from(producer_context<type>& producer_ctx) noexcept { producer_ctx = std::move(m_producer); }
//Runs callable and stores its return value, or the exception it throws (original note: called from result_promise's set_from_function).
template<class callable_type>
void from_callable(callable_type&& callable) noexcept
{
using is_void = std::is_same<type, void>;
try {
from_callable(is_void{}, std::forward<callable_type>(callable)); //success: the result is stored
} catch (...) {
set_exception(std::current_exception()); //failure: keep the exception so it is rethrown when the result is read
}
}
//The two complete_* functions finish the producer and consumer sides. Whichever side finishes last destroys this state.
//complete_producer publishes producer_done and wakes a registered consumer (original note: used by result_publisher and result_coro_promise).
//self is forwarded to the consumer for when_any; done_handle, if non-empty, is what gets destroyed instead of deleting this.
void complete_producer(result_state_base* self /*for when_any*/, std::coroutine_handle<void> done_handle = {}) noexcept
{
m_done_handle = done_handle; //stored first so that complete_consumer can use it for the final destruction
const auto state_before = m_pc_state.exchange(pc_state::producer_done, std::memory_order_acq_rel);
assert(state_before != pc_state::producer_done); //producer_done is only set here and must not be set twice
switch (state_before) {
case pc_state::consumer_set: m_consumer.resume_consumer(self); return; //a consumer is waiting: wake it and let it do the destruction later
case pc_state::idle: return;
case pc_state::consumer_done: return delete_self(done_handle, this); //the consumer is already gone, so destroy now
default: break;
}
assert(false);
}
//Finishes the consumer side (original note: called after the value has been taken, e.g. after awaitable::get()).
void complete_consumer() noexcept
{
const auto pc_state = m_pc_state.load(std::memory_order_acquire);
if (pc_state == pc_state::producer_done) return delete_self(m_done_handle, this); //producer already finished: destroy now
const auto pc_state1 = m_pc_state.exchange(pc_state::consumer_done, std::memory_order_acq_rel);
assert(pc_state1 != pc_state::consumer_set); //never consumer_set here: a registered consumer is either still waiting or was handled above
if (pc_state1 == pc_state::producer_done) return delete_self(m_done_handle, this); //the producer finished between the load and the exchange
assert(pc_state1 == pc_state::idle); //otherwise the producer destroys the state when it completes
}
}; //class result_state
template<class type>
struct consumer_result_state_deleter // Deleter for the consumer-side owning pointer (original note: used by the result types)
{
    // Instead of deleting, reports that the consumer is finished; the state decides when to destroy itself.
    void operator()(result_state<type>* state_ptr)
    {
        assert(state_ptr != nullptr);
        state_ptr->complete_consumer();
    }
};
// unique_ptr that calls complete_consumer when released.
template<class type>
using consumer_result_state_ptr = std::unique_ptr<
    result_state<type>,
    consumer_result_state_deleter<type>>;
template<class type>
struct producer_result_state_deleter // Deleter for the producer-side owning pointer (original note: used by the promise types)
{
    // Instead of deleting, reports that the producer is finished, passing the state itself as the completed result.
    void operator()(result_state<type>* state_ptr)
    {
        assert(state_ptr != nullptr);
        state_ptr->complete_producer(state_ptr);
    }
};
// unique_ptr that calls complete_producer when released.
template<class type>
using producer_result_state_ptr = std::unique_ptr<
    result_state<type>,
    producer_result_state_deleter<type>>;
// No extra arguments: the callable is forwarded unchanged.
template<class callable_type>
auto&& bind(callable_type&& callable)
{
    return std::forward<callable_type>(callable);
}
// With arguments: returns a nullary closure that owns the callable and a tuple of the arguments
// and invokes the callable with them on every call.
template<class callable_type, class... argument_types>
auto bind(callable_type&& callable, argument_types&&... arguments)
{
    // The closure is noexcept exactly when the original invocation is.
    static constexpr bool is_nothrow = std::is_nothrow_invocable_v<callable_type, argument_types...>;
    return [callable = std::forward<callable_type>(callable),
            tuple = std::make_tuple(std::forward<argument_types>(arguments)...)]() mutable noexcept(is_nothrow) -> decltype(auto)
    {
        return std::apply(callable, tuple); // unpack the stored arguments
    };
}
template<class callable_type>
auto&& bind_with_try_catch_impl(std::true_type, callable_type&& callable) //noexcept,无需捕获异常
{
return std::forward<callable_type>(callable);
}
template<class callable_type>
auto bind_with_try_catch_impl(std::false_type, callable_type&& callable)
{
return [callable = std::forward<callable_type>(callable)]() mutable noexcept { //异常不会溢出,所以是 noexcept
try { callable(); } catch (...) {} }; //捕获异常无需处理,防止溢出即可
}
// Chooses the pass-through or the exception-swallowing wrapper, depending on whether
// the callable can be invoked without throwing.
template<class callable_type>
auto bind_with_try_catch(callable_type&& callable)
{
    using is_noexcept = std::bool_constant<std::is_nothrow_invocable_v<callable_type>>;
    return bind_with_try_catch_impl(is_noexcept{}, std::forward<callable_type>(callable));
}
// Same, for a callable with arguments: bind the arguments first, then guard the resulting nullary callable.
template<class callable_type, class... argument_types>
auto bind_with_try_catch(callable_type&& callable, argument_types&&... arguments)
{
    return bind_with_try_catch(
        bind(std::forward<callable_type>(callable), std::forward<argument_types>(arguments)...));
}
class await_via_functor //主要用于 task,以及 resume_on_awaitable,以恢复协程运行
{
private:
await_context* m_ctx;
public:
await_via_functor(await_context* ctx) noexcept : m_ctx(ctx) {}
await_via_functor(await_via_functor&& rhs) noexcept : m_ctx(rhs.m_ctx) { rhs.m_ctx = nullptr; }
~await_via_functor() noexcept
{
if (m_ctx == nullptr) return; //若上下文非空,设定异常后恢复协程执行
m_ctx->set_interrupt(std::make_exception_ptr(errors::broken_task("result - Associated task was interrupted abnormally")));
m_ctx->resume();
}
void operator()() noexcept
{
assert(m_ctx != nullptr);
const auto await_context = std::exchange(m_ctx, nullptr); //m_ctx 置空,赋给临时变量
await_context->resume(); //关联协程恢复执行
}
}; //class await_via_functor
class coroutine_handle_functor // Move-only callable that owns a coroutine handle and resumes it directly, without the await_context layer
{
private:
    std::coroutine_handle<void> m_coro_handle;
public:
    coroutine_handle_functor() noexcept : m_coro_handle() {}
    coroutine_handle_functor(std::coroutine_handle<void> coro_handle) noexcept : m_coro_handle(coro_handle) {}
    coroutine_handle_functor(coroutine_handle_functor&& rhs) noexcept : m_coro_handle(std::exchange(rhs.m_coro_handle, {})) {}
    coroutine_handle_functor(const coroutine_handle_functor&) = delete;
    coroutine_handle_functor& operator=(const coroutine_handle_functor&) = delete;
    // A handle that was never executed is destroyed here.
    ~coroutine_handle_functor() noexcept
    {
        if (!m_coro_handle)
            return;
        m_coro_handle.destroy();
    }
    // Gives up ownership of the handle, then resumes it; the destructor will no longer touch it.
    void execute_destroy() noexcept
    {
        auto handle = m_coro_handle;
        m_coro_handle = {};
        handle.resume();
    }
    void operator()() noexcept { execute_destroy(); }
}; //class coroutine_handle_functor
template<class type>
class awaitable_base : public std::suspend_always // Common base of awaitable and resolve_awaitable: owns the consumer side of a result state
{
protected:
    consumer_result_state_ptr<type> m_state; // releasing it calls complete_consumer through the deleter
public:
    awaitable_base(consumer_result_state_ptr<type> state) noexcept
        : m_state(std::move(state))
    {
    }
    // Neither copyable nor movable.
    awaitable_base(const awaitable_base&) = delete;
    awaitable_base(awaitable_base&&) = delete;
}; //class awaitable_base
// Always throws errors::runtime_shutdown with a message that names the executor.
[[noreturn]] void throw_runtime_shutdown_exception(std::string_view executor_name)
{
    std::string error_msg(executor_name);
    error_msg += " - shutdown has been called on this executor.";
    throw errors::runtime_shutdown(error_msg);
}
// Builds a worker name: the executor name followed by " worker".
std::string make_executor_worker_name(std::string_view executor_name)
{
    std::string worker_name(executor_name);
    worker_name.append(" worker");
    return worker_name;
}
struct executor_bulk_tag {}; //empty tag type; NOTE(review): presumably selects bulk-submission overloads defined elsewhere - confirm against its users
class when_result_helper;
} //namespace details
template<class type>
class awaitable : public details::awaitable_base<type> // Awaiter returned by result::operator co_await; yields the result's value
{
public:
    awaitable(details::consumer_result_state_ptr<type> state) noexcept
        : details::awaitable_base<type>(std::move(state))
    {
    }
    // await_ready comes from std::suspend_always and returns false, so await_suspend always runs.
    // Hands the caller's handle to the state; returns true to stay suspended (no result yet),
    // false to continue immediately because the result is already available.
    bool await_suspend(std::coroutine_handle<void> caller_handle) noexcept
    {
        auto& state = this->m_state;
        assert(static_cast<bool>(state));
        return state->await(caller_handle);
    }
    // Called when the coroutine continues: returns the value or rethrows the stored exception.
    type await_resume()
    {
        // Take ownership locally: when the local pointer goes out of scope its deleter calls complete_consumer.
        auto state = std::move(this->m_state);
        return state->get();
    }
}; //class awaitable
template<class type>
class result; //for resolve_awaitable.await_resume
template<class type>
class resolve_awaitable : public details::awaitable_base<type> // Awaiter returned by result::resolve; yields the (now ready) result object rather than its value
{
public:
    resolve_awaitable(details::consumer_result_state_ptr<type> state) noexcept
        : details::awaitable_base<type>(std::move(state))
    {
    }
    resolve_awaitable(const resolve_awaitable&) noexcept = delete;
    resolve_awaitable(resolve_awaitable&&) noexcept = delete;
    // Same registration as awaitable: true means "stay suspended until the producer finishes".
    bool await_suspend(std::coroutine_handle<void> caller_handle) noexcept
    {
        auto& state = this->m_state;
        assert(static_cast<bool>(state));
        return state->await(caller_handle);
    }
    // Wraps the state in a result<type> and hands it back; the value is not extracted here.
    result<type> await_resume()
    {
        return result<type>(std::move(this->m_state));
    }
}; //class resolve_awaitable
template<class type>
class result // Consumer-side handle to a result_state; releasing the state runs complete_consumer through the pointer's deleter
{
    static constexpr auto valid_result_type_v = std::is_same_v<type, void> || std::is_nothrow_move_constructible_v<type>;
    static_assert(valid_result_type_v, "result<type> - <<type>> should be no-throw-move constructable or void.");
    friend class details::when_result_helper; // only forward-declared at this point
private:
    details::consumer_result_state_ptr<type> m_state;
    // Throws errors::empty_result(message) when this result holds no state.
    void throw_if_empty(const char* message) const
    {
        if (m_state)
            return;
        throw errors::empty_result(message);
    }
public:
    result() noexcept = default;
    result(const result& rhs) = delete;
    result(result&& rhs) noexcept = default;
    result(details::consumer_result_state_ptr<type> state) noexcept : m_state(std::move(state)) {}
    result(details::result_state<type>* state) noexcept : m_state(state) {} // takes ownership of a raw state pointer
    result& operator=(const result& rhs) = delete;
    result& operator=(result&& rhs) noexcept
    {
        if (this == &rhs)
            return *this;
        m_state = std::move(rhs.m_state);
        return *this;
    }
    // True when the result is associated with a state.
    explicit operator bool() const noexcept { return static_cast<bool>(m_state); }
    // The functions below throw errors::empty_result when the result is empty.
    details::result_status status() const
    {
        throw_if_empty("result::status() - result is empty.");
        return m_state->status();
    }
    // Blocks until the result has been produced.
    void wait() const
    {
        throw_if_empty("result::wait() - result is empty.");
        m_state->wait();
    }
    // Blocks for at most duration; returns the status afterwards.
    template<class duration_type, class ratio_type>
    details::result_status wait_for(std::chrono::duration<duration_type, ratio_type> duration) const
    {
        throw_if_empty("result::wait_for() - result is empty.");
        return m_state->wait_for(duration);
    }
    // Blocks until timeout_time at the latest; returns the status afterwards.
    template<class clock_type, class duration_type>
    details::result_status wait_until(std::chrono::time_point<clock_type, duration_type> timeout_time) const
    {
        throw_if_empty("result::wait_until() - result is empty.");
        return m_state->wait_until(timeout_time);
    }
    // Blocks until the result is produced, then returns the value or rethrows the stored exception.
    // The result is empty afterwards.
    type get()
    {
        throw_if_empty("result::get() - result is empty.");
        auto state = std::move(m_state); // local owner: complete_consumer runs when it goes out of scope
        state->wait();                   // returns at once when the result already exists
        return state->get();
    }
    // co_await support: hands the state to an awaitable<type>, which provides the await_* functions.
    // The result is empty afterwards.
    auto operator co_await()
    {
        throw_if_empty("result::operator co_await() - result is empty.");
        return awaitable<type>{ std::move(m_state) };
    }
    // Like co_await, but the awaiter yields a ready result<type> instead of the value.
    auto resolve()
    {
        throw_if_empty("result::resolve() - result is empty.");
        return resolve_awaitable<type>{ std::move(m_state) };
    }
}; //class result
template<class type>
class lazy_result;
namespace details
{
struct lazy_final_awaiter : public std::suspend_always // final-suspend awaiter of lazy_result_state
{
    // Returns the handle reported by the promise's resume_caller(), so that coroutine runs next.
    template<class promise_type>
    std::coroutine_handle<void> await_suspend(std::coroutine_handle<promise_type> handle) noexcept
    {
        auto& promise = handle.promise();
        return promise.resume_caller();
    }
};
class lazy_result_state_base // Type-independent part of the lazy promise: remembers who is awaiting the lazy coroutine
{
protected:
    std::coroutine_handle<void> m_caller_handle; // coroutine awaiting this one
public:
    // Handle of the awaiting coroutine, as recorded by await().
    std::coroutine_handle<void> resume_caller() const noexcept
    {
        return m_caller_handle;
    }
    // Records the awaiting coroutine and returns the handle of the coroutine this promise belongs to.
    std::coroutine_handle<void> await(std::coroutine_handle<void> caller_handle) noexcept
    {
        m_caller_handle = caller_handle;
        using self_handle_type = std::coroutine_handle<lazy_result_state_base>;
        return self_handle_type::from_promise(*this);
    }
};
template<class type>
class lazy_result_state : public lazy_result_state_base // Promise-side state of a lazy coroutine: stores its value or exception
{
private:
    producer_context<type> m_producer;
public:
    // Wraps the handle of this promise's coroutine in a lazy_result.
    lazy_result<type> get_return_object() noexcept
    {
        using handle_type = std::coroutine_handle<lazy_result_state>;
        return lazy_result<type>(handle_type::from_promise(*this));
    }
    // Always suspends at the start (lazy).
    std::suspend_always initial_suspend() const noexcept { return {}; }
    lazy_final_awaiter final_suspend() const noexcept { return {}; }
    // Stores the in-flight exception as the result.
    void unhandled_exception() noexcept
    {
        m_producer.build_exception(std::current_exception());
    }
    // Constructs the value in place; noexcept exactly when type's constructor is.
    template<class... argument_types>
    void set_result(argument_types&&... arguments) noexcept(noexcept(type(std::forward<argument_types>(arguments)...)))
    {
        m_producer.build_result(std::forward<argument_types>(arguments)...);
    }
    result_status status() const noexcept { return m_producer.status(); }
    // Returns the value or rethrows the stored exception.
    type get() { return m_producer.get(); }
};
} //namespace details
template<class type>
class lazy_awaitable // Awaiter returned by lazy_result::operator co_await: owns the lazy coroutine, starts it and yields its value
{
private:
    const std::coroutine_handle<details::lazy_result_state<type>> m_state;
public:
    lazy_awaitable(std::coroutine_handle<details::lazy_result_state<type>> state) noexcept : m_state(state)
    {
        assert(static_cast<bool>(state));
    }
    lazy_awaitable(const lazy_awaitable&) = delete;
    lazy_awaitable(lazy_awaitable&&) = delete;
    // The awaiter owns the coroutine and always destroys it.
    ~lazy_awaitable() noexcept
    {
        auto owned = m_state; // work on a non-const copy of the handle
        owned.destroy();
    }
    // No suspension needed when the lazy coroutine has already run to completion.
    bool await_ready() const noexcept { return m_state.done(); }
    // Registers the caller with the promise and returns the handle provided by the promise's await().
    std::coroutine_handle<void> await_suspend(std::coroutine_handle<void> caller_handle) noexcept
    {
        auto& promise = m_state.promise();
        return promise.await(caller_handle);
    }
    // Returns the value or rethrows the stored exception.
    type await_resume()
    {
        auto& promise = m_state.promise();
        return promise.get();
    }
}; //class lazy_awaitable
template<class type>
class lazy_resolve_awaitable // Awaiter returned by lazy_result::resolve: runs the lazy coroutine and yields the lazy_result itself
{
private:
    std::coroutine_handle<details::lazy_result_state<type>> m_state;
public:
    lazy_resolve_awaitable(std::coroutine_handle<details::lazy_result_state<type>> state) noexcept : m_state(state)
    {
        assert(static_cast<bool>(state));
    }
    lazy_resolve_awaitable(const lazy_resolve_awaitable&) = delete;
    lazy_resolve_awaitable(lazy_resolve_awaitable&&) = delete;
    // Destroys the coroutine only if ownership was not handed out by await_resume.
    ~lazy_resolve_awaitable() noexcept
    {
        if (!m_state)
            return;
        m_state.destroy();
    }
    bool await_ready() const noexcept { return m_state.done(); }
    // Registers the caller with the promise and returns the handle provided by the promise's await().
    std::coroutine_handle<void> await_suspend(std::coroutine_handle<void> caller_handle) noexcept
    {
        auto& promise = m_state.promise();
        return promise.await(caller_handle);
    }
    // Transfers ownership of the coroutine to the returned lazy_result.
    lazy_result<type> await_resume()
    {
        return { std::exchange(m_state, {}) };
    }
}; //class lazy_resolve_awaitable
template<class type>
class lazy_result // Owns a lazy coroutine; the coroutine runs when the result is awaited, resolved or run, and delivers its value to the caller
{
private:
    std::coroutine_handle<details::lazy_result_state<type>> m_state;
    // Throws errors::empty_result(err_msg) when no coroutine is attached.
    void throw_if_empty(const char* err_msg) const
    {
        if (!static_cast<bool>(m_state)) throw errors::empty_result(err_msg);
    }
    // Coroutine behind run(): moves this lazy_result into the coroutine, awaits it and forwards the outcome.
    result<type> run_impl()
    {
        lazy_result self(std::move(*this));
        co_return co_await self;
    }
public:
    lazy_result() noexcept = default;
    lazy_result(lazy_result&& rhs) noexcept : m_state(std::exchange(rhs.m_state, {})) {}
    lazy_result(std::coroutine_handle<details::lazy_result_state<type>> state) noexcept : m_state(state) {}
    // Destroys the coroutine if it is still owned.
    ~lazy_result() noexcept { if (static_cast<bool>(m_state)) m_state.destroy(); }
    // Destroys the currently owned coroutine (if any), then takes over the one owned by rhs.
    lazy_result& operator=(lazy_result&& rhs) noexcept
    {
        if (&rhs == this) return *this;
        if (static_cast<bool>(m_state)) m_state.destroy();
        m_state = std::exchange(rhs.m_state, {});
        return *this;
    }
    // True when a coroutine is attached.
    explicit operator bool() const noexcept { return static_cast<bool>(m_state); }
    // The functions below throw errors::empty_result when no coroutine is attached.
    // The messages name the failing call, in the same form the result class uses.
    details::result_status status() const
    {
        throw_if_empty("lazy_result::status() - result is empty.");
        return m_state.promise().status();
    }
    // Hands the coroutine to a lazy_awaitable; this object is empty afterwards.
    auto operator co_await()
    {
        throw_if_empty("lazy_result::operator co_await() - result is empty.");
        return lazy_awaitable<type>{ std::exchange(m_state, {}) };
    }
    // Hands the coroutine to a lazy_resolve_awaitable; this object is empty afterwards.
    auto resolve()
    {
        throw_if_empty("lazy_result::resolve() - result is empty.");
        return lazy_resolve_awaitable<type>{ std::exchange(m_state, {}) };
    }
    // Runs the associated task inline and returns an eager result<type> for it; this object is empty afterwards.
    result<type> run()
    {
        throw_if_empty("lazy_result::run() - result is empty.");
        return run_impl();
    }
}; //class lazy_result
namespace details
{
// CRTP mixin that gives a promise type the member co_return needs: return_value() for a
// non-void result type, return_void() for void. Both forward to derived_type::set_result.
template<class derived_type, class type>
struct return_value_struct
{
    template<class return_type>
    void return_value(return_type&& value)
    {
        // Hand the co_return operand to the derived promise's set_result.
        static_cast<derived_type&>(*this).set_result(std::forward<return_type>(value));
    }
};

// void flavour: co_return carries no operand, so set_result is called without arguments.
template<class derived_type>
struct return_value_struct<derived_type, void>
{
    void return_void() noexcept
    {
        static_cast<derived_type&>(*this).set_result();
    }
};
// Promise mixin for coroutines that start running immediately: initial_suspend never
// suspends. (The original note says every result type except lazy_result behaves so.)
struct initialy_resumed_promise
{
    std::suspend_never initial_suspend() const noexcept
    {
        return std::suspend_never{};
    }
};
// Awaitable returned by result_coro_promise::final_suspend. It always suspends
// (inherits std::suspend_always) and, while suspending, tells the promise that the
// producer side has finished.
struct result_publisher : public std::suspend_always
{
    // Templated on promise_type because handle.promise() is needed; a void await_suspend
    // behaves like returning true, i.e. the coroutine stays suspended.
    template<class promise_type>
    void await_suspend(std::coroutine_handle<promise_type> handle) const noexcept
    {
        auto& promise = handle.promise();
        promise.complete_producer(handle);
    }
};
// Promise body shared by coroutines returning result<type>. It embeds the result_state
// and inherits return_value/return_void from return_value_struct (CRTP on itself).
template<class type>
struct result_coro_promise : public return_value_struct<result_coro_promise<type>, type>
{
private:
    result_state<type> m_result_state;

public:
    // Every result<type> of such a coroutine is built here, pointing at the embedded state.
    result<type> get_return_object() noexcept { return { &m_result_state }; }

    // Suspend at the end so the publisher can announce completion before the frame goes away.
    result_publisher final_suspend() const noexcept { return result_publisher{}; }

    // Stores the in-flight exception in the state instead of letting it escape.
    void unhandled_exception() noexcept
    {
        m_result_state.set_exception(std::current_exception());
    }

    // Reached from return_value()/return_void(); forwards the value to the state.
    template<class... argument_types>
    void set_result(argument_types&&... arguments) noexcept(noexcept(type(std::forward<argument_types>(arguments)...)))
    {
        m_result_state.set_result(std::forward<argument_types>(arguments)...);
    }

    // Called by result_publisher::await_suspend with the finished coroutine's handle.
    void complete_producer(std::coroutine_handle<void> done_handle) noexcept
    {
        auto* const self_state = &m_result_state;
        self_state->complete_producer(self_state, done_handle);
    }
};
// Complete promise type for eagerly started result<T> coroutines, assembled from parts:
//  - result_coro_promise: get_return_object, final_suspend, unhandled_exception
//  - return_value_struct (its base): return_value / return_void
//  - initialy_resumed_promise: initial_suspend
template<class return_type>
struct initialy_resumed_result_promise
    : public initialy_resumed_promise
    , public result_coro_promise<return_type>
{
};
// Promise type for lazy_result<T> coroutines: the lazy state plus the co_return mixin.
template<class type>
struct lazy_promise
    : public lazy_result_state<type>
    , public return_value_struct<lazy_promise<type>, type>
{
};
} //namespace details
//Forward declaration: shared_result is defined further down, but
//details::shared_result_promise::get_return_object needs to name it before that.
template<class type>
class shared_result;
namespace details
{
// Node of the intrusive singly linked list of coroutines waiting on a shared result.
struct shared_await_context
{
    shared_await_context* next{ nullptr };      // following waiter, null for the last node
    std::coroutine_handle<void> caller_handle;  // coroutine resumed when the result is ready
};
//Type-independent part of the state behind shared_result: a ready flag, the list of
//awaiting coroutines and a lazily created condition variable for blocking waiters.
class shared_result_state_base
{
protected:
std::atomic_bool m_ready{ false }; //set to true by complete_producer()
mutable std::mutex m_lock; //guards m_awaiters and m_condition
shared_await_context* m_awaiters = nullptr; //head of the intrusive list of waiting coroutines
std::optional<std::condition_variable> m_condition; //only constructed once somebody blocks
//Pushes awaiter onto the front of the waiter list. The caller must hold m_lock.
void await_impl(std::unique_lock<std::mutex>& lock, shared_await_context& awaiter) noexcept
{
assert(lock.owns_lock());
if (m_awaiters == nullptr) { m_awaiters = &awaiter; return; }
awaiter.next = m_awaiters;
m_awaiters = &awaiter;
}
//Blocks on the condition variable until m_ready is true. The caller must hold m_lock.
void wait_impl(std::unique_lock<std::mutex>& lock) noexcept
{
assert(lock.owns_lock());
if (!m_condition.has_value()) m_condition.emplace();
m_condition.value().wait(lock, [this] { return m_ready.load(std::memory_order_relaxed); });
}
//Like wait_impl but gives up after ms; returns the value of m_ready when the wait ends.
bool wait_for_impl(std::unique_lock<std::mutex>& lock, std::chrono::milliseconds ms) noexcept
{
assert(lock.owns_lock());
if (!m_condition.has_value()) m_condition.emplace();
return m_condition.value().wait_for(lock, ms, [this] { return m_ready.load(std::memory_order_relaxed); });
}
public:
//Marks the result as ready, wakes blocked threads and resumes every registered coroutine.
void complete_producer() noexcept
{
shared_await_context* awaiters;
{
std::unique_lock<std::mutex> lock(m_lock);
awaiters = std::exchange(m_awaiters, nullptr); //detach the whole list while locked
m_ready.store(true, std::memory_order_release);
if (m_condition.has_value()) m_condition.value().notify_all();
}
//Resume outside the lock. next is read before resuming the node's coroutine.
//NOTE(review): presumably because the node lives in the awaiting coroutine and may be
//gone once it has been resumed - confirm.
while (awaiters != nullptr) {
const auto next = awaiters->next;
awaiters->caller_handle();
awaiters = next;
}
}
//Registers awaiter unless the result is already ready.
//Returns false when ready (nothing registered), true when awaiter was queued.
bool await(shared_await_context& awaiter) noexcept
{
if (m_ready.load(std::memory_order_acquire)) return false;
{
std::unique_lock<std::mutex> lock(m_lock);
if (m_ready.load(std::memory_order_acquire)) return false; //re-check under the lock
await_impl(lock, awaiter);
}
return true;
}
//Blocks the calling thread until the result is ready.
void wait() noexcept
{
if (m_ready.load(std::memory_order_acquire)) return;
{
std::unique_lock<std::mutex> lock(m_lock);
if (m_ready.load(std::memory_order_acquire)) return; //re-check under the lock
wait_impl(lock);
}
}
}; //class shared_result_state_base
// Typed state behind shared_result<type>: the base class supplies readiness tracking and
// waiting, producer_context<type> stores the value or exception.
template<class type>
class shared_result_state final : public shared_result_state_base
{
private:
    producer_context<type> m_producer;

    // Debug check: the state is flagged ready and the producer holds something.
    void assert_done() const noexcept
    {
        assert(m_ready.load(std::memory_order_acquire));
        assert(m_producer.status() != result_status::idle);
    }

public:
    // idle until the ready flag is set, afterwards whatever the producer reports.
    result_status status() const noexcept
    {
        if (m_ready.load(std::memory_order_acquire)) return m_producer.status();
        return result_status::idle;
    }

    // Waits at most `duration` (converted to milliseconds, plus one extra millisecond)
    // for the result; returns idle on timeout.
    template<class duration_unit, class ratio>
    result_status wait_for(std::chrono::duration<duration_unit, ratio> duration) noexcept
    {
        if (m_ready.load(std::memory_order_acquire)) return m_producer.status();

        const auto timeout = std::chrono::duration_cast<std::chrono::milliseconds>(duration) + std::chrono::milliseconds(1);
        std::unique_lock<std::mutex> guard(m_lock);
        if (m_ready.load(std::memory_order_acquire)) return m_producer.status();

        if (!wait_for_impl(guard, timeout)) {
            guard.unlock();
            return result_status::idle;
        }
        assert_done();
        return m_producer.status();
    }

    // Deadline flavour of wait_for; a deadline that already passed just reports status().
    template<class clock, class duration>
    result_status wait_until(const std::chrono::time_point<clock, duration>& timeout_time) noexcept
    {
        const auto now = clock::now();
        if (timeout_time <= now) return status();
        return wait_for(timeout_time - now);
    }

    template<class... argument_types>
    void set_result(argument_types&&... arguments) noexcept(noexcept(type(std::forward<argument_types>(arguments)...)))
    {
        m_producer.build_result(std::forward<argument_types>(arguments)...);
    }

    void unhandled_exception() noexcept
    {
        m_producer.build_exception(std::current_exception());
    }

    std::add_lvalue_reference_t<type> get() { return m_producer.get_ref(); }
}; //class shared_result_state
// final_suspend awaitable of shared_result_promise: announces completion to the promise
// and then returns false from await_suspend, so the coroutine does not stay suspended.
struct shared_result_publisher : public std::suspend_always
{
    template<class promise_type>
    bool await_suspend(std::coroutine_handle<promise_type> handle) const noexcept
    {
        auto& promise = handle.promise();
        promise.complete_producer();
        return false;
    }
};
// Promise type of the bridging coroutine that turns a result<type> into a
// shared_result<type>. The shared state is created together with the promise and is
// shared with every shared_result obtained from get_return_object().
template<class type>
class shared_result_promise : public return_value_struct<shared_result_promise<type>, type>
{
private:
    const std::shared_ptr<shared_result_state<type>> m_state = std::make_shared<shared_result_state<type>>();

public:
    std::suspend_never initial_suspend() const noexcept { return {}; }
    shared_result_publisher final_suspend() const noexcept { return {}; }

    shared_result<type> get_return_object() noexcept { return shared_result<type> {m_state}; }

    // Reached from return_value()/return_void(); forwards the value to the shared state.
    template<class... argument_types>
    void set_result(argument_types&&... arguments) noexcept(noexcept(type(std::forward<argument_types>(arguments)...)))
    {
        auto& state = *m_state;
        state.set_result(std::forward<argument_types>(arguments)...);
    }

    void unhandled_exception() noexcept { (*m_state).unhandled_exception(); }

    // Called by shared_result_publisher once the coroutine body has finished.
    void complete_producer() noexcept { (*m_state).complete_producer(); }
};
template<class type>
class shared_awaitable_base : public std::suspend_always
{
protected:
std::shared_ptr<shared_result_state<type>> m_state;
public:
shared_awaitable_base(const std::shared_ptr<shared_result_state<type>>& state) noexcept : m_state(state) {}
shared_awaitable_base(const shared_awaitable_base&) = delete;
shared_awaitable_base(shared_awaitable_base&&) = delete;
};
//Empty tag type: first parameter of shared_result::make_shared_result and part of the
//std::coroutine_traits specialization that selects shared_result_promise.
struct shared_result_tag {};
} //namespace details
// Awaitable returned by shared_result::operator co_await: suspends the caller until the
// shared state is ready, then yields a reference to the stored value.
template<class type>
class shared_awaitable : public details::shared_awaitable_base<type>
{
private:
    using base = details::shared_awaitable_base<type>;

    // Waiter-list node handed to the shared state; it lives inside this awaitable.
    details::shared_await_context m_await_ctx;

public:
    shared_awaitable(const std::shared_ptr<details::shared_result_state<type>>& state) noexcept : base(state) {}

    // Returns the state's verdict: true = stay suspended, false = result already ready.
    bool await_suspend(std::coroutine_handle<void> caller_handle) noexcept
    {
        auto& state = this->m_state;
        assert(static_cast<bool>(state));
        m_await_ctx.caller_handle = caller_handle;
        return state->await(m_await_ctx);
    }

    std::add_lvalue_reference_t<type> await_resume()
    {
        return this->m_state->get();
    }
}; //class shared_awaitable
// Awaitable returned by shared_result::resolve(): waits like shared_awaitable but yields
// a shared_result (built from the state pointer) instead of the value.
template<class type>
class shared_resolve_awaitable : public details::shared_awaitable_base<type>
{
private:
    using base = details::shared_awaitable_base<type>;

    // Waiter-list node handed to the shared state; it lives inside this awaitable.
    details::shared_await_context m_await_ctx;

public:
    shared_resolve_awaitable(const std::shared_ptr<details::shared_result_state<type>>& state) noexcept : base(state) {}

    // Returns the state's verdict: true = stay suspended, false = result already ready.
    bool await_suspend(std::coroutine_handle<void> caller_handle) noexcept
    {
        auto& state = this->m_state;
        assert(static_cast<bool>(state));
        m_await_ctx.caller_handle = caller_handle;
        return state->await(m_await_ctx);
    }

    // The state pointer is moved out, so the awaitable is empty afterwards.
    shared_result<type> await_resume()
    {
        return shared_result<type>(std::move(this->m_state));
    }
}; //class shared_resolve_awaitable
template<class type>
class shared_result //允许多个消费者取同一个结果
{
private:
std::shared_ptr<details::shared_result_state<type>> m_state; //共享指针
static shared_result<type> make_shared_result(details::shared_result_tag, result<type> result)
{
co_return co_await result; //result => shared_result
}
void throw_if_empty(const char* message) const
{
if (!static_cast<bool>(m_state)) throw errors::empty_result(message);
}
public:
shared_result() noexcept = default;
~shared_result() noexcept = default;
shared_result(std::shared_ptr<details::shared_result_state<type>> state) noexcept : m_state(std::move(state)) {}
shared_result(result<type> rhs)
{
if (!static_cast<bool>(rhs)) return;
*this = make_shared_result({}, std::move(rhs));
}
shared_result(const shared_result& rhs) noexcept = default;
shared_result(shared_result&& rhs) noexcept = default;
shared_result& operator=(const shared_result& rhs) noexcept
{
if (this != &rhs && m_state != rhs.m_state) m_state = rhs.m_state;
return *this;
}
shared_result& operator=(shared_result&& rhs) noexcept
{
if (this != &rhs && m_state != rhs.m_state) m_state = std::move(rhs.m_state);
return *this;
}
operator bool() const noexcept { return static_cast<bool>(m_state.get()); }
details::result_status status() const { throw_if_empty("."); return m_state->status(); }
void wait() { throw_if_empty("."); m_state->wait(); }
template<class duration_type, class ratio_type>
details::result_status wait_for(std::chrono::duration<duration_type, ratio_type> duration)
{
throw_if_empty(".");
return m_state->wait_for(duration);
}
template<class clock_type, class duration_type>
details::result_status wait_until(std::chrono::time_point<clock_type, duration_type> timeout_time)
{
throw_if_empty(".");
return m_state->wait_until(timeout_time);
}
std::add_lvalue_reference_t<type> get()
{
throw_if_empty(".");
m_state->wait();
return m_state->get();
}
auto operator co_await() { throw_if_empty("."); return shared_awaitable<type> {m_state}; }
auto resolve() { throw_if_empty("."); return shared_resolve_awaitable<type> {m_state}; }
}; //class shared_result
} //namespace concurrencpp
//A coroutine's return type has to name a promise_type before co_return can be used in
//it (e.g. in the executor bridges); these specializations provide that mapping.
//Any function returning result<type> uses the eagerly started promise.
template<class type, class... arguments>
struct std::coroutine_traits<concurrencpp::result<type>, arguments...>
{
using promise_type = concurrencpp::details::initialy_resumed_result_promise<type>;
};
//Any function returning lazy_result<type> uses lazy_promise.
template<class type, class... arguments>
struct std::coroutine_traits<::concurrencpp::lazy_result<type>, arguments...>
{
using promise_type = concurrencpp::details::lazy_promise<type>;
};
//Only functions with the exact parameter list (shared_result_tag, result<type>) and
//return type shared_result<type> match, i.e. shared_result::make_shared_result.
template<class type>
struct std::coroutine_traits<::concurrencpp::shared_result<type>, concurrencpp::details::shared_result_tag, concurrencpp::result<type>>
{
using promise_type = concurrencpp::details::shared_result_promise<type>;
};
namespace concurrencpp
{
// Producer-side wrapper around a result state, typically used to bridge third-party
// (callback based) code: the owner publishes a value or exception through the promise,
// the consumer observes it through the result returned by get_result().
//
// Every setter consumes the promise: after a successful set_* call the promise is empty
// and further calls throw errors::empty_result_promise.
template<class type>
class result_promise
{
    static constexpr auto valid_result_type_v = std::is_same_v<type, void> || std::is_nothrow_move_constructible_v<type>;
    static_assert(valid_result_type_v, "result_promise<type> - <<type>> should be now-throw-move constructable or void.");

private:
    details::producer_result_state_ptr<type> m_state;
    bool m_result_retrieved;

    // Throws errors::empty_result_promise(message) when the promise no longer owns a state.
    void throw_if_empty(const char* message) const
    {
        if (!static_cast<bool>(m_state)) throw errors::empty_result_promise(message);
    }

    // Called before the promise lets go of its state without having published anything.
    // If a result object was handed out, its consumer is given a broken_task exception.
    void break_task_if_needed() noexcept
    {
        if (!static_cast<bool>(m_state)) return; // already published or moved from
        if (!m_result_retrieved) return;         // no result was handed out: nothing to break
        auto exception_ptr = std::make_exception_ptr(errors::broken_task("result - Associated task was interrupted abnormally"));
        m_state->set_exception(exception_ptr);
        // Per the original note: resetting runs the pointer's deleter, which completes the producer.
        m_state.reset();
    }

public:
    result_promise() : m_state(new details::result_state<type>()), m_result_retrieved(false) {}
    result_promise(result_promise&& rhs) noexcept : m_state(std::move(rhs.m_state)), m_result_retrieved(rhs.m_result_retrieved) {}
    result_promise(const result_promise&) = delete;
    result_promise& operator=(const result_promise&) = delete;

    ~result_promise() noexcept { break_task_if_needed(); }

    result_promise& operator=(result_promise&& rhs) noexcept
    {
        if (this != &rhs) {
            break_task_if_needed(); // do not silently abandon the state we currently own
            m_state = std::move(rhs.m_state);
            m_result_retrieved = rhs.m_result_retrieved;
        }
        return *this;
    }

    explicit operator bool() const noexcept { return static_cast<bool>(m_state); }

    // Publishes a value built from arguments (no arguments for void).
    // Throws errors::empty_result_promise if the promise is empty.
    template<class... argument_types>
    void set_result(argument_types&&... arguments)
    {
        constexpr auto is_constructable = std::is_constructible_v<type, argument_types...> || std::is_same_v<void, type>;
        static_assert(is_constructable, "result_promise::set_result() - <<type>> is not constructable from <<arguments...>>");
        throw_if_empty("result_promise::set_result() - empty result_promise.");
        m_state->set_result(std::forward<argument_types>(arguments)...);
        m_state.reset();
    }

    // Publishes an exception.
    // Throws errors::empty_result_promise if the promise is empty and
    // std::invalid_argument if exception_ptr is null.
    void set_exception(std::exception_ptr exception_ptr)
    {
        throw_if_empty("result_promise::set_exception() - empty result_promise.");
        if (!static_cast<bool>(exception_ptr))
            throw std::invalid_argument("result_promise::set_exception() - exception pointer is null.");
        m_state->set_exception(exception_ptr);
        m_state.reset();
    }

    // Publishes the outcome of callable(args...).
    // Throws errors::empty_result_promise if the promise is empty. The function used to be
    // declared noexcept, which turned that documented error into std::terminate; the
    // specifier was dropped so the exception reaches the caller like in the other setters.
    template<class callable_type, class... argument_types>
    void set_from_function(callable_type&& callable, argument_types&&... args)
    {
        constexpr auto is_invokable = std::is_invocable_r_v<type, callable_type, argument_types...>;
        static_assert(is_invokable, "result_promise::set_from_function() - function(args...) is not invokable or its return type can't be used to construct <<type>>");
        throw_if_empty("result_promise::set_from_function() - empty result_promise.");
        m_state->from_callable(details::bind(std::forward<callable_type>(callable), std::forward<argument_types>(args)...));
        m_state.reset();
    }

    // Hands out the consumer-side result; may be called only once.
    // Throws errors::empty_result_promise if the promise is empty and
    // errors::result_already_retrieved on a second call.
    result<type> get_result()
    {
        // Message now names this function (it previously claimed to come from result::get()).
        throw_if_empty("result_promise::get_result() - empty result_promise.");
        if (m_result_retrieved)
            throw errors::result_already_retrieved("result_promise::get_result() - result was already retrieved.");
        m_result_retrieved = true;
        return result<type>(m_state.get());
    }
}; //class result_promise
namespace details
{
    // Hand-rolled "virtual table" used by task for type erasure: three function pointers
    // describing how to relocate, run-and-destroy, and destroy a stored callable.
    struct vtable
    {
        void (*move_destroy_fn)(void* src, void* dst) noexcept;
        void (*execute_destroy_fn)(void* target);
        void (*destroy_fn)(void* target) noexcept;

        vtable(const vtable&) noexcept = default;
        constexpr vtable() noexcept : move_destroy_fn(nullptr), execute_destroy_fn(nullptr), destroy_fn(nullptr) {}
        constexpr vtable(decltype(move_destroy_fn) move_destroy_fn, decltype(execute_destroy_fn) execute_destroy_fn,
            decltype(destroy_fn) destroy_fn) noexcept
            : move_destroy_fn(move_destroy_fn), execute_destroy_fn(execute_destroy_fn), destroy_fn(destroy_fn)
        {
        }

        // Trivially copyable and destructible callable: no custom move-destroy function was installed.
        static constexpr bool trivially_copiable_destructible(decltype(move_destroy_fn) move_fn) noexcept { return move_fn == nullptr; }
        // Trivially destructible callable: no custom destroy function was installed.
        static constexpr bool trivially_destructable(decltype(destroy_fn) destroy_fn) noexcept { return destroy_fn == nullptr; }
    }; //struct vtable

    // Generates the vtable for one concrete callable type and implements the two storage
    // strategies used by task:
    //  - inline:    the callable object itself is constructed inside the buffer;
    //  - allocated: the callable lives on the heap and the buffer holds a pointer to it.
    template<class callable_type>
    class callable_vtable
    {
    private:
        // Scope guards: guarantee that the callable is released after it has been invoked,
        // even when the invocation throws (otherwise the callable would leak, because the
        // owning task has already forgotten about it at that point).
        struct inline_destroy_guard
        {
            callable_type* ptr;
            ~inline_destroy_guard() noexcept { ptr->~callable_type(); }
        };
        struct allocated_destroy_guard
        {
            callable_type* ptr;
            ~allocated_destroy_guard() noexcept { delete ptr; }
        };

        // Inline storage: the buffer is the object.
        static callable_type* inline_ptr(void* src) noexcept { return static_cast<callable_type*>(src); }
        // Allocated storage: the buffer holds a pointer to the object.
        static callable_type* allocated_ptr(void* src) noexcept { return *static_cast<callable_type**>(src); }
        static callable_type*& allocated_ref_ptr(void* src) noexcept { return *static_cast<callable_type**>(src); }

        static void move_destroy_inline(void* src, void* dst) noexcept
        {
            auto callable_ptr = inline_ptr(src);
            new (dst) callable_type(std::move(*callable_ptr)); // move-construct into the destination
            callable_ptr->~callable_type();                    // then destroy the moved-from source
        }

        static void move_destroy_allocated(void* src, void* dst) noexcept
        {
            // Only the pointer changes hands; the source slot is nulled.
            auto callable_ptr = std::exchange(allocated_ref_ptr(src), nullptr);
            new (dst) callable_type* (callable_ptr);
        }

        static void execute_destroy_inline(void* target)
        {
            const inline_destroy_guard guard{ inline_ptr(target) }; // destroys on normal exit and on throw
            (*guard.ptr)();
        }

        static void execute_destroy_allocated(void* target)
        {
            const allocated_destroy_guard guard{ allocated_ptr(target) }; // deletes on normal exit and on throw
            (*guard.ptr)();
        }

        static void destroy_inline(void* target) noexcept
        {
            inline_ptr(target)->~callable_type();
        }

        static void destroy_allocated(void* target) noexcept
        {
            delete allocated_ptr(target);
        }

        // Leaves a slot null when the corresponding operation is trivial for callable_type,
        // so the owner can use memcpy / skip destruction.
        static constexpr vtable make_vtable() noexcept
        {
            void (*move_destroy_fn)(void* src, void* dst) noexcept = nullptr;
            void (*destroy_fn)(void* target) noexcept = nullptr;
            if constexpr (std::is_trivially_copy_constructible_v<callable_type> && std::is_trivially_destructible_v<callable_type>)
                move_destroy_fn = nullptr;
            else // custom move or destructor present
                move_destroy_fn = move_destroy;
            if constexpr (std::is_trivially_destructible_v<callable_type>)
                destroy_fn = nullptr;
            else // custom destructor present
                destroy_fn = destroy;
            return vtable(move_destroy_fn, execute_destroy, destroy_fn);
        }

        template<class passed_callable_type>
        static void build_inlinable(void* dst, passed_callable_type&& callable)
        {
            new (dst) callable_type(std::forward<passed_callable_type>(callable)); // construct in place
        }

        template<class passed_callable_type>
        static void build_allocated(void* dst, passed_callable_type&& callable)
        {
            auto new_ptr = new callable_type(std::forward<passed_callable_type>(callable));
            new (dst) callable_type* (new_ptr); // the buffer only stores the pointer
        }

    public:
        // A callable is stored inline when
        //  1. its move constructor cannot throw,
        //  2. it fits into the 64 - sizeof(void*) byte buffer, and
        //  3. its alignment does not exceed that of std::max_align_t, so that
        //     placement-constructing it in a max_align_t aligned buffer is valid.
        static constexpr bool is_inlinable() noexcept
        {
            return std::is_nothrow_move_constructible_v<callable_type>
                && sizeof(callable_type) <= 64 - sizeof(void*)
                && alignof(callable_type) <= alignof(std::max_align_t);
        }

        // Stores callable in dst using the strategy selected by is_inlinable().
        template<class passed_callable_type>
        static void build(void* dst, passed_callable_type&& callable)
        {
            if constexpr (is_inlinable())
                build_inlinable(dst, std::forward<passed_callable_type>(callable));
            else
                build_allocated(dst, std::forward<passed_callable_type>(callable));
        }

        // Relocates the callable from src to dst; src no longer holds a live callable afterwards.
        static void move_destroy(void* src, void* dst) noexcept
        {
            assert(src != nullptr && dst != nullptr);
            if constexpr (is_inlinable())
                move_destroy_inline(src, dst);
            else
                move_destroy_allocated(src, dst);
        }

        // Invokes the callable and releases it afterwards, also when the call throws.
        static void execute_destroy(void* target)
        {
            assert(target != nullptr);
            if constexpr (is_inlinable())
                execute_destroy_inline(target);
            else
                execute_destroy_allocated(target);
        }

        // Releases the callable without invoking it.
        static void destroy(void* target) noexcept
        {
            assert(target != nullptr);
            if constexpr (is_inlinable())
                destroy_inline(target);
            else
                destroy_allocated(target);
        }

        // Returns a typed pointer to the callable stored in src.
        static constexpr callable_type* as(void* src) noexcept
        {
            if constexpr (is_inlinable())
                return inline_ptr(src);
            else
                return allocated_ptr(src);
        }

        static constexpr inline vtable s_vtable = make_vtable();
    }; //class callable_vtable
} //namespace details
// Move-only, type-erased "void()" callable with small-buffer optimisation. The callable
// is stored in m_buffer (inline, or as a pointer to a heap object - see
// details::callable_vtable) and m_vtable describes how to move, run and destroy it.
// A null m_vtable means the task is empty.
class task
{
private:
    alignas(std::max_align_t) std::byte m_buffer[64 - sizeof(void*)];
    const details::vtable* m_vtable;

    // Takes over the callable of rhs; rhs is left empty. Expects *this to hold no callable.
    void build(task&& rhs) noexcept
    {
        m_vtable = std::exchange(rhs.m_vtable, nullptr); // take the vtable first, then handle the buffer
        if (m_vtable == nullptr) return;
        // Per the original note: coroutine_handle_functor and await_via_functor have custom
        // move/destroy logic, so they are dispatched directly to move_destroy.
        if (contains<details::coroutine_handle_functor>(m_vtable))
            return details::callable_vtable<details::coroutine_handle_functor>::move_destroy(rhs.m_buffer, m_buffer);
        if (contains<details::await_via_functor>(m_vtable))
            return details::callable_vtable<details::await_via_functor>::move_destroy(rhs.m_buffer, m_buffer);
        const auto move_destroy_fn = m_vtable->move_destroy_fn;
        if (details::vtable::trivially_copiable_destructible(move_destroy_fn)) {
            std::memcpy(m_buffer, rhs.m_buffer, sizeof(m_buffer)); // trivial callable: a raw copy is a move
            return;
        }
        move_destroy_fn(rhs.m_buffer, m_buffer); // otherwise use the callable's own move logic
    }

    void build(std::coroutine_handle<void> coro_handle) noexcept
    {
        build(details::coroutine_handle_functor{ coro_handle }); // wrap the handle in a functor
    }

    template<class callable_type>
    void build(callable_type&& callable)
    {
        using decayed_type = typename std::decay_t<callable_type>;
        details::callable_vtable<decayed_type>::build(m_buffer, std::forward<callable_type>(callable));
        m_vtable = &details::callable_vtable<decayed_type>::s_vtable;
    }

    // True when vtable is the vtable generated for callable_type.
    template<class callable_type>
    static bool contains(const details::vtable* const vtable) noexcept
    {
        return vtable == &details::callable_vtable<callable_type>::s_vtable;
    }

public:
    task() noexcept : m_buffer(), m_vtable(nullptr) {}
    task(task&& rhs) noexcept { build(std::move(rhs)); }
    task(const task& rhs) = delete;
    // Copy assignment is deleted. It was previously spelled with a `const task&&`
    // parameter, which is not the copy-assignment signature.
    task& operator=(const task& rhs) = delete;

    template<class callable_type>
    task(callable_type&& callable) { build(std::forward<callable_type>(callable)); }

    ~task() noexcept { clear(); }

    // Runs the stored callable once and releases it; calling an empty task is a no-op.
    // The task is empty afterwards, whether or not the callable throws.
    void operator()()
    {
        const auto vtable = std::exchange(m_vtable, nullptr);
        if (vtable == nullptr) return;
        if (contains<details::coroutine_handle_functor>(vtable))
            return details::callable_vtable<details::coroutine_handle_functor>::execute_destroy(m_buffer);
        if (contains<details::await_via_functor>(vtable))
            return details::callable_vtable<details::await_via_functor>::execute_destroy(m_buffer);
        vtable->execute_destroy_fn(m_buffer); // never null for a vtable produced by callable_vtable
    }

    task& operator=(task&& rhs) noexcept
    {
        if (this == &rhs) return *this;
        clear();
        build(std::move(rhs));
        return *this;
    }

    // Destroys the stored callable (if any) without running it.
    void clear() noexcept
    {
        if (m_vtable == nullptr) return;
        const auto vtable = std::exchange(m_vtable, nullptr);
        if (contains<details::coroutine_handle_functor>(vtable))
            return details::callable_vtable<details::coroutine_handle_functor>::destroy(m_buffer);
        if (contains<details::await_via_functor>(vtable))
            return details::callable_vtable<details::await_via_functor>::destroy(m_buffer);
        auto destroy_fn = vtable->destroy_fn;
        if (details::vtable::trivially_destructable(destroy_fn)) return; // nothing to run for trivial callables
        destroy_fn(m_buffer);
    }

    explicit operator bool() const noexcept { return m_vtable != nullptr; }

    // True when the task currently stores a callable of (decayed) callable_type.
    // A std::coroutine_handle<void> is matched through the functor it is wrapped in.
    template<class callable_type>
    bool contains() const noexcept
    {
        using decayed_type = typename std::decay_t<callable_type>;
        if constexpr (std::is_same_v<decayed_type, std::coroutine_handle<void>>)
            return contains<details::coroutine_handle_functor>();
        else
            return m_vtable == &details::callable_vtable<decayed_type>::s_vtable;
    }
}; //class task
//Empty tag type passed as the first argument of executor::submit_bridge.
struct executor_tag {};
//Abstract base class of all executors. Concrete executors implement the enqueue /
//shutdown interface; post/submit/bulk_post/bulk_submit are built on top of it.
class executor
{
private:
//Coroutine that invokes callable(arguments...) and co_returns its value as a result.
//NOTE(review): the executor_tag / executor_type& parameters are unused in the body;
//presumably they select a coroutine promise (via std::coroutine_traits) that schedules
//the coroutine on the executor - that specialization is not visible here, confirm.
template<class return_type, class executor_type, class callable_type, class... argument_types>
static result<return_type> submit_bridge(executor_tag, executor_type&, callable_type callable, argument_types... arguments)
{
co_return callable(arguments...);
}
//Bulk flavour of submit_bridge for callables without arguments.
template<class callable_type, typename return_type = std::invoke_result_t<callable_type>>
static result<return_type> bulk_submit_bridge(details::executor_bulk_tag, std::vector<task>& accumulator, callable_type callable)
{
co_return callable(); //accumulator is not referenced in this body
}
protected:
//Fire-and-forget: wraps the call with details::bind_with_try_catch and enqueues it.
template<class executor_type, class callable_type, class... argument_types>
static void do_post(executor_type& executor_ref, callable_type&& callable, argument_types&&... arguments)
{
static_assert(std::is_invocable_v<callable_type, argument_types...>,
"executor::post - <<callable_type>> is not invokable with <<argument_types...>>");
executor_ref.enqueue(details::bind_with_try_catch(std::forward<callable_type>(callable), std::forward<argument_types>(arguments)...));
}
//Returns a result for callable(arguments...) by going through submit_bridge.
template<class executor_type, class callable_type, class... argument_types>
static auto do_submit(executor_type& executor_ref, callable_type&& callable, argument_types&&... arguments)
{
static_assert(std::is_invocable_v<callable_type, argument_types...>,
"executor::submit - <<callable_type>> is not invokable with <<argument_types...>>");
using return_type = typename std::invoke_result_t<callable_type, argument_types...>;
return submit_bridge<return_type>({}, executor_ref, std::forward<callable_type>(callable), std::forward<argument_types>(arguments)...);
}
//Wraps every callable of the (non-empty) span into a task and enqueues them in one call.
//The callables are moved from.
template<class executor_type, class callable_type>
static void do_bulk_post(executor_type& executor_ref, std::span<callable_type> callable_list)
{
assert(!callable_list.empty());
std::vector<task> tasks;
tasks.reserve(callable_list.size());
for (auto& callable : callable_list) tasks.emplace_back(details::bind_with_try_catch(std::move(callable)));
std::span<task> span = tasks;
executor_ref.enqueue(span);
}
//Creates one result per callable via bulk_submit_bridge, then enqueues the accumulated
//tasks in one call. The callables are moved from.
template<class executor_type, class callable_type, class return_type = std::invoke_result_t<callable_type>>
static std::vector<result<return_type>> do_bulk_submit(executor_type& executor_ref, std::span<callable_type> callable_list)
{
std::vector<task> accumulator;
accumulator.reserve(callable_list.size());
std::vector<result<return_type>> results;
results.reserve(callable_list.size());
for (auto& callable : callable_list) results.emplace_back(bulk_submit_bridge<callable_type>({}, accumulator, std::move(callable)));
//TODO: how do the tasks created above end up in accumulator? bulk_submit_bridge itself
//does not appear to use it.
//NOTE(review): presumably a coroutine promise selected through details::executor_bulk_tag
//pushes the suspended coroutine into accumulator - not visible here, confirm.
assert(!accumulator.empty());
std::span<task> span = accumulator;
executor_ref.enqueue(span);
return results;
}
public:
executor(std::string_view name) : name(name) {}
virtual ~executor() noexcept = default;
const std::string name; //executor name, copied from the constructor argument
virtual void enqueue(task task) = 0; //schedule a single task
virtual void enqueue(std::span<task> tasks) = 0; //schedule a batch of tasks
virtual int max_concurrency_level() const noexcept = 0;
virtual bool shutdown_requested() const noexcept = 0;
virtual void shutdown() noexcept = 0;
//Schedules a task that returns nothing to the caller (goes through enqueue).
template<class callable_type, class... argument_types>
void post(callable_type&& callable, argument_types&&... arguments)
{
return do_post(*this, std::forward<callable_type>(callable), std::forward<argument_types>(arguments)...);
}
//Schedules a task and returns a result for its return value (goes through submit_bridge).
template<class callable_type, class... argument_types>
auto submit(callable_type&& callable, argument_types&&... arguments)
{
return do_submit(*this, std::forward<callable_type>(callable), std::forward<argument_types>(arguments)...);
}
//Batch version of post.
template<class callable_type>
void bulk_post(std::span<callable_type> callable_list) { return do_bulk_post(*this, callable_list); }
//Batch version of submit; returns one result per callable.
template<class callable_type, class return_type = std::invoke_result_t<callable_type>>
std::vector<result<return_type>> bulk_submit(std::span<callable_type> callable_list) { return do_bulk_submit(*this, callable_list); }
}; //class executor
// Outcome of when_any: the whole sequence of results plus the position of the one that
// completed. A default-constructed object carries the sentinel index size_t(-1).
template<class sequence_type>
struct when_any_result
{
    std::size_t index;
    sequence_type results;

    when_any_result() noexcept : index(~std::size_t{ 0 }) {}

    // Builds `results` in place from the forwarded arguments.
    template<class... result_types>
    when_any_result(size_t completed_index, result_types&&... values) noexcept
        : index(completed_index)
        , results(std::forward<result_types>(values)...)
    {
    }

    // Move-only.
    when_any_result(when_any_result&&) noexcept = default;
    when_any_result& operator=(when_any_result&&) noexcept = default;
};
namespace details
{
//Helper for when_all / when_any: validates result objects, reaches their underlying
//result_state_base and provides the awaitables used by the *_impl coroutines.
class when_result_helper
{
private:
//Throws errors::empty_result(error_message) when result is empty.
template<class type>
static void throw_if_empty_single(const char* error_message, const result<type>& result)
{
if (!static_cast<bool>(result)) throw errors::empty_result(error_message);
}
//End of the recursion below: nothing left to check.
static void throw_if_empty_impl(const char* error_message) noexcept { (void)error_message; }
template<class type, class... result_types>
static void throw_if_empty_impl(const char* error_message, const result<type>& result, result_types&&... results)
{
throw_if_empty_single(error_message, result); //template recursion: check the head, then the rest
throw_if_empty_impl(error_message, std::forward<result_types>(results)...);
}
//Returns the raw state pointer held by result (reads result.m_state directly).
template<class type>
static result_state_base* get_state_base(result<type>& result) noexcept { return result.m_state.get(); }
template<std::size_t... is, typename tuple_type>
static result_state_base* at_impl(std::index_sequence<is...>, tuple_type& tuple, size_t n) noexcept
{
result_state_base* bases[] = { get_state_base(std::get<is>(tuple))... }; //array with the state of every tuple element
return bases[n]; //pick the n-th one
}
public:
//Returns the state of the n-th element of tuple, with n chosen at run time.
template<typename tuple_type>
static result_state_base* at(tuple_type& tuple, size_t n) noexcept
{
auto seq = std::make_index_sequence<std::tuple_size<tuple_type>::value>();
return at_impl(seq, tuple, n);
}
//Checks every passed result and throws errors::empty_result for the first empty one.
template<class... result_types>
static void throw_if_empty_tuple(const char* error_message, result_types&&... results)
{
throw_if_empty_impl(error_message, std::forward<result_types>(results)...);
}
//Same check for an iterator range of results.
template<class iterator_type>
static void throw_if_empty_range(const char* error_message, iterator_type begin, iterator_type end)
{
for (; begin != end; ++begin) throw_if_empty_single(error_message, *begin);
}
//Awaitable that waits for a single state; awaiting it produces no value.
class when_all_awaitable
{
private:
result_state_base& m_state;
public:
when_all_awaitable(result_state_base& state) noexcept : m_state(state) {}
bool await_ready() const noexcept { return false; }
//Whether the coroutine stays suspended is decided by result_state_base::await.
bool await_suspend(std::coroutine_handle<void> coro_handle) noexcept { return m_state.await(coro_handle); }
void await_resume() const noexcept {}
};
//Awaitable that registers with every result of a vector or tuple and yields the index
//of the completed one.
template<class result_types>
class when_any_awaitable
{
private:
std::shared_ptr<when_any_context> m_promise;
result_types& m_results;
//The two overload pairs below give vector and tuple sequences a common get_at/size interface.
template<class type> //presumably `type` is result<T> in these helpers
static result_state_base* get_at(std::vector<type>& vector, size_t i) noexcept { return get_state_base(vector[i]); }
template<class type>
static size_t size(const std::vector<type>& vector) noexcept { return vector.size(); }
template<class... types>
static result_state_base* get_at(std::tuple<types...>& tuple, size_t i) noexcept { return at(tuple, i); }
template<class... types>
static size_t size(std::tuple<types...>& tuple) noexcept { return std::tuple_size_v<std::tuple<types...>>; }
public:
when_any_awaitable(result_types& results) noexcept : m_results(results) {}
bool await_ready() const noexcept { return false; }
void await_suspend(std::coroutine_handle<void> coro_handle) //returns void, so the coroutine always suspends
{
m_promise = std::make_shared<when_any_context>(coro_handle);
const auto range_length = size(m_results);
for (size_t i = 0; i < range_length; i++) {
if (m_promise->fulfilled()) return; //one result has already completed: stop registering
auto state_ptr = get_at(m_results, i);
const auto status = state_ptr->when_any(m_promise); //result_state_base::when_any; per the original note it sets consumer_set
if (status == result_state_base::pc_state::producer_done) {
m_promise->try_resume(state_ptr);
return; //the producer is already done: try to resume and stop
}
}
}
//Returns the index of the completed result inside m_results.
size_t await_resume() noexcept
{
#undef max
const auto completed_result_state = m_promise->completed_result();
auto completed_result_index = std::numeric_limits<size_t>::max();
const auto range_length = size(m_results);
for (size_t i = 0; i < range_length; i++) {
auto state_ptr = get_at(m_results, i);
state_ptr->try_rewind_consumer(); //per the original note: tries to rewind every consumer_set back to idle; the loop has no break
if (completed_result_state == state_ptr) completed_result_index = i; //this is the completed one
}
assert(completed_result_index != std::numeric_limits<size_t>::max());
return completed_result_index; //index of the completed result
}
}; //class when_any_awaitable
}; //class when_result_helper
//Coroutine behind the variadic when_all(): takes ownership of every result in a tuple,
//awaits each element's state in order, then returns the tuple.
template<class... result_types>
result<std::tuple<typename std::decay<result_types>::type...>> when_all_impl(result_types&&... results)
{
    using tuple_type = std::tuple<typename std::decay<result_types>::type...>;
    tuple_type tuple = std::make_tuple(std::forward<result_types>(results)...);
    constexpr size_t element_count = std::tuple_size_v<tuple_type>;
    for (size_t index = 0; index < element_count; ++index) {
        auto* const state = when_result_helper::at(tuple, index);
        co_await when_result_helper::when_all_awaitable{ *state };
    }
    co_return std::move(tuple);
}
//Coroutine behind the range when_all(): moves the results of [begin, end) into a vector,
//replaces each element with the outcome of awaiting its resolve(), then returns the vector.
template<class iterator_type>
result<std::vector<typename std::iterator_traits<iterator_type>::value_type>> when_all_impl(iterator_type begin, iterator_type end)
{
    using value_type = typename std::iterator_traits<iterator_type>::value_type;
    if (begin == end) {
        co_return std::vector<value_type> {}; //empty range: nothing to wait for
    }
    std::vector<value_type> owned(std::make_move_iterator(begin), std::make_move_iterator(end));
    for (auto it = owned.begin(); it != owned.end(); ++it) {
        *it = co_await it->resolve();
    }
    co_return std::move(owned);
}
//Coroutine behind the variadic when_any(): owns the results in a tuple, suspends until one
//of them completes, and returns the completed index together with the whole tuple.
template<class... result_types>
result<when_any_result<std::tuple<result_types...>>> when_any_impl(result_types&&... results)
{
    using owned_tuple = std::tuple<result_types...>;
    owned_tuple owned = std::make_tuple(std::forward<result_types>(results)...);
    const auto winner_index = co_await when_result_helper::when_any_awaitable<owned_tuple> {owned};
    co_return when_any_result<owned_tuple> {winner_index, std::move(owned)};
}
//Coroutine behind the range when_any(): moves [begin, end) into a vector, suspends until one
//element completes, and returns the completed index together with the vector.
template<class iterator_type>
result<when_any_result<std::vector<typename std::iterator_traits<iterator_type>::value_type>>>
when_any_impl(iterator_type begin, iterator_type end)
{
    using value_type = typename std::iterator_traits<iterator_type>::value_type;
    std::vector<value_type> owned {std::make_move_iterator(begin), std::make_move_iterator(end)};
    const auto winner_index = co_await when_result_helper::when_any_awaitable {owned};
    co_return when_any_result<std::vector<value_type>> {winner_index, std::move(owned)};
}
}; //namespace details
//Builds a result<type> that is already completed with a value constructed from `arguments`
//(or with no value when type is void).
template<class type, class... argument_types>
result<type> make_ready_result(argument_types&&... arguments)
{
    constexpr bool is_void_result = std::is_same_v<type, void>;
    static_assert(is_void_result || std::is_constructible_v<type, argument_types...>,
        "concurrencpp::make_ready_result - <<type>> is not constructible from <<argument_types...>");
    static_assert(!is_void_result || (sizeof...(argument_types) == 0),
        "concurrencpp::make_ready_result<void> - this overload does not accept any argument.");

    //One shared state, viewed through a producer handle and a consumer handle.
    details::producer_result_state_ptr<type> producer(new details::result_state<type>());
    details::consumer_result_state_ptr<type> consumer(producer.get());
    producer->set_result(std::forward<argument_types>(arguments)...);
    //NOTE(review): dropping the producer handle is what publishes the stored value - presumably
    //via the handle's deleter, which is defined outside this section; confirm there.
    producer.reset();
    return { std::move(consumer) };
}
//Builds a result<type> that is already completed with the given exception.
//Throws std::invalid_argument when exception_ptr is null.
template<class type>
result<type> make_exceptional_result(std::exception_ptr exception_ptr)
{
    //The message previously named a non-existent function ("make_exception_result").
    if (!static_cast<bool>(exception_ptr))
        throw std::invalid_argument("make_exceptional_result() - given exception_ptr is null.");
    //One shared state, viewed through a producer handle and a consumer handle.
    details::producer_result_state_ptr<type> promise(new details::result_state<type>());
    details::consumer_result_state_ptr<type> state_ptr(promise.get());
    promise->set_exception(exception_ptr);
    //NOTE(review): releasing the producer handle publishes the stored exception - presumably
    //via the handle's deleter, which is defined outside this section; confirm there.
    promise.reset();
    return { std::move(state_ptr) };
}
//Convenience overload: wraps an exception object into a std::exception_ptr and forwards to
//the exception_ptr overload.
template<class type, class exception_type>
result<type> make_exceptional_result(exception_type exception)
{
    auto wrapped = std::make_exception_ptr(exception);
    return make_exceptional_result<type>(wrapped);
}
//Returns a result that completes once every given result has completed, yielding them all in
//a tuple. The emptiness check runs eagerly, before the when_all_impl coroutine is created.
template<class... result_types>
result<std::tuple<typename std::decay<result_types>::type...>> when_all(result_types&&... results)
{
    constexpr const char* empty_result_message = "when_all() - one of the result objects is empty.";
    details::when_result_helper::throw_if_empty_tuple(empty_result_message, std::forward<result_types>(results)...);
    return details::when_all_impl(std::forward<result_types>(results)...);
}
//Range overload of when_all: validates every result in [begin, end) eagerly, then returns a
//result that yields them all in a vector.
template<class iterator_type>
result<std::vector<typename std::iterator_traits<iterator_type>::value_type>> when_all(iterator_type begin, iterator_type end)
{
    constexpr const char* empty_result_message = "when_all() - one of the result objects is empty.";
    details::when_result_helper::throw_if_empty_range(empty_result_message, begin, end);
    return details::when_all_impl(begin, end);
}
//Zero-argument when_all: there is nothing to wait for, so return an already-completed empty tuple.
inline result<std::tuple<>> when_all()
{
    return make_ready_result<std::tuple<>>();
}
//Returns a result that completes as soon as any one of the given results completes; it yields
//the completed index together with all the results.
//The emptiness check runs eagerly, before the when_any_impl coroutine is created.
template<class... result_types>
result<when_any_result<std::tuple<result_types...>>> when_any(result_types&&... results)
{
    static_assert(sizeof...(result_types) != 0, "when_any() - must accept at least one result object.");
    //The message used to be the placeholder "."; it now mirrors the wording used by when_all().
    details::when_result_helper::throw_if_empty_tuple("when_any() - one of the result objects is empty.",
        std::forward<result_types>(results)...);
    return details::when_any_impl(std::forward<result_types>(results)...);
}
//Range overload of when_any: completes as soon as any result in [begin, end) completes.
//Throws std::invalid_argument for an empty range; the emptiness check on the individual
//results runs first, as before.
template<class iterator_type>
result<when_any_result<std::vector<typename std::iterator_traits<iterator_type>::value_type>>>
when_any(iterator_type begin, iterator_type end)
{
    //The message used to be the placeholder "."; it now mirrors the wording used by when_all().
    details::when_result_helper::throw_if_empty_range("when_any() - one of the result objects is empty.", begin, end);
    if (begin == end) throw std::invalid_argument("when_any() - given range contains no elements.");
    return details::when_any_impl(begin, end);
}
class timer_queue;
namespace details
{
//Hands out process-wide unique ids: 1, 2, 3, ...
std::uintptr_t generate_thread_id() noexcept
{
    static std::atomic_uintptr_t next_id{ 1 };
    //Only uniqueness matters, not ordering against other memory operations, so a relaxed
    //increment is used instead of the default memory_order_seq_cst.
    const auto issued = next_id.fetch_add(1, std::memory_order_relaxed);
    return issued;
}
struct thread_per_thread_data
{
const std::uintptr_t id = generate_thread_id();
};
static thread_local thread_per_thread_data s_tl_thread_per_data; //每个线程独有的虚拟 ID 号
//Thin wrapper around std::thread that names the OS thread before running the callable.
class thread
{
private:
    std::thread m_thread;

    //Sets the description of the calling thread. The name is widened character by character.
    static void set_name(std::string_view name) noexcept
    {
        const std::wstring wide_name(name.begin(), name.end());
        ::SetThreadDescription(::GetCurrentThread(), wide_name.data());
    }
public:
    thread() noexcept = default;
    thread(thread&&) noexcept = default;

    //Starts a thread that first names itself `name`, then invokes `callable`.
    template<class callable_type>
    thread(std::string name, callable_type&& callable) :
        m_thread([thread_name = std::move(name), entry = std::forward<callable_type>(callable)]() mutable {
            set_name(thread_name);
            entry();
        })
    {
    }

    thread& operator=(thread&& rhs) noexcept = default;

    std::thread::id get_id() const noexcept { return m_thread.get_id(); }

    //Id of the calling thread as assigned through s_tl_thread_per_data (not the OS thread id).
    static std::uintptr_t get_current_virtual_id() noexcept { return s_tl_thread_per_data.id; }

    bool joinable() const noexcept { return m_thread.joinable(); }
    void join() { m_thread.join(); }

    //std::thread::hardware_concurrency(), or 8 when the standard library reports 0
    //(consts::k_default_number_of_cores).
    static size_t hardware_concurrency() noexcept
    {
        const auto reported = std::thread::hardware_concurrency();
        if (reported == 0) return 8;
        return reported;
    }
};
//Default worker count for the CPU thread pool: one worker per reported hardware thread.
size_t default_max_cpu_workers() noexcept
{
    constexpr size_t worker_count_factor = 1; //consts::k_cpu_threadpool_worker_count_factor
    return thread::hardware_concurrency() * worker_count_factor;
}
//Default worker count for the background thread pool: four workers per reported hardware thread.
size_t default_max_background_workers() noexcept
{
    constexpr size_t worker_count_factor = 4; //consts::k_background_threadpool_worker_count_factor
    return thread::hardware_concurrency() * worker_count_factor;
}
//Shared state of one timer: where it runs (executor), when it is next due (deadline), how
//often it repeats (frequency) and whether it has been cancelled.
class timer_state_base : public std::enable_shared_from_this<timer_state_base>
{
public:
    using clock_type = std::chrono::high_resolution_clock;
    using time_point = std::chrono::time_point<clock_type>;
    using milliseconds = std::chrono::milliseconds;
private:
    const std::weak_ptr<timer_queue> m_timer_queue; //owning queue, referenced but not owned
    const std::shared_ptr<executor> m_executor;     //executor the callback is posted to
    const size_t m_due_time;                        //initial delay, in milliseconds
    std::atomic_size_t m_frequency;                 //repeat interval, in milliseconds
    time_point m_deadline; //set by the constructor; per the original author, afterwards changed only on the timer-queue thread
    std::atomic_bool m_cancelled;
    const bool m_is_oneshot;

    static time_point make_deadline(milliseconds diff) noexcept
    {
        return clock_type::now() + diff;
    }
public:
    timer_state_base(size_t due_time, size_t frequency, std::shared_ptr<executor> executor, std::weak_ptr<timer_queue> timer_queue,
        bool is_oneshot) noexcept :
        m_timer_queue(std::move(timer_queue)),
        m_executor(std::move(executor)),
        m_due_time(due_time),
        m_frequency(frequency),
        m_deadline(make_deadline(milliseconds(due_time))),
        m_cancelled(false),
        m_is_oneshot(is_oneshot)
    {
        assert(static_cast<bool>(m_executor));
    }
    virtual ~timer_state_base() noexcept = default;

    //Runs the user callback; implemented by timer_state<callable_type>.
    virtual void execute() = 0;

    //Re-arms the deadline one period from now, then posts execute() to the executor. The posted
    //task keeps this state alive through a shared_ptr to it.
    void fire()
    {
        const milliseconds period(m_frequency.load(std::memory_order_relaxed));
        m_deadline = make_deadline(period);
        assert(static_cast<bool>(m_executor));
        m_executor->post([self = shared_from_this()]() mutable { self->execute(); });
    }

    bool expired(const time_point now) const noexcept { return m_deadline <= now; }
    time_point get_deadline() const noexcept { return m_deadline; }
    size_t get_frequency() const noexcept { return m_frequency.load(std::memory_order_relaxed); }
    size_t get_due_time() const noexcept { return m_due_time; }
    bool is_oneshot() const noexcept { return m_is_oneshot; }
    std::shared_ptr<executor> get_executor() const noexcept { return m_executor; }
    std::weak_ptr<timer_queue> get_timer_queue() const noexcept { return m_timer_queue; }

    void set_new_frequency(size_t new_frequency) noexcept
    {
        m_frequency.store(new_frequency, std::memory_order_relaxed);
    }
    void cancel() noexcept { m_cancelled.store(true, std::memory_order_relaxed); }
    bool cancelled() const noexcept { return m_cancelled.load(std::memory_order_relaxed); }
}; //class timer_state_base
//Concrete timer state that stores the user callable and invokes it from execute().
template<class callable_type>
class timer_state final : public timer_state_base
{
private:
    callable_type m_callable;
public:
    template<class given_callable_type>
    timer_state(size_t due_time, size_t frequency, std::shared_ptr<executor> executor,
        std::weak_ptr<timer_queue> timer_queue, bool is_oneshot, given_callable_type&& callable) :
        timer_state_base(due_time, frequency, std::move(executor), std::move(timer_queue), is_oneshot),
        m_callable(std::forward<given_callable_type>(callable))
    {
    }

    //Invokes the callable unless the timer has been cancelled in the meantime.
    void execute() override
    {
        if (!cancelled()) {
            m_callable();
        }
    }
}; //class timer_state
//Kind of request queued for the timer-queue worker.
enum class timer_request
{
    add,    //insert the timer into the queue
    remove  //take the timer out of the queue
};
} //namespace details
class timer;
//Fires timers on a single, lazily started worker thread.
//Callers enqueue add/remove requests under m_lock; the worker drains them, keeps the timers
//ordered by deadline in its private timer_queue_internal, and sleeps until the next deadline,
//a new request, or shutdown. A worker that sees no request for m_max_waiting_time while it
//holds no timers exits and is restarted by the next make_timer_impl call.
class timer_queue : public std::enable_shared_from_this<timer_queue>
{
public:
    using timer_ptr = std::shared_ptr<details::timer_state_base>; //shared handle to one timer's state
    using clock_type = std::chrono::high_resolution_clock;
    using time_point = std::chrono::time_point<std::chrono::high_resolution_clock>;
    using request_queue = std::vector<std::pair<timer_ptr, details::timer_request>>; //pending add/remove requests
    friend class timer; //timer::cancel() calls the private remove_timer()
private:
    std::atomic_bool m_atomic_abort; //lock-free "shutdown requested" flag
    std::mutex m_lock;
    request_queue m_request_queue;   //filled by callers and drained by the worker, under m_lock
    details::thread m_worker;
    std::condition_variable m_condition;
    bool m_abort;                    //set in shutdown() and read by the worker, under m_lock
    bool m_idle;                     //true while no worker loop is running
    const std::chrono::milliseconds m_max_waiting_time;

    //Starts a new worker if the queue is idle. Returns the previous (already exited) worker
    //thread so the caller can join it after releasing the lock; returns an empty thread when
    //a worker is already running.
    details::thread ensure_worker_thread(std::unique_lock<std::mutex>& lock)
    {
        assert(lock.owns_lock());
        if (!m_idle) return {};
        auto old_worker = std::move(m_worker);
        m_worker = details::thread("timer_queue worker", [this] { work_loop(); }); //start a fresh work loop
        m_idle = false;
        return old_worker;
    }

    //Queues an "add" request, releases the caller's lock and wakes the worker.
    void add_timer(std::unique_lock<std::mutex>& lock, timer_ptr new_timer)
    {
        assert(lock.owns_lock());
        m_request_queue.emplace_back(std::move(new_timer), details::timer_request::add);
        lock.unlock();
        m_condition.notify_one();
    }

    //Queues a "remove" request and wakes the worker.
    void remove_timer(timer_ptr existing_timer)
    {
        {
            std::unique_lock<decltype(m_lock)> lock(m_lock);
            m_request_queue.emplace_back(std::move(existing_timer), details::timer_request::remove);
        }
        m_condition.notify_one();
    }

    //Creates the timer state, makes sure a worker is running and hands the timer to it.
    //Throws errors::runtime_shutdown once the queue has been shut down.
    template<class callable_type>
    timer_ptr make_timer_impl(size_t due_time, size_t frequency, std::shared_ptr<executor> executor,
        bool is_oneshot, callable_type&& callable)
    {
        assert(static_cast<bool>(executor));
        using decayed_type = typename std::decay_t<callable_type>;
        auto timer_state = std::make_shared<details::timer_state<decayed_type>>(due_time, frequency,
            std::move(executor), weak_from_this(), is_oneshot, std::forward<callable_type>(callable));
        std::unique_lock<std::mutex> lock(m_lock);
        if (m_abort) throw errors::runtime_shutdown("timer_queue has been shut down.");
        auto old_thread = ensure_worker_thread(lock);
        add_timer(lock, timer_state);                 //releases the lock
        if (old_thread.joinable()) old_thread.join(); //join the previous worker outside the lock
        return timer_state;
    }

    //Orders timers by deadline, earliest first.
    struct deadline_comparator
    {
        bool operator()(const timer_queue::timer_ptr& a, const timer_queue::timer_ptr& b) const noexcept
        {
            return a->get_deadline() < b->get_deadline();
        }
    };

    //Worker-private bookkeeping: a deadline-ordered multiset plus a map from each timer to its
    //position in that set, so a timer can be removed without searching the set.
    class timer_queue_internal
    {
        using timer_set = std::multiset<timer_queue::timer_ptr, deadline_comparator>; //duplicate deadlines allowed
        using timer_set_iterator = typename timer_set::iterator;
        using iterator_map = std::unordered_map<timer_queue::timer_ptr, timer_set_iterator>;
    private:
        timer_set m_timers;
        iterator_map m_iterator_mapper;

        void add_timer_internal(timer_queue::timer_ptr new_timer)
        {
            assert(m_iterator_mapper.find(new_timer) == m_iterator_mapper.end());
            auto timer_it = m_timers.emplace(new_timer);
            m_iterator_mapper.emplace(std::move(new_timer), timer_it);
        }

        void remove_timer_internal(timer_queue::timer_ptr existing_timer)
        {
            auto timer_it = m_iterator_mapper.find(existing_timer);
            if (timer_it == m_iterator_mapper.end()) {
                //Not tracked any more: process_timers drops one-shot and cancelled timers itself.
                assert(existing_timer->is_oneshot() || existing_timer->cancelled());
                return;
            }
            auto set_iterator = timer_it->second;
            m_timers.erase(set_iterator);
            m_iterator_mapper.erase(timer_it);
        }

        //Applies every queued add/remove request, in order.
        void process_request_queue(timer_queue::request_queue& queue)
        {
            for (auto& request : queue) {
                auto& timer_ptr = request.first;
                const auto opt = request.second;
                if (opt == details::timer_request::add)
                    add_timer_internal(std::move(timer_ptr));
                else
                    remove_timer_internal(std::move(timer_ptr));
            }
        }

        //Swaps both containers with fresh empty ones so their memory is released.
        void reset_containers_memory() noexcept
        {
            assert(empty());
            timer_set timers;
            std::swap(m_timers, timers);
            iterator_map iterator_mapper;
            std::swap(m_iterator_mapper, iterator_mapper);
        }
    public:
        bool empty() const noexcept
        {
            assert(m_iterator_mapper.size() == m_timers.size());
            return m_timers.empty();
        }

        //Applies the pending requests, fires every timer whose deadline has passed and returns
        //the time point the worker should sleep until (24 hours from now when no timer is left).
        timer_queue::time_point process_timers(timer_queue::request_queue& queue)
        {
            process_request_queue(queue);
            const auto now = std::chrono::high_resolution_clock::now();
            while (true) {
                if (m_timers.empty()) break;
                timer_set temp_set;
                auto first_timer_it = m_timers.begin(); //earliest deadline
                auto timer_ptr = *first_timer_it;
                const auto is_oneshot = timer_ptr->is_oneshot();
                if (!timer_ptr->expired(now)) break; //the earliest timer is not due yet, so none is
                //fire() changes the deadline, which is the ordering key, so the node is taken out
                //of m_timers first and parked in a temporary set while it is modified.
                auto timer_node = m_timers.extract(first_timer_it);
                auto temp_it = temp_set.insert(std::move(timer_node));
                const auto cancelled = timer_ptr->cancelled();
                if (!cancelled) (*temp_it)->fire();
                //One-shot or cancelled: drop the mapping; temp_set's destructor drops the node.
                if (is_oneshot || cancelled) { m_iterator_mapper.erase(timer_ptr); continue; }
                //Periodic: move the node back so it is re-sorted by its new deadline.
                timer_node = temp_set.extract(temp_it);
                auto new_it = m_timers.insert(std::move(timer_node));
                assert(m_iterator_mapper.find(timer_ptr) != m_iterator_mapper.end());
                m_iterator_mapper[timer_ptr] = new_it; //the stored iterator must follow the re-inserted node
            }
            if (m_timers.empty()) { reset_containers_memory(); return now + std::chrono::hours(24); }
            return (**m_timers.begin()).get_deadline();
        }
    }; //class timer_queue_internal

    //Body of the worker thread.
    void work_loop() noexcept
    {
        time_point next_deadline;
        timer_queue_internal internal_state;
        while (true) {
            std::unique_lock<decltype(m_lock)> lock(m_lock);
            if (internal_state.empty()) { //no timers: wait for a request, but only for a bounded time
                const auto res = m_condition.wait_for(lock, m_max_waiting_time, [this] { return !m_request_queue.empty() || m_abort; });
                if (!res) { m_idle = true; lock.unlock(); return; } //timed out: mark idle and let the thread end
            } else //timers pending: sleep until the next deadline or until a request arrives
                m_condition.wait_until(lock, next_deadline, [this] { return !m_request_queue.empty() || m_abort; });
            if (m_abort) return;
            auto request_queue = std::move(m_request_queue);
            lock.unlock();
            //Timers are processed outside the lock. The old trailing "if (next_deadline <= now)
            //continue;" was the last statement of the loop body and had no effect, so it is gone.
            next_deadline = internal_state.process_timers(request_queue);
        }
    }
public:
    //max_waiting_time: how long a worker with no timers waits for a request before it exits.
    timer_queue(std::chrono::milliseconds max_waiting_time) noexcept :
        m_atomic_abort(false), m_abort(false), m_idle(true), m_max_waiting_time(max_waiting_time)
    {
    }
    ~timer_queue() noexcept { shutdown(); assert(!m_worker.joinable()); }

    //Stops the worker and discards pending requests. Only the first call does any work.
    void shutdown() noexcept
    {
        const auto state_before = m_atomic_abort.exchange(true, std::memory_order_relaxed);
        if (state_before) return; //already shut down
        std::unique_lock<decltype(m_lock)> lock(m_lock);
        m_abort = true;
        if (!m_worker.joinable()) return; //no worker was ever started
        m_request_queue.clear();
        lock.unlock();
        m_condition.notify_all();
        m_worker.join();
    }
    bool shutdown_requested() const noexcept { return m_atomic_abort.load(std::memory_order_relaxed); }

    //Creates a periodic timer: first run after due_time, then every frequency. The callable
    //is bound to its arguments and posted to `executor` on each run.
    //Throws std::invalid_argument when executor is null.
    template<class callable_type, class... argumet_types>
    timer make_timer(std::chrono::milliseconds due_time, std::chrono::milliseconds frequency,
        std::shared_ptr<executor> executor, callable_type&& callable, argumet_types&&... arguments)
    {
        if (!static_cast<bool>(executor))
            throw std::invalid_argument("timer_queue::make_timer() - executor is null.");
        return make_timer_impl(due_time.count(), frequency.count(), std::move(executor), false,
            details::bind(std::forward<callable_type>(callable), std::forward<argumet_types>(arguments)...));
    }

    //Creates a timer that runs once, after due_time.
    //Throws std::invalid_argument when executor is null.
    template<class callable_type, class... argumet_types>
    timer make_one_shot_timer(std::chrono::milliseconds due_time, std::shared_ptr<executor> executor,
        callable_type&& callable, argumet_types&&... arguments)
    {
        if (!static_cast<bool>(executor))
            throw std::invalid_argument("timer_queue::make_one_shot_timer() - executor is null.");
        return make_timer_impl(due_time.count(), 0, std::move(executor), true,
            details::bind(std::forward<callable_type>(callable), std::forward<argumet_types>(arguments)...));
    }

    //Returns a result<void> that is completed by a one-shot timer after due_time.
    //Throws std::invalid_argument when executor is null.
    result<void> make_delay_object(std::chrono::milliseconds due_time, std::shared_ptr<executor> executor)
    {
        if (!static_cast<bool>(executor)) throw std::invalid_argument("timer_queue::make_delay_object() - executor is null.");
        result_promise<void> promise;
        auto task = promise.get_result();
        make_timer_impl(due_time.count(), 0, std::move(executor), true, [tcs = std::move(promise)]() mutable { tcs.set_result(); });
        return task;
    }
    std::chrono::milliseconds max_worker_idle_time() const noexcept { return m_max_waiting_time; }
}; //class timer_queue
//Move-only handle to a timer. Destroying or move-assigning over a non-empty handle cancels
//the timer it refers to.
class timer
{
private:
    std::shared_ptr<details::timer_state_base> m_state;

    //Throws errors::empty_timer(error_message) when this handle refers to no timer.
    void throw_if_empty(const char* error_message) const
    {
        if (!static_cast<bool>(m_state)) {
            throw errors::empty_timer(error_message);
        }
    }
public:
    timer() noexcept = default;
    timer(std::shared_ptr<details::timer_state_base> timer_impl) noexcept : m_state(std::move(timer_impl)) {}
    timer(timer&& rhs) noexcept = default;
    timer(const timer&) = delete;
    ~timer() noexcept { cancel(); }

    timer& operator=(const timer&) = delete;
    timer& operator=(timer&& rhs) noexcept
    {
        if (this != &rhs) {
            if (static_cast<bool>(m_state)) cancel(); //the timer currently held must not keep firing
            m_state = std::move(rhs.m_state);
        }
        return *this;
    }

    //Marks the timer cancelled and, if its queue is still alive, asks the queue to drop it.
    //Leaves this handle empty; does nothing on an empty handle.
    void cancel()
    {
        if (!static_cast<bool>(m_state)) return;
        auto detached = std::move(m_state);
        detached->cancel();
        auto owning_queue = detached->get_timer_queue().lock(); //weak_ptr -> shared_ptr
        if (static_cast<bool>(owning_queue)) {
            owning_queue->remove_timer(std::move(detached));
        }
    }

    //The accessors below throw errors::empty_timer when the handle is empty.
    std::chrono::milliseconds get_due_time() const
    {
        throw_if_empty("timer::get_due_time() - timer is empty.");
        return std::chrono::milliseconds(m_state->get_due_time());
    }
    std::shared_ptr<executor> get_executor() const
    {
        throw_if_empty("timer::get_executor() - timer is empty.");
        return m_state->get_executor();
    }
    std::weak_ptr<timer_queue> get_timer_queue() const
    {
        throw_if_empty("timer::get_timer_queue() - timer is empty.");
        return m_state->get_timer_queue();
    }
    std::chrono::milliseconds get_frequency() const
    {
        throw_if_empty("timer::get_frequency() - timer is empty.");
        return std::chrono::milliseconds(m_state->get_frequency());
    }
    void set_frequency(std::chrono::milliseconds new_frequency)
    {
        throw_if_empty("timer::set_frequency() - timer is empty.");
        m_state->set_new_frequency(new_frequency.count());
    }

    //True when the handle refers to a timer.
    explicit operator bool() const noexcept { return static_cast<bool>(m_state); }
}; //class timer
class inline_executor final : public executor //任务立即顺序执行,无并行
{
private:
std::atomic_bool m_abort;
void throw_if_aborted() const
{
if (m_abort.load(std::memory_order_relaxed)) details::throw_runtime_shutdown_exception(name);
}
public:
inline_executor() noexcept : executor("inline_executor"), m_abort(false) {}
void enqueue(task task) override { throw_if_aborted(); task(); }
void enqueue(std::span<task> tasks) override { throw_if_aborted(); for (auto& task : tasks) task(); }
int max_concurrency_level() const noexcept override { return 0; } //details::consts::k_inline_executor_max_concurrency_level
void shutdown() noexcept override { m_abort.store(true, std::memory_order_relaxed); }
bool shutdown_requested() const noexcept override { return m_abort.load(std::memory_order_relaxed); }
}; //class inline_executor
//CRTP base for executors: each helper passes the statically known concrete executor to the
//do_* routines, which gives the compiler more room to optimise. self() yields that executor.
template<class concrete_executor_type>
class derivable_executor : public executor
{
private:
    concrete_executor_type& self() noexcept
    {
        return static_cast<concrete_executor_type&>(*this);
    }
public:
    derivable_executor(std::string_view name) : executor(name) {}

    template<class callable_type, class... argument_types>
    void post(callable_type&& callable, argument_types&&... arguments)
    {
        do_post(self(), std::forward<callable_type>(callable), std::forward<argument_types>(arguments)...);
    }

    template<class callable_type, class... argument_types>
    auto submit(callable_type&& callable, argument_types&&... arguments)
    {
        return do_submit(self(), std::forward<callable_type>(callable), std::forward<argument_types>(arguments)...);
    }

    template<class callable_type>
    void bulk_post(std::span<callable_type> callable_list)
    {
        do_bulk_post(self(), callable_list);
    }

    template<class callable_type, class return_type = std::invoke_result_t<callable_type>>
    std::vector<result<return_type>> bulk_submit(std::span<callable_type> callable_list)
    {
        return do_bulk_submit(self(), callable_list);
    }
}; //class derivable_executor
class thread_pool_executor;
namespace details
{
class thread_pool_worker;
//Per-thread bookkeeping for the thread pool: the worker running on this thread (if any), its
//index in the pool, and a hash of the thread's virtual id.
struct thread_pool_per_thread_data
{
    thread_pool_worker* this_worker;    //nullptr until set by a worker's loop
    size_t this_thread_index;           //static_cast<size_t>(-1) until set by a worker's loop
    const size_t this_thread_hashed_id;

    thread_pool_per_thread_data() noexcept :
        this_worker(nullptr),
        this_thread_index(static_cast<size_t>(-1)),
        this_thread_hashed_id(calculate_hashed_id())
    {
    }

    //std::hash of the calling thread's virtual id.
    static size_t calculate_hashed_id() noexcept
    {
        return std::hash<size_t>{}(thread::get_current_virtual_id());
    }
};
static thread_local thread_pool_per_thread_data s_tl_thread_pool_data;
//Tracks which pool workers are idle, using one atomic flag per worker plus an approximate
//count of idle workers.
class idle_worker_set
{
    enum class status { active, idle };
    //64-byte aligned so each flag sits in its own aligned slot; flags start out active.
    struct alignas(64) padded_flag { std::atomic<status> flag{ status::active }; };
private:
    std::atomic_intptr_t m_approx_size;                //approximate number of idle workers
    const std::unique_ptr<padded_flag[]> m_idle_flags; //m_size flags, allocated by the constructor
    const size_t m_size;

    //Tries to flip worker `index` from idle to active. Returns true only when this call made
    //the transition.
    bool try_acquire_flag(size_t index) noexcept
    {
        const auto worker_status = m_idle_flags[index].flag.load(std::memory_order_relaxed);
        if (worker_status == status::active) return false;
        const auto before = m_idle_flags[index].flag.exchange(status::active, std::memory_order_relaxed);
        const auto swapped = (before == status::idle); //the flag may have become active since the load above
        if (swapped) m_approx_size.fetch_sub(1, std::memory_order_relaxed);
        return swapped;
    }
public:
    idle_worker_set(size_t size) : m_approx_size(0), m_idle_flags(std::make_unique<padded_flag[]>(size)), m_size(size) {}

    //Marks a worker idle; the count only changes on an actual active -> idle transition.
    void set_idle(size_t idle_thread) noexcept
    {
        const auto before = m_idle_flags[idle_thread].flag.exchange(status::idle, std::memory_order_relaxed);
        if (before == status::idle) return;
        m_approx_size.fetch_add(1, std::memory_order_release);
    }

    //Marks a worker active; the count only changes on an actual idle -> active transition.
    void set_active(size_t idle_thread) noexcept
    {
        const auto before = m_idle_flags[idle_thread].flag.exchange(status::active, std::memory_order_relaxed);
        if (before == status::active) return;
        m_approx_size.fetch_sub(1, std::memory_order_release);
    }

    //Claims one idle worker other than the caller and returns its index, or
    //static_cast<size_t>(-1) when none could be claimed. A caller that is not a pool worker
    //passes static_cast<size_t>(-1); the scan then starts at a slot derived from the thread's
    //hashed id.
    size_t find_idle_worker(size_t caller_index) noexcept
    {
        if (m_approx_size.load(std::memory_order_relaxed) <= 0) return static_cast<size_t>(-1);
        const auto starting_pos = (caller_index != static_cast<size_t>(-1)) ? caller_index
            : (s_tl_thread_pool_data.this_thread_hashed_id % m_size);
        for (size_t i = 0; i < m_size; i++) {
            const auto index = (starting_pos + i) % m_size;
            if (index == caller_index) continue;
            if (try_acquire_flag(index)) return index;
        }
        return static_cast<size_t>(-1);
    }

    //Claims up to max_count idle workers other than the caller and appends their indices to
    //result_buffer, whose capacity must already be at least max_count.
    void find_idle_workers(size_t caller_index, std::vector<size_t>& result_buffer, size_t max_count) noexcept
    {
        assert(result_buffer.capacity() >= max_count);
        const auto approx_size = m_approx_size.load(std::memory_order_relaxed);
        if (approx_size <= 0) return;
        //caller_index is unsigned, so the former "caller_index >= 0" half of this check was
        //always true and has been dropped.
        assert(caller_index < m_size);
        assert(caller_index == s_tl_thread_pool_data.this_thread_index); //must be called on the caller's own worker thread
        size_t count = 0;
#undef min
        const auto max_waiters = std::min(static_cast<size_t>(approx_size), max_count);
        for (size_t i = 0; (i < m_size) && (count < max_waiters); i++) {
            const auto index = (caller_index + i) % m_size;
            if (index == caller_index) continue; //only the first iteration can hit the caller itself
            if (try_acquire_flag(index)) { result_buffer.emplace_back(index); ++count; }
        }
    }
}; //class idle_worker_set
//Mutex-protected list of executors that can all be shut down with one call.
class executor_collection
{
private:
    std::mutex m_lock;
    std::vector<std::shared_ptr<executor>> m_executors;
public:
    //Adds an executor; it must be non-null and not registered already (checked by assert).
    void register_executor(std::shared_ptr<executor> executor)
    {
        assert(static_cast<bool>(executor));
        std::unique_lock<std::mutex> guard(m_lock);
        assert(std::find(m_executors.begin(), m_executors.end(), executor) == m_executors.end());
        m_executors.emplace_back(std::move(executor));
    }

    //Shuts down every registered executor, then drops the collection's references to them.
    void shutdown_all() noexcept
    {
        std::unique_lock<std::mutex> guard(m_lock);
        for (auto& registered : m_executors) {
            assert(static_cast<bool>(registered));
            registered->shutdown();
        }
        m_executors = {};
    }
}; //class executor_collection
//One worker of a thread_pool_executor. It runs tasks from a private queue that only its own
//thread drains, and receives tasks from other threads through a mutex-protected public queue.
//balance_work, wait_for_task and the enqueue_* functions are only declared here; their
//definitions are outside this class body.
class alignas(64) thread_pool_worker
{
private:
std::deque<task> m_private_queue; //local task queue, drained by drain_queue_impl
std::vector<size_t> m_idle_worker_list; //indices of idle workers in the pool (capacity reserved in the constructor)
std::atomic_bool m_atomic_abort;
thread_pool_executor& m_parent_pool;
const size_t m_index; //this worker's index inside the pool
const size_t m_pool_size; //number of workers in the pool
const std::chrono::milliseconds m_max_idle_time;
const std::string m_worker_name; //per the original note: pool executor name + "worker"
alignas(64) std::mutex m_lock;
std::deque<task> m_public_queue; //queue for tasks arriving from outside
std::binary_semaphore m_semaphore; //binary semaphore (0 or 1) signalling that tasks have arrived
bool m_idle; //starts as true; per the original note it must be modified under the lock
bool m_abort; //per the original note, also modified under the lock
std::atomic_bool m_event_found; //a task (or shutdown) has been signalled
thread m_thread;
void balance_work();
bool wait_for_task(std::unique_lock<std::mutex>& lock) noexcept;
//Runs private-queue tasks, newest first, until the queue is empty or abort is observed.
//Returns false (after marking the worker idle) when aborted, true otherwise.
bool drain_queue_impl()
{
auto aborted = false;
while (!m_private_queue.empty()) { //as long as local tasks remain
balance_work(); //rebalance first
if (m_atomic_abort.load(std::memory_order_relaxed)) { aborted = true; break; }
assert(!m_private_queue.empty()); //balancing leaves at least one local task
auto task = std::move(m_private_queue.back()); //take the last task and run it
m_private_queue.pop_back();
task();
}
if (aborted) {
std::unique_lock<std::mutex> lock(m_lock);
m_idle = true;
return false;
}
return true;
}
//Waits for foreign tasks, swaps them into the private queue and runs them.
//Returns false when the worker loop should end.
bool drain_queue()
{
std::unique_lock<std::mutex> lock(m_lock);
if (!wait_for_task(lock)) return false; //no task arrived, or the worker is exiting
assert(lock.owns_lock());
assert(!m_public_queue.empty() || m_abort);
m_event_found.store(false, std::memory_order_relaxed);
if (m_abort) { m_idle = true; return false; } //exiting: mark this worker idle
assert(m_private_queue.empty()); //the private queue is empty here, otherwise it would still be draining
std::swap(m_private_queue, m_public_queue); //swapping the two queues is the cheapest hand-over
lock.unlock();
return drain_queue_impl();
}
//Thread entry point: publishes this worker in the thread-local data, then drains until told to stop.
void work_loop() noexcept
{
s_tl_thread_pool_data.this_worker = this;
s_tl_thread_pool_data.this_thread_index = m_index;
while (true) { if (!drain_queue()) return; }
}
//Called with m_lock held; always releases it. If the worker thread is running, the semaphore
//is released for the first enqueuer; otherwise a new thread is started and the stale one joined.
void ensure_worker_active(bool first_enqueuer, std::unique_lock<std::mutex>& lock)
{
assert(lock.owns_lock());
if (!m_idle) { //already running: unlock, release the semaphore if first enqueuer, return
lock.unlock();
if (first_enqueuer) m_semaphore.release();
return;
}
auto stale_worker = std::move(m_thread); //move the previous thread object out
m_thread = thread(m_worker_name, [this] { work_loop(); }); //start a new thread
m_idle = false;
lock.unlock();
if (stale_worker.joinable()) stale_worker.join(); //wait for the previous thread to finish
}
public:
thread_pool_worker(thread_pool_executor& parent_pool, size_t index, size_t pool_size, std::chrono::milliseconds max_idle_time);
//Move constructor that must never run: it calls std::abort().
//NOTE(review): the original asks why it is not simply deleted - presumably because
//std::vector<thread_pool_worker>::reserve/emplace_back needs a move constructor to compile; confirm.
thread_pool_worker(thread_pool_worker&& rhs) noexcept :
m_parent_pool(rhs.m_parent_pool), m_index(rhs.m_index), m_pool_size(rhs.m_pool_size), m_max_idle_time(rhs.m_max_idle_time),
m_semaphore(0), m_idle(true), m_abort(true)
{
std::abort();
}
~thread_pool_worker() noexcept { assert(m_idle); assert(!m_thread.joinable()); } //only valid when idle and the thread is not joinable
void enqueue_foreign(task& task);
void enqueue_foreign(std::span<task> tasks);
void enqueue_foreign(std::deque<task>::iterator begin, std::deque<task>::iterator end);
void enqueue_foreign(std::span<task>::iterator begin, std::span<task>::iterator end);
void enqueue_local(task& task);
void enqueue_local(std::span<task> tasks);
//Stops the worker: sets both abort flags, wakes the thread, joins it, then discards any
//tasks that were still queued. Must be called at most once (asserted).
void shutdown() noexcept
{
assert(m_atomic_abort.load(std::memory_order_relaxed) == false);
m_atomic_abort.store(true, std::memory_order_relaxed);
{
std::unique_lock<std::mutex> lock(m_lock);
m_abort = true;
}
m_event_found.store(true, std::memory_order_release); //store the flag before waking the worker
m_semaphore.release();
if (m_thread.joinable()) m_thread.join();
decltype(m_public_queue) public_queue; //temporaries, so the queues are destroyed outside the lock
decltype(m_private_queue) private_queue;
{
std::unique_lock<std::mutex> lock(m_lock); //hold the lock only for the two moves
public_queue = std::move(m_public_queue);
private_queue = std::move(m_private_queue);
}
public_queue.clear();
private_queue.clear();
}
std::chrono::milliseconds max_worker_idle_time() const noexcept { return m_max_idle_time; }
//True when the private queue is empty and no event is flagged. Reads m_private_queue
//without taking the lock.
bool appears_empty() const noexcept
{
return m_private_queue.empty() && !m_event_found.load(std::memory_order_relaxed);
}
}; //class thread_pool_worker
} //namespace details
//适合非阻塞的短小任务,会自动进行线程注入和负载平衡。除默认类外,背景类适合那些小阻塞任务,如数据库和文件读写
class alignas(64) thread_pool_executor final : public derivable_executor<thread_pool_executor>
{
friend class details::thread_pool_worker;
private:
std::vector<details::thread_pool_worker> m_workers;
alignas(64) std::atomic_size_t m_round_robin_cursor; //轮询调度游标
alignas(64) details::idle_worker_set m_idle_workers;
alignas(64) std::atomic_bool m_abort;
void mark_worker_idle(size_t index) noexcept { assert(index < m_workers.size()); m_idle_workers.set_idle(index); }
void mark_worker_active(size_t index) noexcept { assert(index < m_workers.size()); m_idle_workers.set_active(index); }
details::thread_pool_worker& worker_at(size_t index) noexcept { assert(index <= m_workers.size()); return m_workers[index]; }
void find_idle_workers(size_t caller_index, std::vector<size_t>& buffer, size_t max_count) noexcept
{
m_idle_workers.find_idle_workers(caller_index, buffer, max_count); //找到的索引放到 buffer
}
public:
thread_pool_executor(std::string_view pool_name, size_t pool_size, std::chrono::milliseconds max_idle_time) :
derivable_executor<thread_pool_executor>(pool_name), m_round_robin_cursor(0), m_idle_workers(pool_size), m_abort(false)
{
m_workers.reserve(pool_size); //按线程池大小构建工人,分配好序号,再全部置为空闲
for (size_t i = 0; i < pool_size; i++) m_workers.emplace_back(*this, i, pool_size, max_idle_time);
for (size_t i = 0; i < pool_size; i++) m_idle_workers.set_idle(i);
}
void enqueue(task task) override
{
const auto this_worker = details::s_tl_thread_pool_data.this_worker; //工人进入工作循环后,这两个被赋值
const auto this_worker_index = details::s_tl_thread_pool_data.this_thread_index;
if (this_worker != nullptr && this_worker->appears_empty()) //本线程有工人且闲置,交给它
return this_worker->enqueue_local(task);
const auto idle_worker_pos = m_idle_workers.find_idle_worker(this_worker_index); //否则找一个闲置工人
if (idle_worker_pos != static_cast<size_t>(-1))
return m_workers[idle_worker_pos].enqueue_foreign(task); //交给找到的闲置工人
if (this_worker != nullptr) //所有工人都在忙,但本线程有工人,仍然交给它
return this_worker->enqueue_local(task);
const auto next_worker = m_round_robin_cursor.fetch_add(1, std::memory_order_relaxed) % m_workers.size();
m_workers[next_worker].enqueue_foreign(task); //本线程无工人,轮询调度下一个工人,交给它
}
//Schedules a batch of tasks.
//A calling thread that has its own worker keeps the whole batch locally. Otherwise, batches
//smaller than the pool are enqueued one task at a time, and larger batches are cut into one
//contiguous slice per worker (the first `size % workers` workers get one extra task).
void enqueue(std::span<task> tasks) override
{
    const auto caller_worker = details::s_tl_thread_pool_data.this_worker;
    if (caller_worker != nullptr) {
        caller_worker->enqueue_local(tasks);
        return;
    }

    const auto task_total = tasks.size();
    const auto worker_total = m_workers.size();
    if (task_total < worker_total) {
        for (auto& single : tasks) enqueue(std::move(single));
        return;
    }

    const auto base_share = task_total / worker_total; //minimum number of tasks per worker
    auto leftover = task_total % worker_total;         //tasks that do not divide evenly
    size_t slice_begin = 0;
    for (size_t worker_index = 0; worker_index < worker_total; ++worker_index) {
        assert(slice_begin < task_total);
        size_t slice_end = slice_begin + base_share;
        if (leftover != 0) { ++slice_end; --leftover; } //hand out one of the remaining tasks
        assert(slice_end <= task_total);
        auto first = tasks.begin() + slice_begin;
        auto last = tasks.begin() + slice_end;
        assert(first < tasks.end());
        assert(last <= tasks.end());
        m_workers[worker_index].enqueue_foreign(first, last);
        slice_begin = slice_end;
    }
}
//Reports whether the pool's abort flag has been set (relaxed atomic read).
bool shutdown_requested() const noexcept override
{
    return m_abort.load(std::memory_order_relaxed);
}
void shutdown() noexcept override
{
const auto abort = m_abort.exchange(true, std::memory_order_relaxed);
if (abort) return; //已经关闭过了,直接返回。否则遍历所有工人逐一关闭
for (auto& worker : m_workers) worker.shutdown();
}
//The concurrency level of the pool is its number of workers.
int max_concurrency_level() const noexcept override
{
    const auto worker_count = m_workers.size();
    return static_cast<int>(worker_count);
}
//Returns the idle time reported by the first worker.
//NOTE(review): indexes m_workers[0] unconditionally, so the pool is assumed to be non-empty.
std::chrono::milliseconds max_worker_idle_time() const noexcept
{
    const auto& first_worker = m_workers[0];
    return first_worker.max_worker_idle_time();
}
}; //class thread_pool_executor
namespace details
{
//Initialises a worker that belongs to `parent_pool` and sits at position `index` in it.
//The worker starts flagged idle and not aborted, with a semaphore count of zero, and
//reserves room for `pool_size` entries in its idle-worker scratch list.
thread_pool_worker::thread_pool_worker(thread_pool_executor& parent_pool, size_t index, size_t pool_size, std::chrono::milliseconds max_idle_time) :
    m_atomic_abort(false),
    m_parent_pool(parent_pool),
    m_index(index),
    m_pool_size(pool_size),
    m_max_idle_time(max_idle_time),
    m_worker_name(details::make_executor_worker_name(parent_pool.name)),
    m_semaphore(0),
    m_idle(true),
    m_abort(false),
    m_event_found(false)
{
    m_idle_worker_list.reserve(pool_size);
}
//Shares the private queue with idle workers.
//With N idle workers found, the queue is split N + 1 ways using the same slicing rule as the
//executor's batch enqueue: the leading slices are handed to the idle workers and the final
//share stays with this worker.
void thread_pool_worker::balance_work()
{
    const auto queued = m_private_queue.size();
    if (queued < 2) return; //nothing worth sharing with zero or one task

    //Ask for at most one helper per task beyond the one this worker keeps for itself.
    const auto wanted_helpers = std::min(m_pool_size - 1, queued - 1);
    if (wanted_helpers == 0) return; //single-worker pool

    m_parent_pool.find_idle_workers(m_index, m_idle_worker_list, wanted_helpers); //the list is scratch space used only here
    const auto helper_count = m_idle_worker_list.size();
    if (helper_count == 0) return;
    //Presumably holds because at most `wanted_helpers` (< queued) indices were requested - TODO confirm in find_idle_workers.
    assert(helper_count <= queued);

    const auto participants = helper_count + 1; //helpers plus this worker, so not everything is given away
    const auto share = queued / participants;
    auto leftover = queued % participants;
    size_t donated = 0; //number of leading tasks handed out so far
    for (const auto helper_index : m_idle_worker_list) {
        assert(helper_index != m_index);
        assert(helper_index < m_pool_size);
        assert(donated < queued);
        auto slice_end = donated + share;
        if (leftover != 0) { ++slice_end; --leftover; }
        assert(slice_end <= queued);
        auto first = m_private_queue.begin() + donated;
        auto last = m_private_queue.begin() + slice_end;
        assert(first < m_private_queue.end());
        assert(last <= m_private_queue.end());
        m_parent_pool.worker_at(helper_index).enqueue_foreign(first, last);
        donated = slice_end;
    }

    //Donating moves tasks out but does not shrink the queue: the first `donated` slots are
    //now empty shells and everything after them is still a live task.
    assert(m_private_queue.size() == queued);
    assert(std::all_of(m_private_queue.begin(), m_private_queue.begin() + donated, [](auto& task) { return !static_cast<bool>(task); }));
    assert(std::all_of(m_private_queue.begin() + donated, m_private_queue.end(), [](auto& task) { return static_cast<bool>(task); }));
    m_private_queue.erase(m_private_queue.begin(), m_private_queue.begin() + donated);
    assert(!m_private_queue.empty());
    m_idle_worker_list.clear(); //leave the scratch list empty for the next call
}
//Waits until the public queue has work, the worker is aborted, or the idle deadline passes.
//`lock` must own the worker mutex on entry.
//Returns true with `lock` still held when the public queue is non-empty (or, on the fast
//path only, when the abort flag is already set). Returns false with `lock` released when
//the wait timed out without an event or the abort flag was seen after waiting; in that
//case the worker is also flagged idle via m_idle.
bool thread_pool_worker::wait_for_task(std::unique_lock<std::mutex>& lock) noexcept
{
assert(lock.owns_lock()); //callers drain the queue and must already hold the lock
if (!m_public_queue.empty() || m_abort) return true; //fast path: foreign work is waiting, or the worker was aborted
lock.unlock(); //the public queue and abort flag are not touched while blocking, so release the lock
m_parent_pool.mark_worker_idle(m_index);
auto event_found = false;
const auto deadline = std::chrono::steady_clock::now() + m_max_idle_time;
while (true) {
if (!m_semaphore.try_acquire_until(deadline)) { //no release seen: retry while the deadline has not passed, otherwise stop waiting
if (std::chrono::steady_clock::now() <= deadline) continue;
else break;
}
if (!m_event_found.load(std::memory_order_relaxed)) continue; //woken without a recorded event, keep waiting
lock.lock(); //the public queue and abort flag are read next, so take the lock again
if (m_public_queue.empty() && !m_abort) { lock.unlock(); continue; } //nothing queued and not aborted, keep waiting
event_found = true;
break;
}
if (!lock.owns_lock()) lock.lock();
if (!event_found || m_abort) { m_idle = true; lock.unlock(); return false; } //timed out or aborted: mark idle and report no work
assert(!m_public_queue.empty());
m_parent_pool.mark_worker_active(m_index);
return true;
}
//Accepts one task coming from outside this worker's thread.
//Under the worker lock: throws the runtime-shutdown error if the worker is aborted,
//otherwise records the event, appends the task to the public queue and calls
//ensure_worker_active with whether the queue was empty beforehand.
void thread_pool_worker::enqueue_foreign(task& task)
{
    std::unique_lock<std::mutex> guard(m_lock);
    if (m_abort) {
        throw_runtime_shutdown_exception(m_parent_pool.name);
    }
    m_event_found.store(true, std::memory_order_relaxed); //a foreign task has arrived
    const bool queue_was_empty = m_public_queue.empty();
    m_public_queue.emplace_back(std::move(task));
    ensure_worker_active(queue_was_empty, guard);
}
//Accepts a span of tasks coming from outside this worker's thread.
//Same protocol as the single-task overload; the tasks are moved (not copied) to the end of
//the public queue in one insert call.
void thread_pool_worker::enqueue_foreign(std::span<task> tasks)
{
    std::unique_lock<std::mutex> guard(m_lock);
    if (m_abort) {
        throw_runtime_shutdown_exception(m_parent_pool.name);
    }
    m_event_found.store(true, std::memory_order_relaxed);
    const bool queue_was_empty = m_public_queue.empty();
    auto move_first = std::make_move_iterator(tasks.begin());
    auto move_last = std::make_move_iterator(tasks.end());
    m_public_queue.insert(m_public_queue.end(), move_first, move_last);
    ensure_worker_active(queue_was_empty, guard);
}
void thread_pool_worker::enqueue_foreign(std::deque<task>::iterator begin, std::deque<task>::iterator end)
{
std::unique_lock<std::mutex> lock(m_lock);
if (m_abort) throw_runtime_shutdown_exception(m_parent_pool.name);
m_event_found.store(true, std::memory_order_relaxed);
const auto is_empty = m_public_queue.empty();
m_public_queue.insert(m_public_queue.end(), std::make_move_iterator(begin), std::make_move_iterator(end));
ensure_worker_active(is_empty, lock);
}
//Accepts the tasks in the span range [begin, end) coming from outside this worker's thread.
//Same protocol as the single-task overload; the tasks are moved out of the source range and
//appended to the public queue.
void thread_pool_worker::enqueue_foreign(std::span<task>::iterator begin, std::span<task>::iterator end)
{
    std::unique_lock<std::mutex> guard(m_lock);
    if (m_abort) {
        throw_runtime_shutdown_exception(m_parent_pool.name);
    }
    m_event_found.store(true, std::memory_order_relaxed);
    const bool queue_was_empty = m_public_queue.empty();
    auto move_first = std::make_move_iterator(begin);
    auto move_last = std::make_move_iterator(end);
    m_public_queue.insert(m_public_queue.end(), move_first, move_last);
    ensure_worker_active(queue_was_empty, guard);
}
//Appends a task to the private queue without taking the lock (a worker handing work to itself).
//Throws the runtime-shutdown error when the atomic abort flag is set.
void thread_pool_worker::enqueue_local(task& task)
{
    const bool aborted = m_atomic_abort.load(std::memory_order_relaxed);
    if (aborted) {
        throw_runtime_shutdown_exception(m_parent_pool.name);
    }
    m_private_queue.emplace_back(std::move(task));
}
//Moves a span of tasks to the end of the private queue without taking the lock.
//Throws the runtime-shutdown error when the atomic abort flag is set.
void thread_pool_worker::enqueue_local(std::span<task> tasks)
{
    const bool aborted = m_atomic_abort.load(std::memory_order_relaxed);
    if (aborted) {
        throw_runtime_shutdown_exception(m_parent_pool.name);
    }
    auto move_first = std::make_move_iterator(tasks.begin());
    auto move_last = std::make_move_iterator(tasks.end());
    m_private_queue.insert(m_private_queue.end(), move_first, move_last);
}
} //namepsace details
//Runs every task on a freshly started thread; threads are never reused.
//Suited to long-running work such as work loops or tasks that block for a long time.
class alignas(64) thread_executor final : public derivable_executor<thread_executor>
{
private:
    std::mutex m_lock;                         //guards the two lists and m_abort
    std::list<details::thread> m_workers;      //threads whose task has not retired yet
    std::condition_variable m_condition;       //notified whenever a worker leaves m_workers
    std::list<details::thread> m_last_retired; //most recently retired thread, still to be joined
    bool m_abort;                              //shutdown flag, accessed under m_lock
    std::atomic_bool m_atomic_abort;           //shutdown flag readable without the lock

    //Starts a new thread for `task`. The caller must hold `lock`.
    void enqueue_impl(std::unique_lock<std::mutex>& lock, task& task)
    {
        assert(lock.owns_lock());
        //Insert an empty slot first so its iterator can be captured by the thread's own lambda.
        m_workers.emplace_front();
        const auto slot = m_workers.begin();
        *slot = details::thread(
            details::make_executor_worker_name(name),
            [this, slot, job = std::move(task)]() mutable {
                job();
                retire_worker(slot);
            });
    }

    //Called by a worker thread once its task has finished; `it` is that thread's own slot.
    //The slot is parked in m_last_retired and the previously parked thread (if any) is joined.
    void retire_worker(std::list<details::thread>::iterator it)
    {
        std::unique_lock<decltype(m_lock)> lock(m_lock);
        auto previously_retired = std::move(m_last_retired); //expected to hold at most one thread
        m_last_retired.splice(m_last_retired.begin(), m_workers, it);
        lock.unlock();
        m_condition.notify_one(); //m_workers just shrank; shutdown() may be waiting on that
        if (!previously_retired.empty()) {
            assert(previously_retired.size() == 1);
            previously_retired.front().join();
        }
    }

public:
    thread_executor() :
        derivable_executor<thread_executor>("thread_executor"),
        m_abort(false),
        m_atomic_abort(false)
    {
    }

    //Both lists must already be empty when the executor is destroyed.
    ~thread_executor() noexcept
    {
        assert(m_workers.empty());
        assert(m_last_retired.empty());
    }

    //Starts one thread for `task`; throws the runtime-shutdown error after shutdown.
    void enqueue(task task) override
    {
        std::unique_lock<decltype(m_lock)> lock(m_lock);
        if (m_abort) {
            details::throw_runtime_shutdown_exception(name);
        }
        enqueue_impl(lock, task);
    }

    //Starts one thread per task in `tasks`; throws the runtime-shutdown error after shutdown.
    void enqueue(std::span<task> tasks) override
    {
        std::unique_lock<decltype(m_lock)> lock(m_lock);
        if (m_abort) {
            details::throw_runtime_shutdown_exception(name);
        }
        for (auto& pending : tasks) {
            enqueue_impl(lock, pending);
        }
    }

    int max_concurrency_level() const noexcept override
    {
#undef max
        return std::numeric_limits<int>::max(); //details::consts::k_thread_executor_max_concurrency_level
    }

    bool shutdown_requested() const noexcept override
    {
        return m_atomic_abort.load(std::memory_order_relaxed);
    }

    //Marks the executor as shut down, waits until every running worker has retired and
    //joins the last retired thread. Only the first call does the work.
    void shutdown() noexcept override
    {
        if (m_atomic_abort.exchange(true, std::memory_order_relaxed)) return; //already shut down
        std::unique_lock<decltype(m_lock)> lock(m_lock);
        m_abort = true;
        m_condition.wait(lock, [this] { return m_workers.empty(); });
        if (!m_last_retired.empty()) {
            assert(m_last_retired.size() == 1);
            m_last_retired.front().join();
            m_last_retired.clear();
        }
    }
}; //class thread_executor
class worker_thread_executor;
namespace details
{
    //Per-thread pointer to a worker_thread_executor; null by default.
    static thread_local worker_thread_executor* s_tl_this_worker = nullptr;
}
//Executor backed by one dedicated thread that runs its tasks one after another.
//Suited to a series of related tasks that should all execute on the same thread.
//Tasks enqueued from the worker thread itself go to a private queue without locking;
//tasks from other threads go to a mutex-protected public queue and wake the worker
//through a binary semaphore.
class alignas(64) worker_thread_executor final : public derivable_executor<worker_thread_executor>
{
private:
    std::deque<task> m_private_queue;        //tasks the worker thread is currently draining
    std::atomic_bool m_private_atomic_abort; //abort flag read on the worker thread's lock-free paths
    details::thread m_thread;
    alignas(64) std::mutex m_lock;           //guards m_public_queue and m_abort
    std::deque<task> m_public_queue;         //tasks handed over by other threads
    std::binary_semaphore m_semaphore;       //released to wake the worker
    std::atomic_bool m_atomic_abort;         //makes shutdown() run once; backs shutdown_requested()
    bool m_abort;                            //abort flag, accessed under m_lock

    //Runs every task in the private queue, front to back.
    //Returns false as soon as the private abort flag is observed (the task that was just
    //popped is dropped without running), true when the queue was drained completely.
    bool drain_queue_impl()
    {
        while (!m_private_queue.empty()) {
            auto task = std::move(m_private_queue.front());
            m_private_queue.pop_front();
            if (m_private_atomic_abort.load(std::memory_order_relaxed)) return false;
            task();
        }
        return true;
    }

    //Waits for foreign work, takes over the whole public queue and runs it.
    //Returns false when the executor was aborted, true otherwise.
    bool drain_queue()
    {
        std::unique_lock<decltype(m_lock)> lock(m_lock);
        wait_for_task(lock);
        assert(lock.owns_lock());
        assert(!m_public_queue.empty() || m_abort); //either foreign work exists or we were aborted
        if (m_abort) return false;
        assert(m_private_queue.empty());
        std::swap(m_private_queue, m_public_queue); //take the whole batch in one step
        lock.unlock();
        return drain_queue_impl();
    }

    //Blocks until the public queue is non-empty or the executor is aborted.
    //`lock` must be held on entry and is held again on return.
    void wait_for_task(std::unique_lock<std::mutex>& lock)
    {
        assert(lock.owns_lock());
        if (!m_public_queue.empty() || m_abort) return;
        while (true) {
            lock.unlock();
            m_semaphore.acquire();
            lock.lock();
            if (!m_public_queue.empty() || m_abort) break;
        }
    }

    //Thread entry point: registers this executor in the thread-local slot and keeps
    //draining until drain_queue reports an abort.
    void work_loop() noexcept
    {
        details::s_tl_this_worker = this;
        while (true) if (!drain_queue()) return;
    }

    //Enqueue from the worker thread itself: no lock needed.
    //Throws the runtime-shutdown error once shutdown() has been called.
    void enqueue_local(task& task)
    {
        if (m_private_atomic_abort.load(std::memory_order_relaxed)) details::throw_runtime_shutdown_exception(name);
        m_private_queue.emplace_back(std::move(task));
    }

    void enqueue_local(std::span<task> tasks)
    {
        if (m_private_atomic_abort.load(std::memory_order_relaxed)) details::throw_runtime_shutdown_exception(name);
        m_private_queue.insert(m_private_queue.end(), std::make_move_iterator(tasks.begin()), std::make_move_iterator(tasks.end()));
    }

    //Enqueue from any other thread: goes through the locked public queue.
    //Throws the runtime-shutdown error once shutdown() has been called.
    void enqueue_foreign(task& task)
    {
        std::unique_lock<decltype(m_lock)> lock(m_lock);
        if (m_abort) details::throw_runtime_shutdown_exception(name);
        const auto is_empty = m_public_queue.empty();
        m_public_queue.emplace_back(std::move(task));
        lock.unlock();
        if (is_empty) m_semaphore.release(); //the queue was empty, so the worker may be asleep: wake it
    }

    void enqueue_foreign(std::span<task> tasks)
    {
        std::unique_lock<decltype(m_lock)> lock(m_lock);
        if (m_abort) details::throw_runtime_shutdown_exception(name);
        const auto is_empty = m_public_queue.empty();
        m_public_queue.insert(m_public_queue.end(), std::make_move_iterator(tasks.begin()), std::make_move_iterator(tasks.end()));
        lock.unlock();
        if (is_empty) m_semaphore.release();
    }

public:
    //Starts the worker thread immediately.
    worker_thread_executor() : derivable_executor<worker_thread_executor>("worker_thread_executor"),
        m_private_atomic_abort(false), m_semaphore(0), m_atomic_abort(false), m_abort(false)
    {
        m_thread = details::thread(details::make_executor_worker_name(name), [this] { work_loop(); });
    }

    //Routes the task to the private queue when called from the worker thread itself,
    //otherwise to the public queue.
    void enqueue(task task) override
    {
        if (details::s_tl_this_worker == this) return enqueue_local(task);
        enqueue_foreign(task);
    }

    void enqueue(std::span<task> tasks) override
    {
        if (details::s_tl_this_worker == this) return enqueue_local(tasks);
        enqueue_foreign(tasks);
    }

    int max_concurrency_level() const noexcept override { return 1; } //details::consts::k_worker_thread_max_concurrency_level

    bool shutdown_requested() const noexcept override { return m_atomic_abort.load(std::memory_order_relaxed); }

    //Stops the worker thread and discards every task that has not run yet.
    //Only the first call does the work.
    void shutdown() noexcept override
    {
        const auto abort = m_atomic_abort.exchange(true, std::memory_order_relaxed);
        if (abort) return;
        {
            std::unique_lock<std::mutex> lock(m_lock);
            m_abort = true;
        }
        //Publish the abort to the worker thread's lock-free paths as well. Without this store
        //the flag stays false forever: drain_queue_impl would keep running the rest of the
        //private queue and enqueue_local would keep accepting tasks after shutdown.
        m_private_atomic_abort.store(true, std::memory_order_relaxed);
        m_semaphore.release(); //wake the worker in case it is blocked waiting for work
        if (m_thread.joinable()) m_thread.join();
        //Move the queues out under the lock and destroy the leftover tasks outside it.
        decltype(m_private_queue) private_queue;
        decltype(m_public_queue) public_queue;
        {
            std::unique_lock<std::mutex> lock(m_lock);
            private_queue = std::move(m_private_queue);
            public_queue = std::move(m_public_queue);
        }
        private_queue.clear();
        public_queue.clear();
    }
}; //class worker_thread_executor
//Executor that never runs tasks on its own: tasks are queued and only execute when a
//caller drives the executor through one of the loop* functions.
class alignas(64) manual_executor final : public derivable_executor<manual_executor>
{
private:
    mutable std::mutex m_lock;           //guards m_tasks and m_abort
    std::deque<task> m_tasks;            //pending tasks, run in FIFO order
    std::condition_variable m_condition; //notified on enqueue and on shutdown
    bool m_abort;                        //shutdown flag, accessed under m_lock
    std::atomic_bool m_atomic_abort;     //shutdown flag readable without the lock

    //Converts a time point of any clock to a system_clock time point by measuring the
    //remaining distance from the source clock's "now" (truncated to milliseconds).
    template<class clock_type, class duration_type>
    static std::chrono::system_clock::time_point to_system_time_point(std::chrono::time_point<clock_type, duration_type> time_point)
    {
        const auto src_now = clock_type::now();
        const auto dst_now = std::chrono::system_clock::now();
        return dst_now + std::chrono::duration_cast<std::chrono::milliseconds>(time_point - src_now);
    }

    static std::chrono::system_clock::time_point time_point_from_now(std::chrono::milliseconds ms)
    {
        return std::chrono::system_clock::now() + ms;
    }

    //Runs up to `max_count` queued tasks without waiting for new ones.
    //Returns the number of tasks executed; throws the runtime-shutdown error if the
    //executor has been shut down by the time the loop ends.
    size_t loop_impl(size_t max_count)
    {
        if (max_count == 0) return 0;
        size_t executed = 0;
        while (true) {
            if (executed == max_count) break;
            std::unique_lock<decltype(m_lock)> lock(m_lock);
            if (m_abort) break; //the lock is released when `lock` goes out of scope
            if (m_tasks.empty()) break;
            auto task = std::move(m_tasks.front());
            m_tasks.pop_front();
            lock.unlock(); //never run user code while holding the lock
            task();
            ++executed;
        }
        if (shutdown_requested()) details::throw_runtime_shutdown_exception(name);
        return executed;
    }

    //Runs up to `max_count` tasks, waiting for tasks to arrive until `deadline`.
    //Returns the number of tasks executed; throws the runtime-shutdown error if the
    //executor has been shut down by the time the loop ends.
    size_t loop_until_impl(size_t max_count, std::chrono::time_point<std::chrono::system_clock> deadline)
    {
        if (max_count == 0) return 0;
        size_t executed = 0;
        deadline += std::chrono::milliseconds(1);
        while (true) {
            if (executed == max_count) break;
            const auto now = std::chrono::system_clock::now();
            if (now >= deadline) break; //stop on either the task budget or the deadline
            std::unique_lock<decltype(m_lock)> lock(m_lock);
            const auto found_task = m_condition.wait_until(lock, deadline, [this] { return !m_tasks.empty() || m_abort; });
            if (m_abort) break;
            if (!found_task) break;
            assert(!m_tasks.empty()); //the predicate held and we were not aborted
            auto task = std::move(m_tasks.front());
            m_tasks.pop_front();
            lock.unlock();
            task();
            ++executed;
        }
        if (shutdown_requested()) details::throw_runtime_shutdown_exception(name);
        return executed;
    }

    //Blocks until at least `count` tasks are queued.
    //Throws the runtime-shutdown error if the executor is (or becomes) shut down.
    void wait_for_tasks_impl(size_t count)
    {
        if (count == 0) {
            if (shutdown_requested()) details::throw_runtime_shutdown_exception(name);
            return;
        }
        std::unique_lock<decltype(m_lock)> lock(m_lock);
        m_condition.wait(lock, [this, count] { return (m_tasks.size() >= count) || m_abort; });
        if (m_abort) details::throw_runtime_shutdown_exception(name);
        assert(m_tasks.size() >= count);
    }

    //Blocks until at least `count` tasks are queued or `deadline` passes.
    //Returns the number of tasks queued at that moment (which may exceed `count`);
    //throws the runtime-shutdown error if the executor is shut down.
    size_t wait_for_tasks_impl(size_t count, std::chrono::time_point<std::chrono::system_clock> deadline)
    {
        deadline += std::chrono::milliseconds(1);
        std::unique_lock<decltype(m_lock)> lock(m_lock);
        m_condition.wait_until(lock, deadline, [this, count] { return (m_tasks.size() >= count) || m_abort; });
        if (m_abort) details::throw_runtime_shutdown_exception(name);
        return m_tasks.size();
    }

public:
    manual_executor() : derivable_executor<manual_executor>("manual_executor"), m_abort(false), m_atomic_abort(false) {}

    //Queues one task and wakes every waiter; throws the runtime-shutdown error after shutdown.
    void enqueue(task task) override
    {
        std::unique_lock<decltype(m_lock)> lock(m_lock);
        if (m_abort) details::throw_runtime_shutdown_exception(name);
        m_tasks.emplace_back(std::move(task));
        lock.unlock();
        m_condition.notify_all();
    }

    //Queues a batch of tasks and wakes every waiter; throws the runtime-shutdown error after shutdown.
    void enqueue(std::span<task> tasks) override
    {
        std::unique_lock<decltype(m_lock)> lock(m_lock);
        if (m_abort) details::throw_runtime_shutdown_exception(name);
        m_tasks.insert(m_tasks.end(), std::make_move_iterator(tasks.begin()), std::make_move_iterator(tasks.end()));
        lock.unlock();
        m_condition.notify_all();
    }

    int max_concurrency_level() const noexcept override { return std::numeric_limits<int>::max(); } //k_manual_executor_max_concurrency_level

    bool shutdown_requested() const noexcept override { return m_atomic_abort.load(std::memory_order_relaxed); }

    //Marks the executor as shut down, wakes every waiter and discards all pending tasks.
    //Only the first call does the work.
    void shutdown() noexcept override
    {
        const auto abort = m_atomic_abort.exchange(true, std::memory_order_relaxed);
        if (abort) return;
        decltype(m_tasks) tasks;
        {
            std::unique_lock<decltype(m_lock)> lock(m_lock);
            m_abort = true;
            tasks = std::move(m_tasks);
        }
        m_condition.notify_all();
        tasks.clear(); //destroy the pending tasks outside the lock
    }

    //Number of tasks currently queued.
    size_t size() const noexcept
    {
        std::unique_lock<decltype(m_lock)> lock(m_lock);
        return m_tasks.size();
    }

    bool empty() const noexcept { return size() == 0; }

    //Removes every pending task without running it and returns how many were removed.
    //Throws the runtime-shutdown error after shutdown.
    size_t clear()
    {
        std::unique_lock<decltype(m_lock)> lock(m_lock);
        if (m_abort) details::throw_runtime_shutdown_exception(name);
        const auto tasks = std::move(m_tasks);
        lock.unlock();
        return tasks.size();
    }

    //Runs at most one queued task; returns true if a task was executed.
    bool loop_once() { return loop_impl(1) != 0; }

    //Runs at most one task, waiting up to `max_waiting_time` for one to arrive.
    bool loop_once_for(std::chrono::milliseconds max_waiting_time)
    {
        if (max_waiting_time == std::chrono::milliseconds(0)) return loop_impl(1) != 0;
        return loop_until_impl(1, time_point_from_now(max_waiting_time)) != 0;
    }

    template<class clock_type, class duration_type>
    bool loop_once_until(std::chrono::time_point<clock_type, duration_type> timeout_time)
    {
        return loop_until_impl(1, to_system_time_point(timeout_time)) != 0;
    }

    //Runs up to `max_count` queued tasks; returns the number executed.
    size_t loop(size_t max_count) { return loop_impl(max_count); }

    size_t loop_for(size_t max_count, std::chrono::milliseconds max_waiting_time)
    {
        if (max_count == 0) return 0;
        if (max_waiting_time == std::chrono::milliseconds(0)) return loop_impl(max_count);
        return loop_until_impl(max_count, time_point_from_now(max_waiting_time));
    }

    template<class clock_type, class duration_type>
    size_t loop_until(size_t max_count, std::chrono::time_point<clock_type, duration_type> timeout_time)
    {
        return loop_until_impl(max_count, to_system_time_point(timeout_time));
    }

    //Blocks until at least one task is queued.
    void wait_for_task() { wait_for_tasks_impl(1); }

    //Waits up to `max_waiting_time` for a task; returns true if at least one task is queued.
    //The helper returns the total queue size, so the check is ">= 1": an exact "== 1"
    //comparison would report failure whenever two or more tasks are waiting.
    bool wait_for_task_for(std::chrono::milliseconds max_waiting_time)
    {
        return wait_for_tasks_impl(1, time_point_from_now(max_waiting_time)) >= 1;
    }

    //Waits until `timeout_time` for a task; returns true if at least one task is queued.
    template<class clock_type, class duration_type>
    bool wait_for_task_until(std::chrono::time_point<clock_type, duration_type> timeout_time)
    {
        return wait_for_tasks_impl(1, to_system_time_point(timeout_time)) >= 1;
    }

    //Blocks until at least `count` tasks are queued.
    void wait_for_tasks(size_t count) { wait_for_tasks_impl(count); }

    //Waits up to `max_waiting_time` for `count` tasks; returns the number of tasks queued.
    size_t wait_for_tasks_for(size_t count, std::chrono::milliseconds max_waiting_time)
    {
        return wait_for_tasks_impl(count, time_point_from_now(max_waiting_time));
    }

    template<class clock_type, class duration_type>
    size_t wait_for_tasks_until(size_t count, std::chrono::time_point<clock_type, duration_type> timeout_time)
    {
        return wait_for_tasks_impl(count, to_system_time_point(timeout_time));
    }
}; //class manual_executor
namespace details
{
    //Awaitable returned by resume_on(). It always suspends the awaiting coroutine
    //(inherited from std::suspend_always) and posts an await_via_functor bound to its
    //await context on the target executor. Neither copyable nor movable.
    template<class executor_type>
    class resume_on_awaitable : public std::suspend_always
    {
    private:
        await_context m_await_ctx;
        executor_type& m_executor;

    public:
        resume_on_awaitable(executor_type& executor) noexcept : m_executor(executor) {}

        resume_on_awaitable(const resume_on_awaitable&) = delete;
        resume_on_awaitable(resume_on_awaitable&&) = delete;
        resume_on_awaitable& operator=(const resume_on_awaitable&) = delete;
        resume_on_awaitable& operator=(resume_on_awaitable&&) = delete;

        //Remembers the suspended coroutine and hands the await context to the executor.
        void await_suspend(std::coroutine_handle<void> handle)
        {
            m_await_ctx.set_coro_handle(handle);
            try
            {
                m_executor.template post<await_via_functor>(&m_await_ctx);
            }
            catch (...)
            {
                //Deliberately ignored: the exception broke the enqueued task, which resumes
                //the coroutine with an interrupt, so nothing is left to do here.
            }
        }

        //Rethrows the interrupt, if one was recorded while the coroutine was suspended.
        void await_resume() const
        {
            m_await_ctx.throw_if_interrupted();
        }
    };
}
//Builds an awaitable that suspends the awaiting coroutine so that it resumes on `executor`.
//Throws std::invalid_argument when `executor` is null.
template<class executor_type>
auto resume_on(std::shared_ptr<executor_type> executor)
{
    static_assert(std::is_base_of_v<concurrencpp::executor, executor_type>,
        "resume_on() - given executor does not derive from concurrencpp::executor");

    const bool has_executor = static_cast<bool>(executor);
    if (!has_executor) {
        throw std::invalid_argument("resume_on - given executor is null.");
    }
    return details::resume_on_awaitable<executor_type>(*executor);
}
template<class executor_type>
auto resume_on(executor_type& executor) noexcept
{
return details::resume_on_awaitable<executor_type>(executor);
}
//Tunable settings consumed by the runtime constructor.
struct runtime_options
{
    size_t max_cpu_threads;                                          //worker count of the thread-pool executor
    std::chrono::milliseconds max_thread_pool_executor_waiting_time; //idle time for thread-pool workers, 120 s by default
    size_t max_background_threads;                                   //worker count of the background executor
    std::chrono::milliseconds max_background_executor_waiting_time;  //idle time for background workers, 120 s by default
    std::chrono::milliseconds max_timer_queue_waiting_time;          //waiting time handed to the timer queue, 120 s by default

    //Fills in the defaults: worker counts come from the details::default_max_* helpers
    //and every waiting time is 120 seconds.
    runtime_options() noexcept
        : max_cpu_threads(details::default_max_cpu_workers())
        , max_thread_pool_executor_waiting_time(std::chrono::seconds(120))
        , max_background_threads(details::default_max_background_workers())
        , max_background_executor_waiting_time(std::chrono::seconds(120))
        , max_timer_queue_waiting_time(std::chrono::seconds(120))
    {
    }

    runtime_options(const runtime_options&) noexcept = default;
    runtime_options& operator=(const runtime_options&) noexcept = default;
}; //struct runtime_options
//Owns the timer queue and the built-in executors, and keeps a registry of every executor
//it created so that all of them are shut down when the runtime is destroyed.
class runtime
{
private:
    std::shared_ptr<inline_executor> m_inline_executor;
    std::shared_ptr<thread_pool_executor> m_thread_pool_executor;
    std::shared_ptr<thread_pool_executor> m_background_executor;
    std::shared_ptr<thread_executor> m_thread_executor;
    details::executor_collection m_registered_executors;
    std::shared_ptr<timer_queue> m_timer_queue;

public:
    //Builds a runtime with default options.
    runtime() : runtime(runtime_options()) {}

    //Creates the timer queue first, then each built-in executor, registering every executor.
    //Type names are qualified with concurrencpp:: because the accessor functions of this
    //class have the same names as the executor types.
    runtime(const runtime_options& options)
    {
        m_timer_queue = std::make_shared<concurrencpp::timer_queue>(options.max_timer_queue_waiting_time);

        m_inline_executor = std::make_shared<concurrencpp::inline_executor>();
        m_registered_executors.register_executor(m_inline_executor);

        m_thread_pool_executor = std::make_shared<concurrencpp::thread_pool_executor>(
            "concurrencpp::thread_pool_executor",
            options.max_cpu_threads,
            options.max_thread_pool_executor_waiting_time);
        m_registered_executors.register_executor(m_thread_pool_executor);

        m_background_executor = std::make_shared<concurrencpp::thread_pool_executor>(
            "concurrencpp::background_executor",
            options.max_background_threads,
            options.max_background_executor_waiting_time);
        m_registered_executors.register_executor(m_background_executor);

        m_thread_executor = std::make_shared<concurrencpp::thread_executor>();
        m_registered_executors.register_executor(m_thread_executor);
    }

    //Shuts the timer queue down first, then every registered executor.
    ~runtime() noexcept
    {
        m_timer_queue->shutdown();
        m_registered_executors.shutdown_all();
    }

    std::shared_ptr<timer_queue> timer_queue() const noexcept { return m_timer_queue; }
    std::shared_ptr<inline_executor> inline_executor() const noexcept { return m_inline_executor; }
    std::shared_ptr<thread_pool_executor> background_executor() const noexcept { return m_background_executor; }
    std::shared_ptr<thread_pool_executor> thread_pool_executor() const noexcept { return m_thread_pool_executor; }
    std::shared_ptr<thread_executor> thread_executor() const noexcept { return m_thread_executor; }

    //Creates and registers a new worker_thread_executor.
    std::shared_ptr<worker_thread_executor> make_worker_thread_executor()
    {
        auto created = std::make_shared<worker_thread_executor>();
        m_registered_executors.register_executor(created);
        return created;
    }

    //Creates and registers a new manual_executor.
    std::shared_ptr<manual_executor> make_manual_executor()
    {
        auto created = std::make_shared<manual_executor>();
        m_registered_executors.register_executor(created);
        return created;
    }

    //Library version as (major, minor, revision).
    //details::consts::k_concurrencpp_version_major, k_concurrencpp_version_minor, k_concurrencpp_version_revision
    static std::tuple<unsigned int, unsigned int, unsigned int> version() noexcept
    {
        return { 0, 1, 3 };
    }

    //Creates a user-defined executor from `arguments` and registers it with the runtime.
    //`executor_type` must be a concrete class derived from executor and constructible
    //from the given arguments; each requirement is checked at compile time.
    template<class executor_type, class... argument_types>
    std::shared_ptr<executor_type> make_executor(argument_types&&... arguments)
    {
        static_assert(
            std::is_base_of_v<executor, executor_type>,
            "runtime::make_executor - <<executor_type>> is not a derived class of executor.");
        static_assert(
            std::is_constructible_v<executor_type, argument_types...>,
            "runtime::make_executor - can not build <<executor_type>> from <<argument_types...>>.");
        static_assert(
            !std::is_abstract_v<executor_type>,
            "runtime::make_executor - <<executor_type>> is an abstract class.");

        auto created = std::make_shared<executor_type>(std::forward<argument_types>(arguments)...);
        m_registered_executors.register_executor(created);
        return created;
    }
}; //class runtime
} //namespace concurrencpp
using namespace concurrencpp;
//Example 1: count the even numbers of a large vector in parallel on the thread pool.
namespace test01
{
    //Builds a vector of 64 * 1024 pseudo-random ints, seeded from the current time.
    std::vector<int> make_random_vector()
    {
        std::vector<int> vec(64 * 1'024);
        std::srand(static_cast<unsigned int>(std::time(nullptr)));
        for (auto& i : vec) i = ::rand();
        return vec;
    }

    //Splits `vector` into one chunk per pool worker, counts the even values of each chunk
    //on `tpe` and returns the sum of the partial counts.
    //`vector` is captured by reference, so it must outlive the returned result.
    result<size_t> count_even(std::shared_ptr<thread_pool_executor> tpe, const std::vector<int>& vector)
    {
        const auto vector_size = vector.size();
        const auto concurrency_level = static_cast<size_t>(tpe->max_concurrency_level());
        const auto chunk_size = vector_size / concurrency_level;
        std::vector<result<size_t>> chunk_count;
        for (size_t i = 0; i < concurrency_level; i++) {
            const auto chunk_begin = i * chunk_size;
            //The last chunk runs to the end of the vector so that the remainder of the
            //division is counted too when the size is not a multiple of the worker count.
            const auto chunk_end = (i + 1 == concurrency_level) ? vector_size : chunk_begin + chunk_size;
            auto result = tpe->submit([&vector, chunk_begin, chunk_end]() -> size_t {
                return static_cast<size_t>(std::count_if(vector.begin() + chunk_begin, vector.begin() + chunk_end, [](auto i) {
                    return i % 2 == 0;
                }));
            });
            chunk_count.emplace_back(std::move(result));
        }
        size_t total_count = 0;
        for (auto& result : chunk_count)
            total_count += co_await result;
        co_return total_count;
    }

    void test()
    {
        runtime runtime;
        const auto vector = make_random_vector();
        auto result = count_even(runtime.thread_pool_executor(), vector);
        const auto total_count = result.get();
        std::cout << "there are " << total_count << " even numbers in the vector" << std::endl;
    }
}
//Example 2: recursive Fibonacci where each recursion level runs as a coroutine on the thread pool.
namespace test02
{
    //Plain recursive Fibonacci, used for the small leaf cases.
    int fibonacci_sync(int i)
    {
        if (i < 2) {
            return i;
        }
        return fibonacci_sync(i - 1) + fibonacci_sync(i - 2);
    }

    //Parallel Fibonacci: values up to 10 are computed synchronously, larger ones spawn two
    //child coroutines on `tpe` (selected through the executor_tag overload) and add their results.
    result<int> fibonacci(executor_tag, std::shared_ptr<thread_pool_executor> tpe, const int curr)
    {
        if (curr <= 10) {
            co_return fibonacci_sync(curr);
        }
        auto previous = fibonacci({}, tpe, curr - 1);
        auto before_previous = fibonacci({}, tpe, curr - 2);
        co_return co_await previous + co_await before_previous;
    }

    void test()
    {
        concurrencpp::runtime runtime;
        auto pending = fibonacci({}, runtime.thread_pool_executor(), 30);
        const auto fibb_30 = pending.get();
        std::cout << "fibonacci(30) = " << fibb_30 << std::endl;
    }
}
namespace test03
{
void test()
{
concurrencpp::result_promise<std::string> promise;
auto result = promise.get_result();
std::thread my_3_party_executor([promise = std::move(promise)]() mutable {
std::this_thread::sleep_for(std::chrono::seconds(1)); //Imitate real work
promise.set_result("hello world");
});
auto asynchronous_string = result.get();
std::cout << "result promise returned string: " << asynchronous_string << std::endl;
my_3_party_executor.join();
}
}
//Example 4: several coroutines consume the same shared_result.
namespace test04
{
    //Awaits `shared_result`, switches to `resume_executor` and prints the value together
    //with the thread it is printed from and the address of the shared value.
    concurrencpp::result<void> consume_shared_result(concurrencpp::shared_result<int> shared_result,
        std::shared_ptr<concurrencpp::executor> resume_executor)
    {
        std::cout << "Awaiting shared_result to have a value" << std::endl;
        const auto& async_value = co_await shared_result;
        //resume_on only builds an awaitable; it has to be co_awaited for the coroutine to
        //actually move to resume_executor. Without co_await the call has no effect.
        co_await concurrencpp::resume_on(resume_executor);
        std::cout << "In thread id " << std::this_thread::get_id() << ", got: " << async_value
            << ", memory address: " << &async_value << std::endl;
    }

    void test()
    {
        concurrencpp::runtime runtime;
        auto result = runtime.background_executor()->submit([] {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            return 100;
        });
        concurrencpp::shared_result<int> shared_result(std::move(result));
        concurrencpp::result<void> results[8];
        for (size_t i = 0; i < 8; i++)
            results[i] = consume_shared_result(shared_result, runtime.thread_pool_executor());
        std::cout << "Main thread waiting for all consumers to finish" << std::endl;
        auto all_consumed = concurrencpp::when_all(std::begin(results), std::end(results));
        all_consumed.get();
        std::cout << "All consumers are done, exiting" << std::endl;
    }
}
//Example 5: periodic timers, one-shot timers and delay objects from the timer queue.
namespace test05
{
    //Endless coroutine: prints how often it has run, then awaits a 1500 ms delay object
    //bound to `ex` before the next round.
    concurrencpp::null_result delayed_task(std::shared_ptr<concurrencpp::timer_queue> tq,
        std::shared_ptr<concurrencpp::thread_pool_executor> ex)
    {
        for (size_t invocation = 1;; ++invocation) {
            std::cout << "task was invoked " << invocation << " times." << std::endl;
            co_await tq->make_delay_object(1500ms, ex);
        }
    }

    void test()
    {
        concurrencpp::runtime runtime;

        std::cout << "make_timer:" << std::endl;
        std::atomic_size_t counter = 1;
        concurrencpp::timer periodic_timer = runtime.timer_queue()->make_timer(
            1500ms,
            2000ms,
            runtime.thread_pool_executor(),
            [&] {
                const auto invocation = counter.fetch_add(1);
                std::cout << "timer was invoked for the " << invocation << "th time" << std::endl;
            });
        std::this_thread::sleep_for(12s);

        std::cout << "make_one_shot_timer:" << std::endl;
        concurrencpp::timer one_shot_timer = runtime.timer_queue()->make_one_shot_timer(
            3000ms,
            runtime.thread_executor(),
            [&] { std::cout << "hello and goodbye" << std::endl; });
        std::this_thread::sleep_for(4s);

        std::cout << "delayed_task:" << std::endl;
        delayed_task(runtime.timer_queue(), runtime.thread_pool_executor());
        std::this_thread::sleep_for(10s);
    }
}
//Example 6: a user-defined executor that logs every enqueue and every execution.
//The namespace is called test06 to match the other examples and the test06::test() call in main().
namespace test06
{
    //Single-threaded executor: tasks are queued under a mutex and run one at a time on a
    //dedicated thread, with a log line printed around each step.
    class logging_executor : public concurrencpp::derivable_executor<logging_executor>
    {
    private:
        mutable std::mutex _lock;              //guards _queue and _shutdown_requested
        std::queue<concurrencpp::task> _queue;
        std::condition_variable _condition;    //signalled on enqueue and on shutdown
        bool _shutdown_requested;
        std::thread _thread;
        const std::string _prefix;             //printed in front of every log line

        //Thread body: runs queued tasks until shutdown is requested.
        void work_loop()
        {
            while (true) {
                std::unique_lock<std::mutex> lock(_lock);
                if (_shutdown_requested) return;
                if (!_queue.empty()) {
                    auto task = std::move(_queue.front());
                    _queue.pop();
                    lock.unlock(); //run the task without holding the lock
                    std::cout << _prefix << " A task is being executed" << std::endl;
                    task();
                    continue;
                }
                _condition.wait(lock, [this] { return !_queue.empty() || _shutdown_requested; });
            }
        }

    public:
        //Starts the worker thread; `prefix` is copied and used in every log line.
        logging_executor(std::string_view prefix) : derivable_executor<logging_executor>("logging_executor"),
            _shutdown_requested(false), _prefix(prefix)
        {
            _thread = std::thread([this] { work_loop(); });
        }

        //Queues one task; throws errors::runtime_shutdown after shutdown.
        void enqueue(concurrencpp::task task) override
        {
            std::cout << _prefix << " A task is being enqueued!" << std::endl;
            std::unique_lock<std::mutex> lock(_lock);
            if (_shutdown_requested)
                throw concurrencpp::errors::runtime_shutdown("logging executor - executor was shutdown.");
            _queue.emplace(std::move(task));
            _condition.notify_one();
        }

        //Queues a batch of tasks; throws errors::runtime_shutdown after shutdown.
        void enqueue(std::span<concurrencpp::task> tasks) override
        {
            std::cout << _prefix << tasks.size() << " tasks are being enqueued!" << std::endl;
            std::unique_lock<std::mutex> lock(_lock);
            if (_shutdown_requested)
                throw concurrencpp::errors::runtime_shutdown("logging executor - executor was shutdown.");
            for (auto& task : tasks)
                _queue.emplace(std::move(task));
            _condition.notify_one();
        }

        int max_concurrency_level() const noexcept override { return 1; }

        bool shutdown_requested() const noexcept override
        {
            std::unique_lock<std::mutex> lock(_lock);
            return _shutdown_requested;
        }

        //Requests shutdown, wakes the worker thread and joins it. Later calls return early.
        void shutdown() noexcept override
        {
            std::cout << _prefix << " shutdown requested" << std::endl;
            std::unique_lock<std::mutex> lock(_lock);
            if (_shutdown_requested) return; //nothing to do.
            _shutdown_requested = true;
            lock.unlock();
            _condition.notify_one();
            _thread.join();
        }
    };

    void test()
    {
        concurrencpp::runtime runtime;
        auto logging_ex = runtime.make_executor<logging_executor>("Session #1234");
        for (size_t i = 0; i < 10; i++)
            logging_ex->post([] { std::cout << "hello world" << std::endl; });
        std::getchar(); //keep the runtime alive until the user presses a key
    }
}
//Runs the six examples one after another, in order.
int main()
{
    using example_entry = void (*)();
    const example_entry examples[] = {
        &test01::test,
        &test02::test,
        &test03::test,
        &test04::test,
        &test05::test,
        &test06::test,
    };
    for (const auto run_example : examples) {
        run_example();
    }
    return 0;
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。