代码拉取完成,页面将自动刷新
#ifndef XG_RABITTMQCONNECT_H
#define XG_RABITTMQCONNECT_H
////////////////////////////////////////////////////////////////
#include <string>
#include <functional>
#include <amqp_tcp_socket.h>
#include <amqp_ssl_socket.h>
#define CHECK_FALSE_RETURN(FUNC) if (FUNC){} else return false;
using namespace std;
class RabbitmqConnect
{
protected:
amqp_socket_t* sock;
amqp_rpc_reply_t res;
amqp_connection_state_t conn;
public:
RabbitmqConnect()
{
res.reply_type = AMQP_RESPONSE_NORMAL;
sock = NULL;
}
~RabbitmqConnect()
{
close();
}
bool getResponse()
{
res = amqp_get_rpc_reply(conn);
CHECK_FALSE_RETURN(res.reply_type == AMQP_RESPONSE_NORMAL);
return true;
}
int getErrorCode() const
{
return res.library_error;
}
string getErrorString() const
{
switch (res.reply_type)
{
case AMQP_RESPONSE_NORMAL: return "SUCCESS";
case AMQP_RESPONSE_NONE: return "RESPONSE_NONE";
case AMQP_RESPONSE_LIBRARY_EXCEPTION: return amqp_error_string2(res.library_error);
case AMQP_RESPONSE_SERVER_EXCEPTION:
if (res.reply.id == AMQP_CONNECTION_CLOSE_METHOD || res.reply.id == AMQP_CHANNEL_CLOSE_METHOD)
{
auto msg = ((amqp_connection_close_t*)(res.reply.decoded))->reply_text;
return string((char*)(msg.bytes), (char*)(msg.bytes) + msg.len);
}
return "RESPONSE_SERVER_EXCEPTION";
default: return "UNKNOWN ERROR";
}
}
public:
void close()
{
if (sock)
{
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
sock = NULL;
}
}
bool connect(const string& host, int port, bool ssl = false)
{
close();
conn = amqp_new_connection();
sock = ssl ? amqp_ssl_socket_new(conn) : amqp_tcp_socket_new(conn);
if (sock == NULL)
{
amqp_destroy_connection(conn);
return false;
}
if (amqp_socket_open(sock, host.c_str(), port))
{
amqp_destroy_connection(conn);
sock = NULL;
return false;
}
return true;
}
int send(const string& exchange, const string& qname, const string& data)
{
amqp_basic_properties_t props;
props.delivery_mode = 2;
props.content_type = amqp_cstring_bytes("text/plain");
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
return amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes(qname.c_str()), 0, 0, &props, amqp_cstring_bytes(data.c_str()));
}
bool login(const string& usr, const string& pwd, amqp_sasl_method_enum method = AMQP_SASL_METHOD_PLAIN)
{
res = amqp_login(conn, "/", 0, 128 * 1024, 0, method, usr.c_str(), pwd.c_str());
CHECK_FALSE_RETURN(res.reply_type == AMQP_RESPONSE_NORMAL);
amqp_channel_open(conn, 1);
return getResponse();
}
bool recv(const string& exchange, const string& qname, std::function<void(const char*, int)> func, int timeout = 5, bool aotodelete = false)
{
timeval tv;
amqp_bytes_t mq = amqp_cstring_bytes(qname.c_str());
amqp_queue_declare(conn, 1, mq, 0, 1, 0, aotodelete ? 1 : 0, amqp_empty_table);
CHECK_FALSE_RETURN(getResponse());
amqp_queue_bind(conn, 1, mq, amqp_cstring_bytes(exchange.c_str()), mq, amqp_empty_table);
CHECK_FALSE_RETURN(getResponse());
amqp_basic_consume(conn, 1, mq, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
CHECK_FALSE_RETURN(getResponse());
tv.tv_sec = timeout;
tv.tv_usec = 0;
while (true)
{
amqp_envelope_t data;
amqp_maybe_release_buffers(conn);
res = amqp_consume_message(conn, &data, &tv, 0);
CHECK_FALSE_RETURN(res.reply_type == AMQP_RESPONSE_NORMAL);
func((char*)(data.message.body.bytes), data.message.body.len);
amqp_destroy_envelope(&data);
}
return false;
}
};
////////////////////////////////////////////////////////////////
#endif
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。