加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
EventLoop.cc 4.46 KB
一键复制 编辑 原始数据 按行查看 历史
#include "EventLoop.h"
#include "Logger.h"
#include "Poller.h"
#include "Channel.h"
#include <sys/eventfd.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <memory>
//防止一个线程创建多个EventLoop
__thread EventLoop* t_loopInThisThread = nullptr;
//定义默认的Poller IO复用接口的超时时间
const int kPollTimeMs = 10000;
//创建wakeupfd,用来唤醒subReactor处理新来的channel
int createEventFd(){
int evtfd = ::eventfd(0,EFD_NONBLOCK | EFD_CLOEXEC);
if(evtfd < 0){
LOG_FATAL("eventfd error:%d \n",errno);
}
return evtfd;
}
EventLoop::EventLoop()
:looping_(false)
,quit_(false)
,callingPendingFunctors_(false)
,threadId_(CurrentThread::tid())
,poller_(Poller::newDefaultPoller(this))
,wakeupFd_(createEventFd())
,wakeupChannel_(new Channel(this,wakeupFd_))
{
LOG_DEBUG("EventLoop create %p in thread %d \n", this, threadId_);
if(t_loopInThisThread){
LOG_FATAL("Another EventLoop %p exists in this thread %d \n",t_loopInThisThread, threadId_);
}
else{
t_loopInThisThread = this;
}
//设置wakeupfd的事件类型以及发生事件后的回调操作
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead,this));
//这样每一个EventLoop都将监听wakeupChannel的EPOLLIN读事件了
wakeupChannel_->enableReading();
}
EventLoop::~EventLoop(){
wakeupChannel_->disableAll();
wakeupChannel_->remove();
::close(wakeupFd_);
t_loopInThisThread == nullptr;
}
void EventLoop::loop(){
looping_ = true;
quit_ = false;
LOG_INFO("EventLoop %p start looping \n", this);
while(!quit_){
activeChannels_.clear();
//监听两类fd,一种是client的fd,一种是wakeupfd
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
for(Channel* channel : activeChannels_){
//Poller监听哪些channel发生事件了,然后上报给EventLoop,通知channel处理相应的事件
channel->handleEvent(pollReturnTime_);
}
//执行当前EventLoop事件循环需要处理的回调操作
//mainLoop 事先注册了一个回调cb(需要subLoop来执行),wakeup subLoop后,执行下面的方法,执行之前mainLoop注册的cb操作
doPendingFunctors();
}
LOG_INFO("EventLoop %p stop looping. \n", this);
looping_ = false;
} //开启事件循环
void EventLoop::quit(){
//1、loop在自己的线程中调用quit
quit_ = true;
//2、如果是在其它线程中调用的quit_,在一个subLoop(workerthread)中,调用了mainLoop(IO线程)的quit
if(!isInLoopThread()){
wakeup();
}
} //退出事件循环
//在当前loop中执行cb
void EventLoop::runInLoop(Functor cb){
if(isInLoopThread()){ // 在当前的loop线程中,执行cb
cb();
}
else{ //在非当前loop线程中执行cb,就需要唤醒loop所在线程执行cb
queueInLoop(cb);
}
}
//把cb放入队列中,唤醒loop所在的线程,执行cb
void EventLoop::queueInLoop(Functor cb){
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.emplace_back(cb);
}
//唤醒相应的,需要执行上面回调操作的loop线程了
//|| callingPendingFunctors_的意思是:当前loop正在执行回调,但是loop又有了新的回调
if(!isInLoopThread() || callingPendingFunctors_){
wakeup(); // 唤醒loop所在线程
}
}
void EventLoop::handleRead(){
uint64_t one = 1;
ssize_t n = read(wakeupFd_,&one,sizeof one);
if(n != sizeof one){
LOG_ERROR("EventLoop::handleRead() reads %lu bytes instead of 8.",n);
}
}
//用来唤醒loop所在的线程的,向wakeupFd_写一个数据即可,wakeupChannel就发生读事件,当前loop线程就会被唤醒
void EventLoop::wakeup(){
uint64_t one = 1;
ssize_t n = write(wakeupFd_, &one,sizeof one);
if(n != sizeof one){
LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8. \n", n);
}
}
//EventLoop的方法 调用的就是 Poller的方法
void EventLoop::updateChannel(Channel* channel){
poller_->updateChannel(channel);
}
void EventLoop::removeChannel(Channel* channel){
poller_->removeChannel(channel);
}
bool EventLoop::hasChannel(Channel* channel){
return poller_->hasChannel(channel);
}
//执行回调
void EventLoop::doPendingFunctors(){
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}
for(const Functor& functor : functors){
functor(); //执行当前Loop需要执行的回调操作
}
callingPendingFunctors_ = false;
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化