加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
acceptor.hpp 5.25 KB
一键复制 编辑 原始数据 按行查看 历史
gaoqingfeng 提交于 2024-03-16 09:05 . update
#pragma once
/*
* anet tcp acceptor
*/
#include <map>
#include <cstdlib>
#include <memory>
#include "define.hpp"
#include "connection.hpp"
#include "event_loop.hpp"
#include "log.h"
#include "anet.hpp"
#include "asio/detail/noncopyable.hpp"
namespace anet {
namespace tcp {
const unsigned int gStartServerId = (0x1 << 20);
class CAcceptor final: asio::noncopyable {
public:
explicit CAcceptor(CEventLoop &loop) : m_loop(loop),
m_ioContext(loop.getAcceptorIOContext()),
m_acceptor(m_ioContext), m_sessionFactory(nullptr),
m_codec(nullptr), m_nextId(gStartServerId),m_maxConnectNum(0) {
m_connections.clear();
}
virtual ~CAcceptor() {
this->stop();
}
public:
void setMaxConnectNum(int num) {
if (num < 0) {
num = 0;
}
m_maxConnectNum = num;
}
void setSessionFactory(ISessionFactory *factory) {
assert(factory != nullptr && "session factory is null");
m_sessionFactory = factory;
}
void setPacketParser(ICodec *codec) {
assert(codec != nullptr && "codec is null");
m_codec = codec;
}
// start to listen on addr:port.
bool start(const std::string &addr, unsigned short port) {
if (m_codec == nullptr) {
LogCrit("packet parser is null");
return false;
}
if (m_sessionFactory == nullptr) {
LogCrit("session factory is null");
return false;
}
this->startListen(addr, port);
return this->doAccept();
}
// stop acceptor.
void stop() {
// release connection list.
for (const auto& e : m_connections) {
e.second->close();
}
m_connections.clear();
if (m_acceptor.is_open()) {
asio::error_code ec;
m_acceptor.close(ec);
if (ec) {
LogCrit("acceptor close error:%d", ec.value());
}
}
}
private:
void startListen(const std::string &addr, unsigned short port) {
asio::ip::address asioAddr;
asioAddr.from_string(addr);
asioEndPoint remotePoint(asioAddr, port);
// no blocking
m_acceptor.open(asio::ip::tcp::v4());
m_acceptor.non_blocking(true);
// Set no delay = true, just use Nagle.
m_acceptor.set_option(asio::ip::tcp::no_delay(true));
// set reuse address
m_acceptor.set_option(asioAcceptor::reuse_address(true));
// set keep alive
m_acceptor.set_option(asio::socket_base::keep_alive(true));
// send/recv buffer option setting.
asio::socket_base::send_buffer_size sndBuffOpt(gMaxWriteBuffLength);
m_acceptor.set_option(sndBuffOpt);
asio::socket_base::receive_buffer_size recvBuffOpt(gMaxReadBuffLength);
m_acceptor.set_option(recvBuffOpt);
m_acceptor.bind(remotePoint);
m_acceptor.listen(gMaxListenBlog);
// log out.
LogInfo("listen on %s:%d succ", addr.c_str(), port);
}
// start accept.
bool doAccept() {
auto &ioContext = m_loop.getNextIOContext();
auto conn = std::make_shared<CConnection>(ioContext, m_loop);
m_acceptor.async_accept(conn->getSocket(),
[this, conn](asio::error_code ec) {
// return if it is closed by outside.
if (!m_acceptor.is_open()) {
LogCrit("acceptor is closed now");
return;
}
if (!ec) {
// there is max connection limit, then check it.
if (m_maxConnectNum > 0 && m_connections.size() >= m_maxConnectNum) {
conn->closeSocket();
LogCrit("current connection number is beyond %d", m_maxConnectNum);
this->doAccept();
return;
}
auto session = m_sessionFactory->createSession();
assert(session != nullptr && "session is null");
if (session == nullptr) {
LogCrit("create session null error");
return;
}
// insert to connection map.
this->insertConnection(conn);
// set remove callback function.
conn->setRemoveFunc(std::bind(&CAcceptor::removeConnection, this, std::placeholders::_1));
// start to working.
conn->doStart(session, m_codec);
// do accept again.
this->doAccept();
} else {
LogCrit("acceptor return error:%d, just exit.", ec.value());
}
});
return true;
}
// insert conn
void insertConnection(connSharePtr conn) {
auto objId = m_nextId++;
conn->SetObjId(objId);
m_connections[conn->GetObjId()] = conn;
}
// remove connection from m_connections.
void removeConnection(connSharePtr conn) {
// run in the m_ioContext to remove the conn from m_connections.
asio::post(m_ioContext, [conn,this]() {
auto num = m_connections.erase(conn->GetObjId());
assert(num == 1 && "remove connection error");
});
}
protected:
// all others connection's io_context.
CEventLoop &m_loop;
// acceptor's io_context.
asioIOContext &m_ioContext;
// asio acceptor object.
asioAcceptor m_acceptor;
// the user must keep the life of the session factory and codec.
ISessionFactory *m_sessionFactory{ nullptr };
ICodec *m_codec{ nullptr };
// all connection's map which is only visited in the CAcceptor's m_ioContext thread.
typedef std::map<unsigned int, connSharePtr> ConnectionMap;
ConnectionMap m_connections;
// connection's id generator, this variable is m_connections's thread safe.
unsigned int m_nextId{ gStartServerId };
// max connection limit, where 0 is unlimited.
int m_maxConnectNum{ 0 };
};
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化