加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
http_client.hpp 18.32 KB
一键复制 编辑 原始数据 按行查看 历史
gaoqingfeng 提交于 2023-09-16 22:22 . update

#pragma once
/*
* http client for anet implementation
* anet ��http �ͻ���
* Copyright (C) 2019 - 2023 gavingqf(gavingqf@126.com)
*
* Distributed under the Boost Software License, Version 1.0.
* (See accompanying file LICENSE_1_0.txt or copy at
* http://www.boost.org/LICENSE_1_0.txt)
*/
#include <functional>
#include <utility>
#include "event_loop.hpp"
#include "client.hpp"
#include "log.h"
#include "http_info.hpp"
#include "anet.hpp"
#include "asio/detail/noncopyable.hpp"
#include "semaphore.hpp"
namespace anet {
namespace http {
// http require callback typedef.
typedef std::function<void (const std::string &res,httpStatus status)> httpReqCallback;
// http require struct
struct HttpReq {
// method
std::string method{ "" };
// path
std::string path{ "" };
// version
std::string version{ "" };
// content type
std::string contentType{ "" };
// content
std::string content{ "" };
// connection mode
std::string connectionMode{ "" };
// http status
httpStatus status{ httpStatus::NOT_FIND };
// parse http content.
bool parse(const std::string &content) {
// == post parse == //
auto pos = content.find(gHttpSpaceLine);
assert(pos != gHttpStrFindInvalid && "invalid content");
if (pos == gHttpStrFindInvalid) {
return false;
}
// req content.
this->content = std::move(std::string(&content[pos + gHttpSpaceLineLen]));
// try to parse method and body
std::vector<std::string> vecContents;
strSplits(content, gHttpCRLF, vecContents);
{// parse content type.
auto find = false;
for (size_t i = 0; i < vecContents.size(); i++) {
auto p = vecContents[i].find(gHttpContentTypeLine);
if (gHttpStrFindInvalid != p) {
this->contentType = std::move(std::string(&vecContents[i][p + strlen(gHttpContentTypeLine)]));
find = true;
break;
}
}
if (!find) {
this->contentType = gHttpContentTypeJson;
}
}
{// parse Connection flag.
auto find = false;
for (size_t i = 0; i < vecContents.size(); i++) {
auto p = vecContents[i].find(gHttpConnectionLine);
if (gHttpStrFindInvalid != p) {
this->connectionMode = std::move(std::string(&vecContents[i][p + strlen(gHttpConnectionLine)]));
find = true;
break;
}
}
if (!find) {
this->connectionMode = gHttpConnectionClose;
}
}
// parse the first data.
std::vector<std::string> vec;
strSplits(vecContents[0], " ", vec);
// build http parameter.
assert(vec.size() >= 3 && "parse http info error");
if (vec.size() < 3) {
return false;
}
// get version and status.
this->version = std::move(vec[0]);
this->status = httpStatus(std::atoi(vec[1].c_str()));
return true;
}
};
// http client response define.
typedef std::pair<httpStatus, std::string> response;
// http req vector.
typedef std::vector<std::pair<HttpReq, httpReqCallback>> VecHttpReq;
// http client codec.
class CHttpClientCodec final: public anet::tcp::ICodec {
public:
CHttpClientCodec() = default;
virtual ~CHttpClientCodec() = default;
public:
virtual int parsePacket(const char *data, int len) override {
std::string strData(data, len);
auto pos = strData.find(gHttpSpaceLine);
if (pos == gHttpStrFindInvalid) {
return anet::tcp::retNotComplete;
}
auto lenPos = strData.find(gHttpContentLenLine);
if (lenPos == gHttpStrFindInvalid) {
return anet::tcp::retError; // just close it.
}
// content len flag string.
std::string conLenString(&data[lenPos + gHttpContentLenLineLen]);
auto contentLen = std::atoi(conLenString.c_str());
if (len >= int(pos + gHttpSpaceLineLen + contentLen)) {
return int(pos + gHttpSpaceLineLen + contentLen);
} else {
return anet::tcp::retNotComplete;
}
}
};
// http client session.
class CHttpClientSession final: public anet::tcp::ISession {
public:
explicit CHttpClientSession(const VecHttpReq &req) :
m_vecReq(req), m_index(0), m_close(true) {
}
virtual ~CHttpClientSession() = default;
public:
virtual void onConnected(anet::tcp::connSharePtr conn) override {
m_close = false;
m_conn = conn;
// host name
auto &&host = std::string(conn->getRemoteAddr());
host += ":";
host += std::to_string(conn->getRemotePort());
for (size_t i = 0; i < m_vecReq.size(); i++) {
this->sendHttpReq(m_vecReq[i].first, host);
}
}
virtual void onRecv(const char *data, int len) override {
HttpReq httpReq;
auto ret = httpReq.parse(std::string(data, data + len));
assert(ret && "parse http content error");
if (!ret) {
return;
}
auto &&reqCallback = m_vecReq[m_index].second;
reqCallback(httpReq.content, httpReq.status);
}
virtual void onTerminate() override {
m_close = true;
}
virtual void release() override {
delete this;
}
bool isClose() const {
return m_close;
}
private:
// send http response.
void sendHttpReq(const HttpReq &httpReq,const std::string &host) {
std::string req;
req.reserve(m_sendBuffSize);
if (0 == stricmp(httpReq.method.c_str(), gHttpGetMethod)) {
req = gHttpGetMethod;
req += " ";
req += httpReq.path;
// content
if (!httpReq.content.empty()) {
req += "?";
req += httpReq.content;
}
req += " ";
req += httpReq.version;
} else if (0 == stricmp(httpReq.method.c_str(), gHttpPostMethod)) {
req = gHttpPostMethod;
req += " ";
req += httpReq.path;
req += " ";
req += httpReq.version;
}
req.append(gHttpCRLF, gHttpCRLFLen);
// Content type
if (!httpReq.contentType.empty()) {
req.append(gHttpContentTypeLine, strlen(gHttpContentTypeLine));
req.append(httpReq.contentType);
req.append(";");
req.append(gHttpCharsetUtf8);
req.append(gHttpCRLF, gHttpCRLFLen);
} else {
req.append(gHttpContentTypeLine, strlen(gHttpContentTypeLine));
req.append(gHttpContentTypeUrlEncoded);
req.append(gHttpCRLF, gHttpCRLFLen);
}
// connection close flag.
req.append(gHttpConnectionLine, strlen(gHttpConnectionLine));
req.append(gHttpConnectionClose, strlen(gHttpConnectionClose));
req.append(gHttpCRLF, gHttpCRLFLen);
// !! above http/1.1 must add "host" command.
if (0 == stricmp(httpReq.version.c_str(), gHttpVersion11) ||
0 == stricmp(httpReq.version.c_str(), gHttpVersion20)) {
req.append("Host:" + host);
req.append(gHttpCRLF, gHttpCRLFLen);
}
// for POST
if (0 == stricmp(httpReq.method.c_str(), gHttpPostMethod)) {
// content len
std::string dataLen(gHttpContentLenLine);
dataLen += std::to_string(httpReq.content.size());
req += dataLen;
req.append(gHttpSpaceLine, gHttpSpaceLineLen);
// content.
req += httpReq.content;
} else {
// space line
req.append(gHttpSpaceLine, gHttpSpaceLineLen);
}
this->send(req);
}
protected:
void send(const std::string &req) {
if (isClose()) return;
m_conn->send(req);
}
private:
anet::tcp::connSharePtr m_conn{nullptr};
VecHttpReq m_vecReq;
size_t m_index;
std::atomic_bool m_close{ true };
static const int m_sendBuffSize{ 1024 };
};
//
// http client for asynchronous mode.
//
class CHttpClient final: asio::noncopyable {
public:
explicit CHttpClient(anet::tcp::CEventLoop &loop) : m_client(loop) {}
virtual ~CHttpClient() = default;
public:
bool connect(const std::string &addr, unsigned short port) {
auto sess = new CHttpClientSession(m_vecHttpReq);
m_client.setSession(sess);
m_client.setPacketParser(new CHttpClientCodec());
return m_client.syncConnect(addr, port);
}
// async post.
bool async_post(const std::string &path,
const std::string &req, httpReqCallback &&call
) {
if (!checkPath(path)) {
return false;
}
HttpReq httpReq;
httpReq.content = req;
httpReq.method = gHttpPostMethod;
httpReq.contentType = gHttpContentTypeJson;
httpReq.version = gHttpVersion11;
httpReq.connectionMode = gHttpConnectionClose;
httpReq.path = path;
m_vecHttpReq.push_back({ httpReq,call });
return true;
}
bool async_post(const std::string &path, const std::string &req,
const std::string &contentType, const std::string &httpVersion,
httpReqCallback &&call
) {
if (!checkPath(path)) {
return false;
}
HttpReq httpReq;
httpReq.content = req;
httpReq.method = gHttpPostMethod;
httpReq.contentType = contentType;
httpReq.version = httpVersion;
httpReq.connectionMode = gHttpConnectionClose;
httpReq.path = path;
m_vecHttpReq.push_back({ httpReq,call });
return true;
}
// async get.
bool async_get(const std::string &path,
const std::string &req, httpReqCallback &&call
) {
if (!checkPath(path)) {
return false;
}
HttpReq httpReq;
httpReq.content = req;
httpReq.path = path;
httpReq.method = gHttpGetMethod;
httpReq.contentType = gHttpContentTypeUrlEncoded;
httpReq.version = gHttpVersion11;
httpReq.connectionMode = gHttpConnectionClose;
m_vecHttpReq.push_back({ httpReq,call });
return true;
}
bool async_get(const std::string &path, const std::string &req,
const std::string &contentType, const std::string &httpVersion,
httpReqCallback &&call
) {
if (!checkPath(path)) {
return false;
}
HttpReq httpReq;
httpReq.content = req;
httpReq.path = path;
httpReq.method = gHttpGetMethod;
httpReq.contentType = contentType;
httpReq.version = httpVersion;
httpReq.connectionMode = gHttpConnectionClose;
m_vecHttpReq.push_back({ httpReq,call });
return true;
}
protected:
bool checkPath(const std::string& path) {
for (size_t i = 0; i < m_vecHttpReq.size(); i++) {
if (m_vecHttpReq[i].first.path == path) {
return false;
}
}
return true;
}
private:
// TCP client
anet::tcp::CClient m_client;
// http req vector
VecHttpReq m_vecHttpReq;
};
// http synchronous session factory.
template<typename session>
class CHttpSyncSessionFactory final {
public:
CHttpSyncSessionFactory() : m_nextId(1) {}
~CHttpSyncSessionFactory() = default;
CHttpSyncSessionFactory(const CHttpSyncSessionFactory &rhs) = delete;
CHttpSyncSessionFactory operator=(const CHttpSyncSessionFactory &rhs) = delete;
typedef std::shared_ptr<session> sessionSharePtr;
typedef std::map<unsigned int, sessionSharePtr> Id2SessionPtr;
public:
// return share_ptr<session> object.
sessionSharePtr createSession() {
std::lock_guard<std::mutex> mu(m_lock);
// create session's share pointer.
unsigned int sessionId = m_nextId++;
auto sessSharePtr = std::make_shared<session>();
// set id.
sessSharePtr->setId(sessionId);
sessSharePtr->SetFactory(this);
// save it to map to keep the life.
m_sessions[sessSharePtr->getId()] = sessSharePtr;
return sessSharePtr;
}
// release session.
void ReleaseSession(const session *pSession) {
if (pSession == nullptr) {
return;
}
std::lock_guard<std::mutex> mu(m_lock);
m_sessions.erase(pSession->getId());
}
protected:
mutable std::mutex m_lock;
Id2SessionPtr m_sessions;
unsigned int m_nextId{ 1 };
};
// http synchronous session and its factory.
class CHttpClientSyncSession;
typedef CHttpSyncSessionFactory<CHttpClientSyncSession> httSyncSessionFactory;
//
// http client session for synchronous mode.
//
class CHttpClientSyncSession final : public anet::tcp::ISession,
public std::enable_shared_from_this<CHttpClientSyncSession> {
public:
CHttpClientSyncSession() = default;
virtual ~CHttpClientSyncSession() = default;
public:
// connected callback
virtual void onConnected(anet::tcp::connSharePtr conn) {
m_close = false;
m_conn = conn;
}
// recv data callback: data and its length.
virtual void onRecv(const char *data, int len) {
HttpReq httpReq;
auto ret = httpReq.parse(std::string(data, data + len));
if (ret) {
m_res.first = httpReq.status;
m_res.second = httpReq.content;
} else {
m_res.first = httpStatus::NOT_FIND;
m_res.second = "";
}
m_sem.signal();
m_signaled = true;
}
// connection terminate callback
virtual void onTerminate() {
if (!m_signaled) {
m_sem.signal();
m_signaled = true;
}
m_close = true;
}
// release callback
virtual void release() {
assert(m_factory != nullptr);
m_factory->ReleaseSession(this);
}
void SetFactory(httSyncSessionFactory *factory) {
assert(factory != nullptr && "invalid session factory.");
m_factory = factory;
}
// send http response.
void sendHttpReq(const HttpReq &httpReq, const std::string &host) {
std::string req;
req.reserve(m_sendBuffSize);
if (0 == stricmp(httpReq.method.c_str(), gHttpGetMethod)) {
req = gHttpGetMethod;
req += " ";
req += httpReq.path;
// content
if (!httpReq.content.empty()) {
req += "?";
req += httpReq.content;
}
req += " ";
req += httpReq.version;
} else if (0 == stricmp(httpReq.method.c_str(), gHttpPostMethod)) {
req = gHttpPostMethod;
req += " ";
req += httpReq.path;
req += " ";
req += httpReq.version;
}
req.append(gHttpCRLF, gHttpCRLFLen);
// Content type
if (!httpReq.contentType.empty()) {
req.append(gHttpContentTypeLine, strlen(gHttpContentTypeLine));
req.append(httpReq.contentType);
req.append(gHttpCRLF, gHttpCRLFLen);
} else {
req.append(gHttpContentTypeLine, strlen(gHttpContentTypeLine));
req.append(gHttpContentTypeUrlEncoded);
req.append(gHttpCRLF, gHttpCRLFLen);
}
// connection close flag.
req.append(gHttpConnectionLine, strlen(gHttpConnectionLine));
req.append(gHttpConnectionClose, strlen(gHttpConnectionClose));
req.append(gHttpCRLF, gHttpCRLFLen);
// !! above http/1.1 must add host command.
if (0 == stricmp(httpReq.version.c_str(), gHttpVersion11) ||
0 == stricmp(httpReq.version.c_str(), gHttpVersion20)) {
req.append("Host:" + host);
req.append(gHttpCRLF, gHttpCRLFLen);
}
// for POST
if (0 == stricmp(httpReq.method.c_str(), gHttpPostMethod)) {
// content len
std::string dataLen(gHttpContentLenLine);
dataLen += std::to_string(httpReq.content.size());
req += dataLen;
// space line
req.append(gHttpSpaceLine, gHttpSpaceLineLen);
// content.
req += httpReq.content;
} else {
// space line
req.append(gHttpSpaceLine, gHttpSpaceLineLen);
}
this->send(req);
}
// tcp connection.
auto getConn() {
return m_conn;
}
void close() {
if (m_close) return;
m_conn->close();
}
// wait for result.
response waitFor() {
auto timeOut = std::chrono::milliseconds{3000};
if (!m_sem.wait_for(timeOut)) {
return { httpStatus::NOT_FIND,"" };
} else {
return m_res;
}
}
void setId(unsigned int id) {
m_id = id;
}
unsigned int getId() const {
return m_id;
}
protected:
void send(const std::string &req) {
if (m_close) return;
m_conn->send(req);
}
private:
anet::tcp::connSharePtr m_conn{ nullptr };
std::atomic_bool m_close{ true };
std::atomic_bool m_signaled{ false };
// session factory.
httSyncSessionFactory *m_factory{ nullptr };
// sync semaphore.
anet::utils::CSemaphore m_sem{0};
// session id.
unsigned int m_id{ 0 };
// response pair.
response m_res{ httpStatus::NOT_FIND, "" };
static const int m_sendBuffSize{ 1024 };
};
// http client for synchronous mode.
// this class must set a httSyncSessionFactory to manager session.
class CHttpSyncClient final : asio::noncopyable {
public:
explicit CHttpSyncClient(httSyncSessionFactory *factory) :
m_sessionFactory(factory) {
}
virtual ~CHttpSyncClient() = default;
public:
// return post {Status, content} synchronously.
response post(const std::string &addr, const std::string &path, const std::string &req) {
// connect to remote server.
if (!syncConnect(addr)) {
return { httpStatus::NOT_FIND,"" };
}
// send requirement.
this->sendReq(path, req, gHttpPostMethod);
// wait for response return.
return m_session->waitFor();
}
// return get {Status, content} synchronously.
response get(const std::string &addr, const std::string &path, const std::string &req) {
// connect to remote server.
if (!syncConnect(addr)) {
return { httpStatus::NOT_FIND,"" };
}
// send requirement.
this->sendReq(path, req, gHttpGetMethod);
// wait for result.
return m_session->waitFor();
}
void close() {
if (m_session != nullptr) {
m_session->close();
m_session = nullptr;
}
}
private:
void sendReq(const std::string &path, const std::string &req, const char *method) {
HttpReq httpReq;
httpReq.content = req;
httpReq.method = method;
httpReq.contentType = gHttpContentTypeUrlEncoded;
httpReq.version = gHttpVersion11;
httpReq.connectionMode = gHttpConnectionClose;
httpReq.path = path;
// host command.
auto &&host = std::string(m_session->getConn()->getRemoteAddr());
host += ":";
host += std::to_string(m_session->getConn()->getRemotePort());
m_session->sendHttpReq(httpReq, host);
}
bool syncConnect(const std::string &addr) {
this->close(); // try to close before connection.
std::vector<std::string> vec;
strSplits(addr, ":", vec);
if (vec.size() < 2) {
return false;
}
// initialize client's codec and session.
m_client.setPacketParser(&m_codec);
m_session = m_sessionFactory->createSession();
m_client.setSession(m_session.get());
// try to connect remote server.
auto port = (unsigned short)std::atoi(vec[1].c_str());
return m_client.syncConnect(vec[0], port);
}
private:
// TCP client
anet::tcp::CClient m_client{ m_loop };
// Event loop.
anet::tcp::CEventLoop m_loop{ 1 };
// Client codec.
CHttpClientCodec m_codec;
// client session.
using httpSyncSessionPtr = std::shared_ptr<CHttpClientSyncSession>;
httpSyncSessionPtr m_session{ nullptr };
// session factory.
httSyncSessionFactory *m_sessionFactory{ nullptr };
};
}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化