代码拉取完成,页面将自动刷新
同步操作将从 gavingqf/anet 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
#pragma once
/*
* anet tcp acceptor
*/
#include <cassert>
#include <cstdlib>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include "define.hpp"
#include "connection.hpp"
#include "event_loop.hpp"
#include "log.h"
#include "anet.hpp"
#include "asio/detail/noncopyable.hpp"
namespace anet {
namespace tcp {
// First object id an acceptor hands out to an accepted connection
// (1 << 20 == 1048576); later ids count upwards from here.
constexpr unsigned int gStartServerId = 1u << 20;
class CAcceptor final: asio::noncopyable {
public:
explicit CAcceptor(CEventLoop &loop) : m_loop(loop),
m_ioContext(loop.getAcceptorIOContext()),
m_acceptor(m_ioContext), m_sessionFactory(nullptr),
m_codec(nullptr), m_nextId(gStartServerId),m_maxConnectNum(0) {
m_connections.clear();
}
virtual ~CAcceptor() {
this->stop();
}
public:
void setMaxConnectNum(int num) {
if (num < 0) {
num = 0;
}
m_maxConnectNum = num;
}
void setSessionFactory(ISessionFactory *factory) {
assert(factory != nullptr && "session factory is null");
m_sessionFactory = factory;
}
void setPacketParser(ICodec *codec) {
assert(codec != nullptr && "codec is null");
m_codec = codec;
}
// start to listen on addr:port.
bool start(const std::string &addr, unsigned short port) {
if (m_codec == nullptr) {
LogCrit("packet parser is null");
return false;
}
if (m_sessionFactory == nullptr) {
LogCrit("session factory is null");
return false;
}
this->startListen(addr, port);
return this->doAccept();
}
// stop acceptor.
void stop() {
// release connection list.
for (const auto& e : m_connections) {
e.second->close();
}
m_connections.clear();
if (m_acceptor.is_open()) {
asio::error_code ec;
m_acceptor.close(ec);
if (ec) {
LogCrit("acceptor close error:%d", ec.value());
}
}
}
private:
void startListen(const std::string &addr, unsigned short port) {
asio::ip::address asioAddr;
asioAddr.from_string(addr);
asioEndPoint remotePoint(asioAddr, port);
// no blocking
m_acceptor.open(asio::ip::tcp::v4());
m_acceptor.non_blocking(true);
// Set no delay = true, just use Nagle.
m_acceptor.set_option(asio::ip::tcp::no_delay(true));
// set reuse address
m_acceptor.set_option(asioAcceptor::reuse_address(true));
// set keep alive
m_acceptor.set_option(asio::socket_base::keep_alive(true));
// send/recv buffer option setting.
asio::socket_base::send_buffer_size sndBuffOpt(gMaxWriteBuffLength);
m_acceptor.set_option(sndBuffOpt);
asio::socket_base::receive_buffer_size recvBuffOpt(gMaxReadBuffLength);
m_acceptor.set_option(recvBuffOpt);
m_acceptor.bind(remotePoint);
m_acceptor.listen(gMaxListenBlog);
// log out.
LogInfo("listen on %s:%d succ", addr.c_str(), port);
}
// start accept.
bool doAccept() {
auto &ioContext = m_loop.getNextIOContext();
auto conn = std::make_shared<CConnection>(ioContext, m_loop);
m_acceptor.async_accept(conn->getSocket(),
[this, conn](asio::error_code ec) {
// return if it is closed by outside.
if (!m_acceptor.is_open()) {
LogCrit("acceptor is closed now");
return;
}
if (!ec) {
// there is max connection limit, then check it.
if (m_maxConnectNum > 0 && m_connections.size() >= m_maxConnectNum) {
conn->closeSocket();
LogCrit("current connection number is beyond %d", m_maxConnectNum);
this->doAccept();
return;
}
auto session = m_sessionFactory->createSession();
assert(session != nullptr && "session is null");
if (session == nullptr) {
LogCrit("create session null error");
return;
}
// insert to connection map.
this->insertConnection(conn);
// set remove callback function.
conn->setRemoveFunc(std::bind(&CAcceptor::removeConnection, this, std::placeholders::_1));
// start to working.
conn->doStart(session, m_codec);
// do accept again.
this->doAccept();
} else {
LogCrit("acceptor return error:%d, just exit.", ec.value());
}
});
return true;
}
// insert conn
void insertConnection(connSharePtr conn) {
auto objId = m_nextId++;
conn->SetObjId(objId);
m_connections[conn->GetObjId()] = conn;
}
// remove connection from m_connections.
void removeConnection(connSharePtr conn) {
// run in the m_ioContext to remove the conn from m_connections.
asio::post(m_ioContext, [conn,this]() {
auto num = m_connections.erase(conn->GetObjId());
assert(num == 1 && "remove connection error");
});
}
protected:
// all others connection's io_context.
CEventLoop &m_loop;
// acceptor's io_context.
asioIOContext &m_ioContext;
// asio acceptor object.
asioAcceptor m_acceptor;
// the user must keep the life of the session factory and codec.
ISessionFactory *m_sessionFactory{ nullptr };
ICodec *m_codec{ nullptr };
// all connection's map which is only visited in the CAcceptor's m_ioContext thread.
typedef std::map<unsigned int, connSharePtr> ConnectionMap;
ConnectionMap m_connections;
// connection's id generator, this variable is m_connections's thread safe.
unsigned int m_nextId{ gStartServerId };
// max connection limit, where 0 is unlimited.
int m_maxConnectNum{ 0 };
};
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。