代码拉取完成,页面将自动刷新
#include<iostream>
#include<functional>
#include<unistd.h>
#include<fcntl.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<sys/epoll.h>
#include<sys/eventfd.h>
#include<sys/timerfd.h>
#include<pthread.h>
#include<cstdio>
#include<ctime>
#include<cstring>
#include<cassert>
#include<typeinfo>
#include<vector>
#include<strings.h>
#include<unordered_map>
#include<thread>
#include<mutex>
#include<memory>
#include<condition_variable>
//日志等级
#define NORMAL 0
#define DEBUG 1
#define ERROR 2
//要打印的日志等级
#define ENABLE_LOG_LEVEL NORMAL //假设我们只打印DEBUG及以上的等级
#define LOG(level,format,...) do{\
if(level<ENABLE_LOG_LEVEL) break; /*过滤掉低等级的日志信息*/ \
time_t t =time(NULL);\
struct tm* ptm = localtime(&t);\
assert(ptm);\
char str[16]={0};\
size_t ret = strftime(str,sizeof str -1,"%H:%M:%S",ptm);\
assert(ret!=-1);\
fprintf(stdout,"%p %s [%s:%d] " format"\n",(void*)pthread_self(),str,__FILE__,__LINE__,##__VA_ARGS__);\
}while(0)
//普通日志
#define NORMAL_LOG(format,...) LOG(NORMAL,format,##__VA_ARGS__);
//调试日志
#define DEBUG_LOG(format,...) LOG(DEBUG,format,##__VA_ARGS__);
//错误日志
#define ERROR_LOG(format,...) LOG(ERROR,format,##__VA_ARGS__);
class Any
{
private:
class BaseData
{
public:
//提供几个虚函数,交给子类继承,未来能够通过父类指针调用子类的重写的虚函数
virtual const std::type_info& Type() = 0; //获取数据类型
virtual BaseData* Clone()const = 0; //克隆出一份子类对象,但是返回其父类切片指针
virtual ~BaseData(){} //析构函数必须是虚函数
};
template<class T>
class Data :public BaseData// 保存数据的类
{
public:
T*_pd;
//保存数据的指针
public:
Data(const T& val):_pd(nullptr){_pd = new T(val);std::cout<<"构造:"<<*_pd<<" addr:"<<(void*)_pd<<std::endl;} //数据保存在 Any 中,new一段空间拷贝一个,因为传进来的可能是一个临时对象
~Data(){std::cout<<"析构:"<<*_pd<<" addr:"<<(void*)_pd<<std::endl;delete _pd;}
const std::type_info& Type() {if(!_pd) return typeid(void);
return typeid(T);/*typeid(*_pd)*/}
BaseData* Clone()const {return new Data<T>(*_pd);}
};
private:
BaseData* _pbase; //在Any类中只需要保存一个父类指针就行了
public:
//要提供的接口
Any():_pbase(nullptr){} //提供空构造
~Any(){delete _pbase;}
template<class T> //构造
Any(const T& val):_pbase(new Data<T>(val)){}
BaseData* Clone()const{return _pbase->Clone();} //调用 Data 的 Clone 接口就行
Any(const Any& a) //拷贝构造
{
if(a.Type()==typeid(void)) _pbase=nullptr; //注意在拷贝之前要判断被拷贝的对象是否为空对象
else _pbase = a.Clone();
}
Any& operator=(const Any&a) //赋值重载
{
Any tmp(a);
std::swap(_pbase,tmp._pbase);
return *this;
}
template<class T>
Any& operator=(const T&val) //支持隐式类型转换
{
Any tmp(val);
std::swap(_pbase,tmp._pbase);
return *this;
}
const std::type_info& Type()const {if(!_pbase) return typeid(void); _pbase->Type();} //获取数据类型
template<class T>
T* GetData()
{
//首先判断返回类型是否和我们的数据类型一致
assert(typeid(T)==Type());
//走到这里说明我们的数据类型就是 T ,换句话说,父类指针指向的是一个 Data<T> ,那么我们就可以强转成子类指针获取公有成员了
return dynamic_cast<Data<T>*>(_pbase)->_pd;
}
};
class Buffer
{
private:
std::vector<char> _buf; //空间
uint64_t _read_offset; //读偏移
uint64_t _write_offset; //写偏移
private: //私有接口,不对外,用于内部功能的实现
uint64_t FrontWriteSize()const //获取读偏移之前的空闲空间
{return _read_offset;}
uint64_t BehindWriteSize()const //获取写偏移之后的空闲空间
{return _buf.size()-_write_offset;}
uint64_t TotalWriteSize()const //获取总的可写空间
{return FrontWriteSize()+BehindWriteSize();}
char* WritePosition() //写入的起始位置
{return &_buf[_write_offset];}
bool EnsureWriteSize(size_t len) //用来移动数据以及扩容等,保证写空间足够
{
if(BehindWriteSize()>=len) return true;
if(TotalWriteSize()>=len)
{
//将可读数据往前挪动
memcpy(&_buf[0],ReadPosition(),ReadSize());
_read_offset =0;
_write_offset = ReadSize();
return true;
}
//空间不够需要扩容
_buf.resize(_write_offset+len);
return true;
}
public:
bool MoveReadOffset(size_t len) //移动读偏移
{
assert(len<=ReadSize());
_read_offset += len;
return true;
}
bool MoveWriteOffset(size_t len) //游动写偏移
{
assert(len<=BehindWriteSize());
_write_offset += len;
return true;
}
public: //对外提供的功能接口
char* ReadPosition() //获取读起始位置
{return &_buf[_read_offset];}
#define INIT_SIZE 1024
Buffer() //构造函数
:_buf(1024),_read_offset(0),_write_offset(0){}
uint64_t ReadSize() const //获取可读数据大小
{return _write_offset-_read_offset;}
bool Read(char*out,size_t len) //读取数据,不移动读偏移
{
assert(len<=ReadSize());
memcpy(out,ReadPosition(),len);
return true;
}
bool ReadAndPop(char*out,size_t len) //读取数据并移动读偏移
{
Read(out,len);
MoveReadOffset(len);
return true;
}
bool Write(const char*in,size_t len) //写入数据,不移动写偏移
{
EnsureWriteSize(len);
memcpy(WritePosition(),in,len);
return true;
}
bool WriteAndPush(const char*in,size_t len) //写入数据并移动写偏移
{
Write(in,len);
MoveWriteOffset(len);
return true;
}
std::string ReadAsString(size_t len) //读取数据,返回string,不移动读偏移
{
assert(len<=ReadSize());
std::string ret;
ret.resize(len);
memcpy(&ret[0],ReadPosition(),len);
return ret;
}
std::string ReadAsStringAndPop(size_t len) //读取数据,返回string,并移动读偏移
{
std::string ret = ReadAsString(len);
MoveReadOffset(len);
return ret;
}
bool WriteString(const std::string& in) //写入string的数据不移动写偏移
{
return Write(WritePosition(),in.size());
}
bool WriteStringAndPush(const std::string&in) //写入string的数据并移动写偏移
{
return WriteAndPush(in.c_str(),in.size());
}
#define MAX_LINE_SIZE 8092 //我们规定一行不能超过8092,因为一般来说不可能出现怎么长的一行内容,出现了,就说明可能出问题了
std::string GetLine() //获取一行数据
{
int i = 0;
char* start = ReadPosition();
for(;i<ReadSize()&&i<MAX_LINE_SIZE;++i)
{
if(start[i]=='\n') break;
}
std::string ret;
if(i!=ReadSize()&&i!=MAX_LINE_SIZE) ret = std::string(start,i+1); //注意要提取 i+1 个字节,而不是i个
return ret;
}
std::string GetLineAndPop() //获取一行数据并移动读偏移
{
std::string ret = GetLine();
//如果没有获取到一行,ret.size() 就是 0
MoveReadOffset(ret.size());
return ret;
}
void WriteBufferAndPush(Buffer buf)
{
WriteAndPush(buf.ReadPosition(),buf.ReadSize());
}
void Clear() //清除缓冲区数据
{_buf.clear();}
};
class Socket
{
private:
int _sock;
public:
Socket():_sock(-1){}
Socket(int sock):_sock(sock){}
~Socket(){/*Close();*/}
int Fd()const {return _sock;}
void Create() //创建套接字
{
_sock = socket(AF_INET,SOCK_STREAM,0);
if(_sock == -1)
{
ERROR_LOG("Create failed");
abort();
}
}
void Close() //关闭
{
close(_sock);
}
bool Bind(const std::string ip ,uint16_t port)const //绑定地址信息
{
struct sockaddr_in addr;
addr.sin_addr.s_addr = inet_addr(ip.c_str());
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
int ret = bind(_sock,(struct sockaddr*)&addr,sizeof addr);
if(ret==-1)
{
ERROR_LOG("Bind failed");
abort();
}
return true;
}
int Acceptor()const //获取新连接
{
int newfd = accept(_sock,nullptr,nullptr);
if(newfd<=0)
{
//if(errno == EAGAIN ||errno ==EWOULDBLOCK ||errno == EINTR) //前两个表示没有新连接,EINTR表示获取过程被信号打断
//return 0 ;
//ERROR_LOG("Acceptor failed"); //否则表示获取失败
return -1; //后续逻辑我们不需要关心信号打断等原因,因为没获取到就是没获取到,直接返回-1给上一层处理
}
return newfd;
}
bool Connect(std::string ip , uint16_t port)const //建立连接
{
struct sockaddr_in addr;
addr.sin_addr.s_addr = inet_addr(ip.c_str());
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
int ret = connect(_sock,(struct sockaddr*)&addr,sizeof addr);
if(ret < 0) //连接出错
{
ERROR_LOG("Connect failed");
return false;
}
return true;
}
ssize_t Send(const char*in,size_t len)const //写入数据到内核发送缓冲区(非阻塞写入)
{
ssize_t ret = write(_sock,in,len);
if(ret < 0)
if(!(errno == EAGAIN ||errno == EWOULDBLOCK || errno ==EINTR))
{
ERROR_LOG("Send failed");
return -1;
}
return ret>0?ret:0;
}
ssize_t Read(char* out, size_t len) //读取数据 (非阻塞读取)
{
ssize_t ret = read(_sock,out,len);
if(ret < 0)
{
if(!(errno == EAGAIN ||errno == EWOULDBLOCK || errno ==EINTR))
{
ERROR_LOG("Read failed");
return -1; //出错返回 -1
}
}
return ret>0?ret:-1; //没有出错就返回>=0
}
#define LISTEN_BACKLOG 1024
bool Listen() //开始监听
{
int ret = listen(_sock,LISTEN_BACKLOG);
if(ret < 0)
{
ERROR_LOG("Listen failed");
return false;
}
return true;
}
void SetNonBlock() //设置非阻塞
{
int flag = fcntl(_sock,F_GETFD);
fcntl(_sock,F_SETFD,flag | O_NONBLOCK | O_CLOEXEC); //顺便设置不可被拷贝了
// fcntl(_sock,F_SETFD,flag | O_NONBLOCK );
}
bool SetAddrReuse() //设置地址复用
{
int val = 1;
int ret = setsockopt(_sock,SOL_SOCKET,SO_REUSEADDR | SO_REUSEPORT ,&val,sizeof val);
//设置 SO_REUSEADDR 可以绑定处于 TIME_WAIT 状态的端口
//设置 SO_REUSEPORT 可以让一个端口被多个 socket 绑定,可以用于实现负载均衡
if(ret == -1)
{
ERROR_LOG("SetAddrReuse failed");
return false;
}
return true;
}
#define DEFAULT_SERVER_IP "0.0.0.0"
void CreatServerSocket(uint16_t port) //只需要传端口号就行了
{
Create();
SetNonBlock();
bool ret = SetAddrReuse();
if(!ret)
{
ERROR_LOG("SetAddrReuse failed");
abort();
}
ret = Bind(DEFAULT_SERVER_IP,port);
if(!ret)
{
ERROR_LOG("Bind failed");
abort();
}
ret = Listen();
if(!ret)
{
ERROR_LOG("Listen failed");
abort();
}
}
void CreatClientSocket(std::string ip,uint16_t port)
{
Create();
SetNonBlock();
Connect(ip,port);
}
};
class EventLoop;
class Poller;
class Channel
{
private:
using EventCallBack = std::function<void()>;
int _fd; //文件描述符
uint32_t _events; //监控的事件
uint32_t _revents; //就绪的事件
EventCallBack _read_cb; //读事件回调
EventCallBack _write_cb; //写事件回调
EventCallBack _error_cb; //错误事件回调
EventCallBack _close_cb; //挂断事件回调 一般来说,挂断事件和错误事件的处理方式是一样的
EventCallBack _event_cb; //任意事件回调
EventLoop* _loop; //后续会添加的成员
private:
//私有接口,用于真正和 Poller 模块和 EventLoop 模块联动,进行事件监控的调整
void UpdateEvents(); //op 就是未来传递给 epoll_ctl 的op参数
public:
Channel(int fd,EventLoop* loop):_fd(fd),_events(0),_revents(0),_loop(loop){}
//启动读事件监控
int Fd()const{return _fd;}//用于获取文件描述符
uint32_t Events()const{return _events;}
void EnableRead()
{
if(HasRead()) return ;//说明已经监控了读事件了
_events|= EPOLLIN;
UpdateEvents();
}
//是否启动了读事件
bool HasRead()const
{
return _events & EPOLLIN;
}
//取消读事件监控
void DisableRead()
{
if(!HasRead()) return ; //说明本来就没有监控读事件
_events &= (~EPOLLIN);
UpdateEvents();
}
//启动写事件监控
void EnableWrite()
{
if(HasWrite()) return ;
_events |= EPOLLOUT;
UpdateEvents();
}
//是否启动了写事件
bool HasWrite()const
{
return _events & EPOLLOUT;
}
//取消写事件监控
void DisableWwrite()
{
if(!HasWrite()) return;
_events &= (~EPOLLOUT);
UpdateEvents();
}
//取消所有事件监控
void DisableAll()
{
_events = 0;
UpdateEvents();
}
//移除监控
void Remove();
//设置就绪事件
void SetRevents(uint32_t revents){_revents = revents;}
//处理就绪事件
void HandlerEvents()
{
//读事件需要处理
if(_revents & (EPOLLIN | EPOLLPRI | EPOLLRDHUP))
{
// NORMAL_LOG("读事件到来:%d",_fd);
if(_read_cb) _read_cb();
if(_event_cb) _event_cb();
}
//剩下三个事件只需要处理其中一种
if(_revents & EPOLLOUT)
{
if(_event_cb) _event_cb(); //因为写事件可能会关闭连接,所以没办法,我们只能放在前面调用了
if(_write_cb) _write_cb();
}
else if(_revents & EPOLLHUP)
{
if(_event_cb) _event_cb();
if(_close_cb) _close_cb();
}
else if(_revents & EPOLLERR)
{
if(_event_cb) _event_cb();
if(_close_cb) _close_cb();
}
// _revents = 0; //这行代码不需要,因为后续只有触发了事件,调用SetRevent的Channel才会调用这个接口,SetRevent是覆盖式的
// if(_event_cb) _event_cb();
return;
}
//设置读事件回调
void SetReadCallBack(const EventCallBack& cb){ _read_cb = cb;}
//设置写事件回调
void SetWriteCallBack(const EventCallBack& cb){ _write_cb = cb;}
//设置错误事件回调
void SetErrorCallBack(const EventCallBack& cb){ _error_cb = cb;}
//设置挂断事件回调
void SetCloseCallBack(const EventCallBack& cb){ _close_cb = cb;}
//设置任意事件回调
void SetEventCallBack(const EventCallBack& cb){ _event_cb = cb;}
};
class Poller
{
private:
#define REVENTS_SIZE 1024
int _epfd; //epoll模型的操作句柄
std::unordered_map<int,Channel*> _channels; //保存管理的套接字以及对应的Channel
struct epoll_event _revents[REVENTS_SIZE]; //用于从epoll模型中获取就绪的文件描述符及其就绪事件
private:
//判断该文件描述符是否在epoll模型中
bool IsInPoller(Channel* channel) const
{
auto it = _channels.find(channel->Fd());
if(it == _channels.end()) return false;
return true;
}
void Update(Channel* channel,int op)
{
struct epoll_event ev;
ev.data.fd = channel->Fd();
ev.events = channel->Events();
int ret = epoll_ctl(_epfd,op,channel->Fd(),&ev);
if(ret == -1)
{
ERROR_LOG("epoll_ctl failed,errno:%d",errno);
perror(strerror(errno));
abort();
}
}
static int CreateEpfd()
{
#define EPOLL_SIZE 1024 //这个值大于 0 就行,无需关心
int fd = epoll_create(EPOLL_SIZE);
}
public: //提供的接口
Poller():_epfd(CreateEpfd()){}
void UpdateEvents(Channel* channel)
{
if(IsInPoller(channel))
{
Update(channel,EPOLL_CTL_MOD); //交给子函数去更新事件
return ;
}
_channels.insert(std::make_pair(channel->Fd(),channel)); //添加管理
Update(channel,EPOLL_CTL_ADD);
}
void Remove(Channel*channel)
{
//可以先判断一下有没有
auto it = _channels.find(channel->Fd());
if(it != _channels.end())
{
_channels.erase(it);
Update(channel,EPOLL_CTL_DEL);
}
}
size_t Poll(std::vector<Channel*>* actives) //返回给EventLoop模型
{
bzero(_revents,sizeof _revents);
int cnt = epoll_wait(_epfd,_revents,REVENTS_SIZE,-1); //阻塞等待
if(cnt<0)
{
if(errno == EINTR) return cnt; //信号打断
ERROR_LOG("epoll_wait failed");
abort();
}
for(int i=0;i<cnt;++i)
{
//设置Channel对象中的就绪事件
auto it = _channels.find(_revents[i].data.fd);
if(it == _channels.end())
{
ERROR_LOG("find channel failed");
abort();
}
it->second->SetRevents(_revents[i].events); //设置Channel事件
actives->push_back(it->second); //在EventLoop中只需要调用Channel的HandlerEvents进行数据处理就行了
}
return cnt;
}
};
class TimerTask
{
using task = std::function<void()>; //无参的回调,如果需要参数,有上层进行使用std::bind() 进行参数的绑定
using releasetask = std::function<void(uint64_t)>;
private:
uint64_t _id;
uint64_t _delay;
task _cb;
releasetask _release;
bool _is_canceled; //表示是否被取消
public:
TimerTask(uint64_t id,uint64_t delay,task cb,releasetask rcb):_id(id),_delay(delay),_cb(cb),_release(rcb),_is_canceled(false){
// std::cout<<"构造,id:"<<id<<"----addr:"<<this<<std::endl;
}
~TimerTask(){if(!_is_canceled)_cb();_release(_id);
// std::cout<<"析构,id:"<<_id<<"----addr:"<<this<<std::endl;
} //_iscanceled 为false表示该任务未被取消,这时候执行任务回调
uint64_t GetDelay(){return _delay;} //获取定时任务设置的延时
void CancelTask(){_is_canceled = true;}
void EableTimerTask(){_is_canceled = false;}
};
class TimerWheel
{
using task = std::function<void()>;
private:
std::vector<std::vector<std::shared_ptr<TimerTask>>> _wheel;
std::unordered_map<uint64_t,std::weak_ptr<TimerTask>> _tasks;
int _timer_idx;
int _timerfd;
EventLoop* _loop;
Channel* _timerfd_channel;
#define MAXTIME 60
private:
static int CreateTimerfd()
{
int timerfd = timerfd_create(CLOCK_MONOTONIC,TFD_CLOEXEC|TFD_NONBLOCK);
assert(timerfd!=-1);
struct itimerspec timeout;
//第一次超时时间间隔
timeout.it_value.tv_sec = 2; // 第一次超时为 3 s
timeout.it_value.tv_nsec = 0;
//第二次以及之后的超时时间间隔
timeout.it_interval.tv_sec = 1; // 往后每隔 1s 超时一次
timeout.it_interval.tv_nsec = 0;
int ret = timerfd_settime(timerfd,0,&timeout,NULL); //设置定时通知
assert(ret!=-1); //返回值为-1表示设置失败,但是一般是不会失败的,可以不用关心
return timerfd;
}
void AddTimerTaskInLoop(uint64_t id , uint64_t delay,task cb)
{
assert(_tasks.find(id) == _tasks.end()); //确保 id 合法
std::shared_ptr<TimerTask> pt(new TimerTask(id,delay,cb,std::bind(&TimerWheel::RealeaseTask ,this,std::placeholders::_1))); //构建任务对象
std::weak_ptr<TimerTask> wpt(pt); //weak_ptr
int pos = (_timer_idx + delay) % MAXTIME; //计算到期时间
_wheel[pos].push_back(pt); //定时任务放入时间轮
_tasks[id] = wpt; //添加到map中管理
}
//刷新/延迟定时任务
void RefreshTimerTaskInLoop(uint64_t id)
{
auto it = _tasks.find(id);
if(it==_tasks.end()) return; //id 不合法直接返回false
std::shared_ptr<TimerTask> pt = it->second.lock(); //构造新的shared_ptr
int pos = (_timer_idx + pt->GetDelay()) % MAXTIME; //找到新的位置
_wheel[pos].push_back(pt);
pt->EableTimerTask();
}
//取消定时任务
void CancelTimerTaskInLoop(uint64_t id)
{
auto it = _tasks.find(id);
if(it==_tasks.end()) return;
(it->second).lock()->CancelTask();
}
public:
TimerWheel(EventLoop* loop):_wheel(MAXTIME),_timer_idx(0),_loop(loop),_timerfd(CreateTimerfd()),_timerfd_channel(new Channel(_timerfd,_loop))
{
_timerfd_channel->SetReadCallBack(std::bind(&TimerWheel::OnTime,this));
_timerfd_channel->EnableRead();
}
~TimerWheel(){delete _timerfd_channel;}
void OnTime()
{
TimerRead();
RunTick();
}
void TimerRead()
{
uint64_t val = 0;
int ret = read(_timerfd,&val,sizeof val);
if(ret<0)
{
if(errno == EAGAIN ||errno == EWOULDBLOCK ||errno == EINTR) return;
ERROR_LOG("timerfd read failed");
abort();
}
return;
}
//添加定时任务
bool AddTimerTask(uint64_t id , uint64_t delay,task cb);
//刷新/延迟定时任务
bool RefreshTimerTask(uint64_t id);
//删除map的映射
bool RealeaseTask(uint64_t id)
{
auto it = _tasks.find(id);
if(it==_tasks.end()) return false;
_tasks.erase(it);
return true;
}
//取消定时任务
bool CancelTimerTask(uint64_t id);
//移动秒针
void RunTick()
{
_timer_idx ++;
_timer_idx %= MAXTIME;
// DEBUG_LOG("tick:%d",_timer_idx);
_wheel[_timer_idx].clear();
}
bool HasTimerTask(uint64_t id) const
{
auto it = _tasks.find(id);
return it != _tasks.end();
}
};
class EventLoop
{
private:
using Task = std::function<void()>;
std::thread::id _thread_id; //绑定的线程的id
std::vector<Task> _queue; //任务队列
std::mutex _mutex; //保证任务队列的安全
Poller _poller; //用于事件监控
int _eventfd; //用于事件通知
Channel* _eventfd_channel; //管理eventfd的事件
TimerWheel _timer_wheel; //超时管理模块
private:
void RunAllTask()
{
std::vector<Task> tasks;
{
std::unique_lock<std::mutex> lock(_mutex); //加锁
tasks.swap(_queue); //取任务
}
for(auto&f:tasks)
f(); //执行任务
}
bool IsInLoop()const {return std::this_thread::get_id() == _thread_id ;}
void PushTask(const Task& f)
{
{
std::unique_lock<std::mutex> lock(_mutex);
_queue.push_back(f);
}
WakeUp();
}
static int CreateEventfd() //用于创建一个 eventfd
{
int fd = eventfd(0,EFD_CLOEXEC|EFD_NONBLOCK);
if(fd==-1)
{
ERROR_LOG("create eventfd failed");
abort();
}
return fd;
}
void EventReadCallBack()const
{
uint64_t cnt = 0;
int ret = read(_eventfd,&cnt,sizeof cnt);
if(ret<0)
{
if(errno ==EAGAIN ||errno==EWOULDBLOCK ||errno ==EINTR) return;
ERROR_LOG("read eventfd failed");
}
return;
}
public: //对外提供的功能接口
EventLoop():_thread_id(std::this_thread::get_id()),_eventfd(CreateEventfd()),_eventfd_channel(new Channel(_eventfd,this)),_timer_wheel(this)
{
_eventfd_channel->SetReadCallBack(std::bind(&EventLoop::EventReadCallBack,this)); //设置读回调函数
_eventfd_channel->EnableRead(); //启动读事件监听
}
void UpdateEvent(Channel* channel)
{
_poller.UpdateEvents(channel);
}
void AssertInLoop()const{assert(_thread_id == std::this_thread::get_id());}
void RemoveEvent(Channel* channel)
{
_poller.Remove(channel);
}
void RunInLoop(const Task& f)
{
if(IsInLoop()) f(); //如果是绑定的线程就直接执行
else PushTask(f);
}
void Start()
{
while(1)
{
//1 监听事件
std::vector<Channel*> actives;
int ret = _poller.Poll(&actives);
//2 执行IO回调
for(auto channel:actives) channel->HandlerEvents();
//3 执行任务队列的任务
RunAllTask();
}
}
void WakeUp() //唤醒EventLoop线程
{
uint64_t val = 1;
int ret = write(_eventfd,&val,sizeof val);
if(ret < 0)
{
if(errno == EAGAIN ||errno == EWOULDBLOCK ||errno==EINTR) return;
ERROR_LOG("WakeUp failed");
abort();
}
return;
}
~EventLoop(){delete _eventfd_channel;}
void AddTimerTask(uint64_t id , uint64_t delay,Task f)
{
_timer_wheel.AddTimerTask(id,delay,f);
}
void RefreshTimerTask(uint64_t id)
{
_timer_wheel.RefreshTimerTask(id);
}
void CancelTimerTask(uint64_t id)
{
_timer_wheel.CancelTimerTask(id);
}
bool HasTimerTask(uint64_t id)const
{return _timer_wheel.HasTimerTask(id);}
};
void Channel::UpdateEvents() //op 就是未来传递给 epoll_ctl 的op参数
{
//后续调用EventLoop提供的接口
_loop->UpdateEvent(this);
}
//移除监控
void Channel::Remove()
{
DisableAll();
//调用_loop提供的Remove接口
_loop->RemoveEvent(this);
}
//添加定时任务
bool TimerWheel::AddTimerTask(uint64_t id , uint64_t delay,task cb)
{
_loop->RunInLoop(std::bind(&TimerWheel::AddTimerTaskInLoop,this,id,delay,cb));
}
//刷新/延迟定时任务
bool TimerWheel::RefreshTimerTask(uint64_t id)
{
_loop->RunInLoop(std::bind(&TimerWheel::RefreshTimerTaskInLoop,this,id));
}
bool TimerWheel::CancelTimerTask(uint64_t id)
{
_loop->RunInLoop(std::bind(&TimerWheel::CancelTimerTaskInLoop,this,id));
}
enum CONN_STATU
{
CLOSED, //关闭状态,不再进行任何操作,释放资源
CONNECTING, //连接待处理,还需要进行各项设置才能开始监听通信
CONNECTED, //连接建立,可以通信
CLOSING //连接带关闭,尽快处理缓冲区剩余数据
};
class Connection;
using PtrConnection = std::shared_ptr<Connection>;
class Connection :public std::enable_shared_from_this<Connection>
{
private:
uint64_t _id; //连接的id,也是连接对应的超时任务的id
int _sockfd; //保存对应的描述符
EventLoop* _loop; //绑定的EventLoop
CONN_STATU _con_statu; //连接状态
Socket _socket; //套接字操作模块
Channel _channel; //事件管理模块
Buffer _in_buffer; //输入缓冲区
Buffer _out_buffer; //输出缓冲区
Any _context; //连接上下文
bool _enable_inactive_release; //标识是否开启非活跃连接超时释放机制
using EventCallBack = std::function<void(const PtrConnection&)>; //用户设置的任意事件回调
using ConnectCallBack = std::function<void(const PtrConnection&)>; //用户设置的连接建立回调
using MessageCallBack = std::function<void(const PtrConnection&,Buffer*)>; //用户设置的新数据回调,需要传递输入缓冲区
using CloseCallBack = std::function<void(const PtrConnection&)>; //用户设置的连接关闭回调
using SvrCloseCallBack = std::function<void(const PtrConnection&)>; //TcpServer设置的连接关闭回调
ConnectCallBack _connect_cb;
MessageCallBack _message_cb;
CloseCallBack _close_cb;
EventCallBack _event_cb;
SvrCloseCallBack _svr_close_cb;
private:
void ReleaseInLoop() //实际关闭连接
{
// assert(_con_statu == CLOSING); //有可能是非活跃超时而引起的调用,所以无需断言
_con_statu = CLOSED; //更新状态
if(_enable_inactive_release) DisableInactuveRelease(); //取消定时任务,我们的定时任务的key值使用的是_id,所以还需要将_id传进去
_channel.Remove(); //移除事件监控,这里会有问题
_socket.Close(); //关闭套接字
if(_close_cb) _close_cb(shared_from_this()); //先调用用户设置的关闭连接调用
if(_svr_close_cb) _svr_close_cb(shared_from_this()); //再调用TcpServer提供的接口用于移除管理,删除基础计数
//注意调用的顺序,因为Release接口并不需要保留一个shared_ptr参数,那么就意味着,可能调用完 _svr_close_cb之后,计数就为0,把资源释放了,那么这时候已经无法找到_close_cb了
}
void ShutDownInLoop()
{
_con_statu = CLOSING; //设置连接待关闭
if(_in_buffer.ReadSize()) //有数据待处理
if(_message_cb) _message_cb(shared_from_this(),&_in_buffer);
//所有数据都处理完之后,处理待发送数据
if(_out_buffer.ReadSize()) //如果有数据待发送
{
_channel.EnableWrite(); //启动写事件监听
//触发写事件之后,在写事件回调中会调用Release进行连接释放
}
else Release(); //如果没有数据待发送就直接关闭
}
void HandlerRead() //读事件回调
{
// 1 从套接字读取数据
#define READ_SIZE 65535
char buffer[READ_SIZE] = {0};
int ret = _socket.Read(buffer,READ_SIZE-1);
if(ret < 0 ) //对Socket的Read做处理,只有真正出错的时候才返回 -1 ,其他的时候都返回 >= 0 的值
{
//说明套接字出错,那么此时也不能直接关闭连接,因为输出缓冲区中还有数据待发送,所以是调用ShutDown接口
ShutDown(); //先处理剩余数据,再实际关闭连接
}
// 2 放入输入缓冲区
_in_buffer.WriteAndPush(buffer,ret);
// 3 调用新数据回调
if(_in_buffer.ReadSize()) //可能没读到数据,被信号打断了或者其他原因
{
if(_message_cb) _message_cb(shared_from_this(),&_in_buffer); //shared_from_this() 会从当前对象的第一个创建的shared_ptr中进行拷贝
}
// if(_out_buffer.ReadSize()) _channel.EnableWrite(); //如果写入了新的数据,那么开启写事件监控 //这行代码放在Send接口中更合适
}
void HandlerWrite() //写事件回调
{
// 1 向套接字写入数据
int ret =_socket.Send(_out_buffer.ReadPosition(),_out_buffer.ReadSize()); //直接尝试全部写完
// 2 判断是否写完
if(ret < 0) //在 Socket 的Send接口中做处理,只有真正出错时才返回 -1 ,那么这时候不需要再继续任何处理数据的操作了,直接关闭连接
{
Release(); // 这个接口是实际关闭连接的接口
return;
}
if(ret == _out_buffer.ReadSize()) //说明写完了,那么可以关闭写事件的监控了
{
_channel.DisableWwrite(); //关闭写事件监控,直到下一次
}
//否则就表示没写完,那么就先不管比写事件监控
_out_buffer.MoveWriteOffset(ret); //不管怎么样都要移动读偏移
//然后判断连接是否是待关闭状态,如果是,写完这次数据我们就必须要关闭连接了
if(_con_statu == CLOSING) Release();
}
void HandlerClose() //挂断事件回调
{
if(_in_buffer.ReadSize())
_message_cb(shared_from_this(),&_in_buffer); //挂断了,读写都关闭了,无法发出去了,所以看还有没有功能性的请求就行了
Release();
}
void HandlerError() //错误事件回调
{
HandlerClose();
}
void HandlerEvent() //任意事件回调
{
if(_enable_inactive_release) //如果启动了非活跃超时释放
_loop->RefreshTimerTask(_id);
if(_event_cb) _event_cb(shared_from_this());
}
void EnableInactiveReleaseInLoop(uint64_t delay = 30) //启动非活跃连接销毁
{
_enable_inactive_release = true;
//判断是否已经有超时任务
if(_loop->HasTimerTask(_id)) //说明这个连接有定时任务在时间轮,那么直接刷新延迟、
{
_loop->RefreshTimerTask(_id); //直接刷新,前面的时间轮的刷新策略需要更新,再刷新的时候需要启动任务,也就是 _is_cancled = true
}
else //没有定时任务,那么需要添加
{
_loop->AddTimerTask(_id,delay,std::bind(&Connection::Release,this)); //把销毁任务添加进去
}
}
void DisableInactuveReleaseInLoop() //取消非活跃连接销毁
{
_enable_inactive_release = false;
//判断时间轮中是否有该任务,如果有,那么需要取消
if(_loop->HasTimerTask(_id))
_loop->CancelTimerTask(_id);
}
void EstablishedInLoop()
{
if(_con_statu!=CONNECTING) abort(); //出错,因为这个函数调用时一定是处于连接待处理阶段的,不可能是其他的状态
_con_statu = CONNECTED; //更新状态
//设置定时任务?
_channel.EnableRead(); //启动读事件监听
if(_connect_cb) _connect_cb(shared_from_this()); //调用用户设置的连接建立回调
}
void SendInLoop(Buffer buf) //发送数据(用户层面)
{
if(_con_statu == CLOSED) return;
_out_buffer.WriteBufferAndPush(buf);
//启动写事件监控
if(_out_buffer.ReadSize()) _channel.EnableWrite();
}
void UpgradeInLoop(const Any& context,const ConnectCallBack& con , const MessageCallBack& msg, const CloseCallBack& clo,const EventCallBack& evt)
{
_context = context;
_connect_cb = con;
_message_cb = msg;
_event_cb = evt;
_close_cb = clo;
}
public: //功能接口
Connection(uint64_t id , int sockfd ,EventLoop* loop)
:_id(id),_sockfd(sockfd),_loop(loop),_con_statu(CONNECTING),_socket(sockfd),_channel(_sockfd,_loop),_enable_inactive_release(false)
{
_channel.SetReadCallBack(std::bind(&Connection::HandlerRead,this));
_channel.SetWriteCallBack(std::bind(&Connection::HandlerWrite,this));
_channel.SetCloseCallBack(std::bind(&Connection::HandlerClose,this));
_channel.SetErrorCallBack(std::bind(&Connection::HandlerError,this));
_channel.SetEventCallBack(std::bind(&Connection::HandlerEvent,this));
}
void SetMessageCallBack(const MessageCallBack& cb){_message_cb = cb;}
void SetConnectCallBack(const ConnectCallBack& cb){_connect_cb = cb;}
void SetCloseCallBack(const CloseCallBack& cb){_close_cb = cb;}
void SetEventCallBack(const EventCallBack& cb){_event_cb = cb;}
void SetSvrCloseCallBack(const SvrCloseCallBack& cb){_svr_close_cb = cb;}
void EnableInactiveRelease(uint64_t delay) //启动非活跃连接销毁
{
_loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop,this,delay));
}
void DisableInactuveRelease() //取消非活跃连接销毁
{
_loop->RunInLoop(std::bind(&Connection::DisableInactuveReleaseInLoop,this));
}
void SetContext(const Any& context){_context = context;} //设置上下文
Any* GetContext() {return &_context;}
void Established()
{
_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop,this));
}
void Send(const char* in,size_t len)
{
Buffer buf;
buf.WriteAndPush(in,len);
_loop->RunInLoop(std::bind(&Connection::SendInLoop,this,buf));
}
void Upgrade(const Any& context,const ConnectCallBack& con , const MessageCallBack& msg, const CloseCallBack& clo,const EventCallBack& evt)
{
_loop->AssertInLoop();
_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop,this,context,con,msg,clo,evt));
}
void ShutDown()
{
_loop->RunInLoop(std::bind(&Connection::ShutDownInLoop,this));
}
void Release() //不提供给外部
{
_loop->RunInLoop(std::bind(&Connection::ReleaseInLoop,this));
//NORMAL_LOG("Release:%d",_sockfd);
}
int Fd()const {return _sockfd;}
uint64_t Id()const {return _id;}
~Connection(){
DEBUG_LOG("CONNECTION FREE : %p",this);
}
};
class Acceptor
{
private:
Socket _lst_socket;
EventLoop* _loop;
Channel _channel;
using ReadCallBack = std::function<void(int)> ;
ReadCallBack _read_cb;
private:
Socket CreateListenSocket(uint16_t port)
{
Socket lst;
lst.CreatServerSocket(port);
return lst;
}
void HandlerRead()
{
//首先接收新的描述符
int newfd = _lst_socket.Acceptor();
if(newfd < 0) return;
if (_read_cb) _read_cb(newfd);
else close(newfd); //否则就直接关闭,免得浪费资源
}
public:
Acceptor(EventLoop*loop,uint16_t port):_lst_socket(CreateListenSocket(port)),_loop(loop),_channel(_lst_socket.Fd(),_loop)
{
//设置读回调
_channel.SetReadCallBack(std::bind(&Acceptor::HandlerRead,this));
}
void SetReadCallBack(const ReadCallBack& cb){_read_cb=cb;}
void Listen() //开始监听
{
//启动读事件监控
_channel.EnableRead();
}
};
//EventLoop 类,为什么每一个EventLoop模块绑定一个线程
//但是有一个问题就是:怎么绑定呢? 我们的EventLoop在构造的时候就绑定线程id了,那么当然是创建线程,然后在线程的入口函数中创建一个EventLoop模块,最后将对象指针传递给TcpServer模块进行负载均衡
//虽然我们前面就是这样设计的,但是为什么这样呢?能不能先创建一批线程,然后在主线程中创建一批EventLoop对象,然后将各个线程的id绑定进EventLoop?
//这个操作是可行的,但是我们创建处EventLoop对象之后,到为其绑定线程之前,是有一个窗口的,那么在这个事件窗口到来的连接或者连接的操作就会出现问题,可能就无法满足全部在一个线程中执行了
class EventLoopThread
{
private:
EventLoop* _loop;
std::thread _thread;
std::mutex _mutex; //保护_loop安全
std::condition_variable _cond; //实现同步
private:
//线程的入口函数
void StartRoutine()
{
//加锁创建对象
EventLoop* loop = new EventLoop();
{
std::unique_lock<std::mutex> lock(_mutex);
_loop = loop;
}
//唤醒条件变量下的线程
_cond.notify_all();
//启动EventLoop循环
loop -> Start();
}
public:
EventLoopThread():_loop(nullptr),_thread(std::bind(&EventLoopThread::StartRoutine,this)){}
//提供一个接口用于获取内部的EventLoop
//意味着这个_loop会被多个线程竞争,那么需要锁和条件变量来实现同步互斥
//因为未来线程刚创建的时候,在还没有创建好EventLoop对象的时候,这时候就可能会被主线程或者其他线程来获取Loop了,那么这时候是线程不安全的,所以需要加锁保护
//同时,为了防止线程中的EventLoop对象还没创建就有线程来获取,我们需要再使用一个条件变量。 申请到锁之后,如果条件不满足,线程还需要在条件变量下等待,直到条件满足再来竞争锁并获取锁
EventLoop* GetEventLoop()
{
EventLoop* ret = nullptr;
{
std::unique_lock<std::mutex> lock(_mutex); //加锁
_cond.wait(lock,[&](){return _loop!=nullptr;}); //判断函数返回值为真
//走到这里说明被唤醒了
ret = _loop;
}
return ret;
}
};
class EventLoopThreadPool //Reactor模型线程池
{
private:
size_t _thread_cnt; //从属Reactor线程数量
EventLoop* _base_loop; //主Reactor线程的EventLoop
std::vector<EventLoopThread*> _pool; //从属线程池 , 这里不要直接适用对象,而是存储线程对象的指针更好,因为EventLoopThread不支持拷贝构造
std::vector<EventLoop*> _loops; //从属Reactor线程的EventLoop对象集合
uint64_t _loops_idx; //下一次分配的EventLoop*的下标
public:
EventLoopThreadPool(EventLoop* loop):_thread_cnt(0),_base_loop(loop),_loops_idx(0){}
void SetThreadCount(size_t cnt){_thread_cnt = cnt;} //设置从属线程数量
void Start() //启动线程池
{
_pool.resize(_thread_cnt); //创建线程,但是这里有问题,thread只能进行移动构造,无法拷贝构造
_loops.resize(_thread_cnt);
//获取EventLoop指针
for(int i =0 ;i < _thread_cnt ; ++i)
{
//创建对象
_pool[i] = new EventLoopThread();
_loops[i] = _pool[i]->GetEventLoop();
}
}
EventLoop* GetEventLoop() //分配从属EventLoop*
{
if(_thread_cnt == 0) return _base_loop;
return _loops[(_loops_idx++)%_thread_cnt];
}
};
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。