代码拉取完成,页面将自动刷新
///
/// @file EventLoop.cc
/// @author lemon(haohb13@gmail.com)
/// @date 2021-05-11 15:56:30
///
#include "EventLoop.hpp"
#include "TcpConnection.hpp"
#include "Acceptor.hpp"

#include <arpa/inet.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

#include <iostream>
#include <mutex>
#include <sstream>
// Socket descriptors of the two servers this process connects to; both are
// assigned in EventLoop::loop() from the result of connectToServer().
int g_server_fd1;
int g_server_fd2;
// Goods name -> descriptor of the server it was loaded from
// (filled by EventLoop::loadGoodsFromServer()).
std::map<std::string, int> g_belongs;
// Server descriptor -> alive flag; EventLoop::loop() sets both servers to true.
std::map<int, bool> g_isAlive;
namespace wd
{
/// Builds an event loop around the given acceptor.
///
/// Creates the epoll descriptor through createEpollfd(), starts with the loop
/// flag cleared, constructs the ready-event buffer with 1024, and finally
/// registers the acceptor's descriptor for read events.
///
/// @param acceptor  acceptor whose fd() is watched for incoming connections.
EventLoop::EventLoop(Acceptor & acceptor)
    : _efd(createEpollfd())
    , _acceptor(acceptor)
    , _isLooping(false)
    , _evtList(1024)
{
    // addEpollReadFd() also switches the descriptor to non-blocking mode.
    addEpollReadFd(_acceptor.fd());
}
/// Releases the epoll descriptor owned by this loop.
///
/// NOTE(review): only _efd is released here; the threads started by loop()
/// are not joined in this destructor.
EventLoop::~EventLoop()
{
    // createEpollfd() hands back a negative value when epoll_create1() fails,
    // and 0 is a legal descriptor number, so "valid" means non-negative
    // rather than non-zero.
    if (_efd >= 0) {
        ::close(_efd);
    }
}
/// Starts the loop and keeps polling until _isLooping becomes false.
///
/// Steps, in order:
///  1. connect to the two servers at 127.0.0.1:10000 and 127.0.0.1:10001;
///  2. load the goods each server sells (recorded in g_belongs);
///  3. mark both servers alive in g_isAlive;
///  4. start the accept thread and WORKER_THREAD_NUM worker threads;
///  5. call waitEpollfd() repeatedly while _isLooping is true.
///
/// NOTE(review): the descriptors returned by connectToServer() are not checked
/// here; a failed connect only surfaces when loadGoodsFromServer() fails to send.
void EventLoop::loop()
{
    _isLooping = true;

    g_server_fd1 = connectToServer("127.0.0.1", 10000);
    g_server_fd2 = connectToServer("127.0.0.1", 10001);

    // Load the goods sold by each server.
    loadGoodsFromServer(g_server_fd1);
    loadGoodsFromServer(g_server_fd2);

    g_isAlive[g_server_fd1] = true;
    g_isAlive[g_server_fd2] = true;

    // Start the thread that accepts new connections, then the threads that
    // handle the business logic.
    m_accept_thread.reset(new std::thread(handleNewConnection, this));
    for (int worker = 0; worker < WORKER_THREAD_NUM; ++worker) {
        m_work_thread[worker].reset(new std::thread(handleMessage, this));
    }

    // Each round blocks in epoll_wait() for at most its timeout, after which
    // the flag is re-read.
    while (_isLooping) {
        waitEpollfd();
    }
}
/// Opens a TCP connection to an IPv4 endpoint.
///
/// @param ip    IPv4 address in text form, parsed with inet_pton(AF_INET, ...).
/// @param port  port number in host byte order (converted with htons()).
/// @return the connected socket descriptor, or -1 on failure. On failure a
///         short message is written to std::cout and any socket that was
///         already created is closed.
int EventLoop::connectToServer(const char *ip, const int port) {
    const int sock = ::socket(AF_INET, SOCK_STREAM, 0);
    if (sock == -1) {
        std::cout << "socket error" << std::endl;
        return -1;
    }

    // Describe the peer: family and port first, then the parsed address.
    sockaddr_in peer;
    memset(&peer, 0, sizeof(peer));
    peer.sin_family = AF_INET;
    peer.sin_port = htons(port);

    const int parsed = ::inet_pton(AF_INET, ip, &peer.sin_addr);
    if (parsed != 1) {
        std::cout << "inet_pton error" << std::endl;
        ::close(sock);
        return -1;
    }

    const int rc = ::connect(sock, reinterpret_cast<sockaddr *>(&peer), sizeof(peer));
    if (rc == -1) {
        std::cout << "connect error" << std::endl;
        ::close(sock);
        return -1;
    }

    return sock;
}
/// Asks one server which goods it sells and records the ownership.
///
/// Sends the request line "getGoodsInfo\n", reads one reply of at most
/// 4096 bytes, splits it on whitespace and maps every token to server_fd
/// in g_belongs.
///
/// A failing send() or recv() is reported with perror() and ends the process
/// through exit(-1).
///
/// NOTE(review): the reply is read with a single recv(); if the server's
/// answer can exceed one read, the remainder is not consumed here.
///
/// @param server_fd  connected socket of the server to query.
void EventLoop::loadGoodsFromServer(int server_fd) {
    const std::string request = "getGoodsInfo\n";
    if (::send(server_fd, request.c_str(), request.length(), 0) == -1) {
        perror("send");
        exit(-1);
    }

    char buf[4096];
    const ssize_t received = ::recv(server_fd, buf, sizeof(buf), 0);
    if (received == -1) {
        perror("recv");
        exit(-1);
    }

    // recv() does not NUL-terminate the buffer, so the parser must be given
    // exactly the bytes that arrived; treating buf as a C string would read
    // uninitialized memory past the reply (and past the array when the reply
    // fills all 4096 bytes).
    std::stringstream ss(std::string(buf, static_cast<std::size_t>(received)));
    std::string goods;
    while (ss >> goods) {
        g_belongs[goods] = server_fd;
    }
}
/// Asks the loop to stop by clearing _isLooping.
///
/// loop() re-reads the flag between waitEpollfd() rounds.
/// NOTE(review): nothing is signalled on m_accept_cond / m_worker_cond here,
/// so threads currently blocked on those condition variables are not woken
/// by this call.
void EventLoop::unloop()
{
    _isLooping = false;
}
/// Stores the callback that waitEpollfd() attaches to each TcpConnection as
/// its connection callback.
///
/// @param cb  callback to take over; it is moved into _onConnectionCb.
void EventLoop::setConnectionCallback(TcpConnectionCallback && cb)
{
    _onConnectionCb = std::move(cb);
}
/// Stores the callback that waitEpollfd() attaches to each TcpConnection as
/// its message callback.
///
/// @param cb  callback to take over; it is moved into _onMessageCb.
void EventLoop::setMessageCallback(TcpConnectionCallback && cb)
{
    _onMessageCb = std::move(cb);
}
/// Stores the callback that waitEpollfd() attaches to each TcpConnection as
/// its close callback.
///
/// @param cb  callback to take over; it is moved into _onCloseCb.
void EventLoop::setCloseCallback(TcpConnectionCallback && cb)
{
    _onCloseCb = std::move(cb);
}
void EventLoop::waitEpollfd()
{
int nready = 0;
do {
nready = ::epoll_wait(_efd, &*_evtList.begin(), _evtList.size(), 5000);
}while(nready == -1 && errno == EINTR);
//When the errno will be EINTR
//The call was interrupted by a signal handler before either (1) any of the requested events occurred or (2) the timeout expired
if(nready == -1) {
perror("epoll_wait");
return;
} else if(nready == 0) {
return;
} else {
//nready > 0
if(nready == _evtList.size()) {
_evtList.resize(2 * nready);
}
for(int idx = 0; idx < nready; ++idx) {
int fd = _evtList[idx].data.fd;
if(fd == _acceptor.fd() &&(_evtList[idx].events & EPOLLIN)) {
std::cout << "EPOLLOUT" << EPOLLOUT << std::endl;
//signal the accept thread
m_accept_cond.notify_one();
} else {
if(_evtList[idx].events & EPOLLIN) {
//put the ready client into a list and then signal the worker thread
// using TcpConnectionPtr = std::shared_ptr<TcpConnection>;
TcpConnectionPtr conn(new TcpConnection(fd)); //smart pointer will auto-release once it out of it's range
conn->setConnectionCallback(_onConnectionCb);
conn->setMessageCallback(_onMessageCb);
conn->setCloseCallback(_onCloseCb);
_conns.insert(std::make_pair(fd, conn));
m_worker_cond.notify_one();
}
}
}
}
}
/// Body of the accept thread: a new connection has arrived, and this is how it
/// is handed over for processing.
///
/// The thread blocks on m_accept_cond until waitEpollfd() signals that the
/// acceptor descriptor is readable, accepts one connection and registers the
/// new descriptor with epoll for read events. The loop ends once _isLooping
/// is observed to be false.
///
/// NOTE(review): one accept() is performed per wake-up while descriptors are
/// registered edge-triggered (see addEpollReadFd()); confirm that connections
/// arriving together cannot be left pending.
///
/// @param eventLoop  loop whose acceptor, mutex and condition variable are used.
void EventLoop::handleNewConnection(EventLoop *eventLoop) {
    while (eventLoop->_isLooping) {
        std::unique_lock<std::mutex> guard(eventLoop->m_accept_mutex);
        eventLoop->m_accept_cond.wait(guard);
        if (!eventLoop->_isLooping) {
            break;
        }

        const int newfd = eventLoop->_acceptor.accept();
        if (newfd < 0) {
            // Nothing was accepted (failed accept or a wake-up with no pending
            // connection); a negative value is never a usable descriptor, so
            // do not try to register it with epoll.
            continue;
        }
        eventLoop->addEpollReadFd(newfd);
    }
}
/// Body of a worker thread: takes queued connections and runs their handlers.
///
/// Each iteration waits until _conns is non-empty, removes its first entry
/// under m_worker_mutex, and then, outside the lock, runs either the close
/// handler (followed by removing the descriptor from epoll) or the message
/// handler, depending on isClosed().
///
/// NOTE(review): while the queue is empty the thread stays inside the wait
/// loop and does not look at _isLooping.
///
/// @param eventLoop  loop whose connection queue is served.
void EventLoop::handleMessage(EventLoop *eventLoop)
{
    while (eventLoop->_isLooping) {
        TcpConnectionPtr pConn;
        int fd;
        {
            std::unique_lock<std::mutex> guard(eventLoop->m_worker_mutex);
            // Re-check the condition after every wake-up to guard against
            // spurious wake-ups.
            while (eventLoop->_conns.empty()) {
                eventLoop->m_worker_cond.wait(guard);
            }

            // 1. Take the first queued connection and drop it from the queue.
            auto head = eventLoop->_conns.begin();
            fd = head->first;
            pConn = head->second;
            pConn->toString();
            eventLoop->_conns.erase(head);
        }

        // 2. Check whether the connection has been closed.
        const bool closed = pConn->isClosed();
        if (!closed) {
            // 2.2 Still open: run the message-arrived handler.
            pConn->handleMessageCallback();
            continue;
        }

        // 2.1 Closed: run the close handler and stop watching the descriptor.
        pConn->handleCloseCallback();
        eventLoop->delEpollReadFd(fd);
    }
}
/// Creates the epoll instance used by this loop.
///
/// @return the descriptor from epoll_create1(0); when the call fails the
///         error is reported with perror() and the negative result is
///         returned unchanged.
int EventLoop::createEpollfd()
{
    const int epfd = ::epoll_create1(0);
    if (epfd < 0) {
        perror("epoll_create1");
    }
    return epfd;
}
/// Adds O_NONBLOCK to the file status flags of fd.
///
/// @return true on success; false when either fcntl() call fails, in which
///         case the flags involved are written to std::cout.
static bool makeFdNonBlocking(int fd)
{
    const int oldflag = ::fcntl(fd, F_GETFL, 0);
    const int newflag = oldflag | O_NONBLOCK;
    // A failed F_GETFL yields -1; that value must not be written back as a
    // flag mask, so F_SETFL is only attempted when the read succeeded.
    if (oldflag == -1 || ::fcntl(fd, F_SETFL, newflag) == -1) {
        std::cout << "fcntl error, oldflag =" << oldflag << ", newflag = " << newflag << std::endl;
        return false;
    }
    return true;
}

/// Switches fd to non-blocking mode and registers it with the epoll instance
/// for edge-triggered read events.
///
/// Nothing is registered when the descriptor cannot be made non-blocking.
/// A failing epoll_ctl() is reported with perror().
///
/// @param fd  descriptor to watch.
void EventLoop::addEpollReadFd(int fd)
{
    if (!makeFdNonBlocking(fd)) {
        return;
    }

    // Edge-triggered mode is required: in level-triggered mode, while an I/O
    // thread is still fetching data from the TCP buffer, the main thread's
    // epoll_wait() would keep returning for the same data and hand the same
    // job to several threads.
    struct epoll_event ev = {};   // data is a union; zero the part fd does not cover
    ev.events = EPOLLIN | EPOLLET;
    ev.data.fd = fd;
    if (::epoll_ctl(_efd, EPOLL_CTL_ADD, fd, &ev) < 0) {
        perror("epoll_ctl");
    }
}
/// Removes fd from the epoll instance.
///
/// A failing epoll_ctl() is reported with perror().
///
/// @param fd  descriptor to stop watching.
void EventLoop::delEpollReadFd(int fd)
{
    // An event structure is still handed to EPOLL_CTL_DEL; only its fd field
    // is filled in.
    struct epoll_event removed;
    removed.data.fd = fd;

    if (::epoll_ctl(_efd, EPOLL_CTL_DEL, fd, &removed) < 0) {
        perror("epoll_ctl");
    }
}
}//end of namespace wd
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。