加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
threadpool.h 8.79 KB
一键复制 编辑 原始数据 按行查看 历史
jin 提交于 2024-01-11 10:09 . coding
#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <unordered_map>
#include <utility>
#include <sys/time.h>
static const size_t MAX_TASK_THRESHOLD = 20;
static const size_t MAX_THREAD_THRESHOLD = INT8_MAX;
static const size_t MAX_IDLE_TIME = 60; // second
// class BaseThread {
// public:
// using ThreadFunc = std::function<void(size_t)>;
// BaseThread(ThreadFunc func) : _func(func)/* , _tid(0) */{
// // std::thread::id tid = std::this_thread::get_id();
// // _tid = *(size_t *)&tid;
// struct timeval tv;
// gettimeofday(&tv, NULL);
// // std::cout << "Millisecond: " << tv.tv_sec * 1000 + tv.tv_usec / 1000 << std::endl;
// // std::cout << "Microsecond: " << tv.tv_sec * 1000 * 1000 + tv.tv_usec << std::endl;
// _tid = tv.tv_sec * 1000 * 1000 + tv.tv_usec;
// std::cout << "thread id: " << _tid << std::endl;
// }
// ~BaseThread() {}
// void start() {
// // 创建一个线程来执行一个线程函数
// std::thread t(_func, _tid);
// t.detach();
// }
// size_t get_tid() {
// // std::thread::id tid = std::this_thread::get_id();
// // return *(int*)&tid;
// std::cout << " get thread id: " << _tid << std::endl;
// return _tid;
// }
// private:
// size_t _tid;
// // static int _gid;
// ThreadFunc _func;
// };
// // int Thread::_gid = 0;
enum poolModel {
FIXED,
CACHED
};
class ThreadPool {
public:
ThreadPool()
:_model(poolModel::CACHED),
_init_thread_size(0),
_cur_thread_num(0),
_thread_threshold(MAX_THREAD_THRESHOLD),
_is_running(false),
_idle_thread_num(0),
_task_num(0),
_max_task_threshold(MAX_TASK_THRESHOLD) {
std::cout << "ThreadPool()" << std::endl;
}
~ThreadPool() {
_is_running = false;
std::unique_lock<std::mutex> lock(_task_mutex);
// 先让主线程等等一下线程池中的线程
_not_empty.notify_all();
_exit.wait(lock, [&] { return _threads.size() == 0; });
}
// 开启线程
void start(size_t init_thread_size = std::thread::hardware_concurrency()) {
std::cout << "init therad size: " << init_thread_size << std::endl;
// 设置线程运行状态
_is_running = true;
// 设置线程数量
_init_thread_size = _cur_thread_num = init_thread_size;
// 创建线程
for (size_t i = 0; i < init_thread_size; i++) {
auto ptr = std::make_unique<BaseThread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
size_t tid = ptr->get_tid();
_threads.emplace(tid, std::move(ptr));
}
for (auto& t : _threads) {
std::cout << "ID: " << t.first << std::endl;
// std::cout << "Func: " << t.second << std::endl;
}
// 运行线程函数,每运行一个线程就有一个空闲线程
for (auto& t : _threads) {
t.second->start();
_idle_thread_num++;
}
// for (size_t i = 0; i < _init_thread_size; i++) {
// _threads[i]->start();
// _idle_thread_num++;
// }
}
// 设置工作模式
void set_pool_model(poolModel model) {
if (!check_running()) {
_model = model;
}
}
// 设置任务队列阈值
void set_max_task_threshold(size_t threshold) {
if (!check_running()) {
_max_task_threshold = threshold;
}
}
// 设置线程阈值
void set_thread_threshold(size_t threshold) {
if (!check_running() && poolModel::CACHED == _model) {
_thread_threshold = threshold;
}
}
// 提交任务
template<class Func, class ...Args>
auto submit(Func&& func, Args&& ...args)->std::future<decltype(func(args...))> {
// 首先获取返回值类型
using ReturnType = decltype(func(args...));
// 将bind绑定的 return_type Func(Args ...args)函数对象赋值给package_task里的返回值为return_type ,无形参列表的函数
auto task = std::make_shared<std::packaged_task<ReturnType()>>(
std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
std::future<ReturnType> result = task->get_future();
std::unique_lock<std::mutex> lock(_task_mutex);
if (!_not_full.wait_for(lock, std::chrono::seconds(1),
[&]()->bool { return _task_queue.size() < _max_task_threshold; })) {
printf("Task queue is full, fail to sumbit tasks.\n");
// 返回一个空的return_type 类型
auto empty = std::make_shared<std::packaged_task<ReturnType()>>([]()->ReturnType { return ReturnType(); });
(*empty)();
return empty->get_future();
}
// 利用中间函数void()来执行packaged_task打包的对象函数task
_task_queue.emplace([task]() { (*task)(); });
_task_num++;
// 任务队列不为空,唤醒线程来执行
_not_empty.notify_all();
// CACHED模式下: 空闲线程 小于现有有任务数量且线程数量达不到线程阈值,则动态开辟线程
if (poolModel::CACHED == _model && _idle_thread_num < _task_num && _thread_threshold > _cur_thread_num) {
printf("Dynamic create thread.\n");
auto pt = std::make_unique<BaseThread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
pt->start();
int tid = pt->get_tid();
_threads.emplace(tid, std::move(pt));
_cur_thread_num++;
_idle_thread_num++;
}
return result;
}
private:
// 因为控制安全的变量都在ThreadPool中,且线程高度的行为都是一样的
// (在任务队列中竞争任务),所以这里设置线程处理函数
void threadFunc(size_t tid) {
auto last_time = std::chrono::high_resolution_clock::now();
while (true) {
Task task;
{
std::cout << std::this_thread::get_id() << " 尝试获取新任务" << std::endl;
// lock
std::unique_lock<std::mutex> lock(_task_mutex);
while (_task_num == 0) {
if (!_is_running) {
// 回收执行完任务的线程
_threads.erase(tid);
std::cout << "Task num: " << _task_num << std::endl;
std::cout << "Thread: " << std::this_thread::get_id() << " exit!!!" << std::endl;
// 唤醒用户线程
_exit.notify_all();
return ;
}
// CACHED 模式
if (poolModel::CACHED == _model) {
// CACHED: 在大于固定线程大小数量时,一个空闲线程超过60s未使用就会销毁
// 每1s检测一次
if (std::cv_status::timeout == _not_empty.wait_for(lock, std::chrono::seconds(1))) {
auto now = std::chrono::high_resolution_clock::now();
auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - last_time);
if (dur.count() >= MAX_IDLE_TIME && _cur_thread_num > _init_thread_size) {
// 回收线程对象
// 通过线程ID映射来回收线程对象
_threads.erase(tid);
_cur_thread_num--;
_idle_thread_num++;
std::cout << "Thread: " << std::this_thread::get_id() << " exit!!!" << std::endl;
return ;
}
}
}
else {
// FIXED
// std::cout << "wait here!!!!!!!!!!" << std::endl;
_not_empty.wait(lock);
}
}
std::cout << std::this_thread::get_id() << " 成功获取新任务." << std::endl;
_idle_thread_num--; // 空闲线程减少
task = _task_queue.front();
_task_queue.pop();
_task_num--;
// 如果任务队列中还有任务,唤醒其它线程
if (!_task_queue.empty()) {
_not_empty.notify_all();
}
_not_full.notify_all();
} // 线程取走任务后就立刻释放锁资源
// 执行任务
if (task) {
task(); // 执行function<void()>
std::cout << "One task done !!!!!!!!!!!!!!" << std::endl;
}
_idle_thread_num++; // 任务执行完毕空闲线程增加
last_time = std::chrono::high_resolution_clock::now(); // 更新线程执行完的时间
}
}
// 检查线程是否运行
bool check_running() const {
return _is_running;
}
private:
poolModel _model;
std::unordered_map<size_t, std::unique_ptr<BaseThread>> _threads;
size_t _init_thread_size;
std::atomic_uint _cur_thread_num;
std::atomic_uint _idle_thread_num;
size_t _thread_threshold;
std::atomic_bool _is_running;
using Task = std::function<void()>; // void() 作为一个包装函数
std::queue<Task> _task_queue;
std::atomic_uint _task_num;
size_t _max_task_threshold;
std::mutex _task_mutex;
std::condition_variable _not_full;
std::condition_variable _not_empty;
std::condition_variable _exit;
};
#endif // !_THREADPOOL_H_
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化