加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
rabbitmq.go 5.74 KB
一键复制 编辑 原始数据 按行查看 历史
package rabbitmq
import (
"fmt"
"github.com/assembla/cony"
"github.com/streadway/amqp"
"strings"
)
// 封装基于cony 的易用包
//MsgHandle 注册函数
type MsgHandle func(amqp.Delivery)
//SimpleConsumer 消费者
type SimpleConsumer struct {
cli *cony.Client
qe Queue
cns *cony.Consumer
}
//Consumer 消费者
type Consumer struct {
cli *cony.Client
qe QueueExchange
cns *cony.Consumer
}
//Publisher 生产者
type Publisher struct {
cli *cony.Client
qe QueueExchange
pbl *cony.Publisher
}
// Queue hold definition of AMQP queue
type Queue struct {
Name string
Durable bool
AutoDelete bool
Exclusive bool
Args amqp.Table
}
//Declarer 定义
type Declarer struct {
que *cony.Queue // 队列名称
exc cony.Exchange // 交换机名称
bnd cony.Binding //
}
//QueueExchange 定义队列交换机对象
type QueueExchange struct {
QueueName string // 队列名称
RoutingKey string // key值
ExchangeName string // 交换机名称
ExchangeType string // 交换机类型
//Extra 额外的参数
QueueDurable bool
QueueAutoDelete bool
QueueExclusive bool
QueueArgs map[string]interface{}
ExchangeDurable bool
ExchangeAutoDelete bool
ExchangeArgs map[string]interface{}
BindArgs map[string]interface{}
}
//Extra 额外的参数
type Extra struct {
QueueDurable bool
QueueAutoDelete bool
QueueExclusive bool
QueueArgs map[string]interface{}
ExchangeDurable bool
ExchangeAutoDelete bool
ExchangeArgs map[string]interface{}
BindArgs map[string]interface{}
}
//RabbitMQ mq对象
type RabbitMQ struct {
cli *cony.Client
addr string // 地址
}
//New 创建rmq
func New(addr string) *RabbitMQ {
// Construct new client with the flag url
// and default backoff policy
cli := cony.NewClient(
cony.URL(addr),
cony.Backoff(cony.DefaultBackoff),
)
// Client loop sends out declarations(exchanges, queues, bindings
// etc) to the AMQP server. It also handles reconnecting.
go func() {
for cli.Loop() {
select {
case err := <-cli.Errors():
fmt.Printf("Client error: %v\n", err)
case blocked := <-cli.Blocking():
fmt.Printf("Client is blocked %v\n", blocked)
}
}
}()
rmq := &RabbitMQ{
cli: cli,
addr: addr,
}
return rmq
}
//BindQueue 绑定队列(创建队列)
func (r *RabbitMQ) BindQueue(exchangeName, exchangeType, routingKey, queueName string) {
qe := QueueExchange{
QueueName: queueName,
RoutingKey: routingKey,
ExchangeName: exchangeName,
ExchangeType: exchangeType,
}
de := r.Declare(qe)
if qe.QueueName == "" {
r.cli.Declare([]cony.Declaration{
cony.DeclareExchange(de.exc),
})
} else {
r.cli.Declare([]cony.Declaration{
cony.DeclareQueue(de.que),
cony.DeclareExchange(de.exc),
cony.DeclareBinding(de.bnd),
})
}
}
//Declare 定义队列或交换机
func (r *RabbitMQ) Declare(qe QueueExchange) Declarer {
// Declarations
// The queue name will be supplied by the AMQP server
que := &cony.Queue{
Name: strings.Trim(qe.QueueName, " "),
Durable: qe.QueueDurable,
AutoDelete: qe.QueueAutoDelete,
Exclusive: qe.QueueExclusive,
Args: qe.QueueArgs,
}
exc := cony.Exchange{
Name: strings.Trim(qe.ExchangeName, " "),
Kind: qe.ExchangeType,
Durable: qe.ExchangeDurable,
AutoDelete: qe.ExchangeAutoDelete,
Args: qe.ExchangeArgs,
}
bnd := cony.Binding{
Queue: que,
Exchange: exc,
Key: strings.Trim(qe.RoutingKey, " "),
Args: qe.BindArgs,
}
return Declarer{
que: que,
exc: exc,
bnd: bnd,
}
}
//Bind 绑定
func (r *RabbitMQ) Bind(de Declarer) {
slice := make([]cony.Declaration, 0)
// 根据名字决定是否定义相关对象
if de.que.Name != "" {
slice = append(slice, cony.DeclareQueue(de.que))
}
if de.exc.Name != "" {
slice = append(slice, cony.DeclareExchange(de.exc))
}
if de.bnd.Key != "" {
slice = append(slice, cony.DeclareBinding(de.bnd))
}
if len(slice) > 0 {
r.cli.Declare(slice)
}
}
//NewConsumer 创建
func (r *RabbitMQ) NewConsumer(qe QueueExchange) *Consumer {
// Construct new client with the flag url
// and default backoff policy
cli := r.cli
// Declarations
de := r.Declare(qe)
r.Bind(de)
// Declare and register a consumer
cns := cony.NewConsumer(
de.que,
cony.Qos(1),
)
cli.Consume(cns)
return &Consumer{
cli: cli,
qe: qe,
cns: cns,
}
}
//NewSimpleConsumer 创建
func (r *RabbitMQ) NewSimpleConsumer(queue Queue) *SimpleConsumer {
cli := r.cli
q := &cony.Queue{
Name: queue.Name, // autogenerated queue name
Durable: queue.Durable,
AutoDelete: queue.AutoDelete,
Exclusive: queue.Exclusive,
Args: queue.Args,
}
// Declare and register a consumer
cns := cony.NewConsumer(
q,
cony.Qos(1),
)
cli.Consume(cns)
return &SimpleConsumer{
cli: cli,
qe: queue,
cns: cns,
}
}
//Receive 消费消息
func (c *Consumer) Receive(h MsgHandle) {
for {
msg := <-c.cns.Deliveries()
h(msg)
}
}
//Receive 消费消息
func (c *SimpleConsumer) Receive(h MsgHandle) {
for {
msg := <-c.cns.Deliveries()
h(msg)
}
}
//NewPublisher 创建生产者
func (r *RabbitMQ) NewPublisher(qe QueueExchange) *Publisher {
// Construct new client with the flag url
// and default backoff policy
cli := r.cli
// Declarations
de := r.Declare(qe)
r.Bind(de)
pbl := cony.NewPublisher(de.exc.Name, qe.RoutingKey)
cli.Publish(pbl)
return &Publisher{
cli: cli,
qe: qe,
pbl: pbl,
}
}
//Pub 发布消息
func (p *Publisher) Pub(data []byte) error {
var err error
// 发送任务消息
err = p.pbl.Publish(amqp.Publishing{
ContentType: "text/plain",
Body: data,
})
return err
}
//PubRaw 发布消息
func (p *Publisher) PubRaw(raw amqp.Publishing) error {
// 发送任务消息
return p.pbl.Publish(raw)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化