代码拉取完成,页面将自动刷新
同步操作将从 cloudzone/cloudmq-go-client 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
package rocketmq
import (
"errors"
"github.com/golang/glog"
"sync"
"sync/atomic"
)
const (
MEMORY_FIRST_THEN_STORE = 0
READ_FROM_MEMORY = 1
READ_FROM_STORE = 2
)
type OffsetStore interface {
//load() error
updateOffset(mq *MessageQueue, offset int64, increaseOnly bool)
readOffset(mq *MessageQueue, flag int) int64
//persistAll(mqs []MessageQueue)
//persist(mq MessageQueue)
//removeOffset(mq MessageQueue)
//cloneOffsetTable(topic string) map[MessageQueue]int64
}
type RemoteOffsetStore struct {
groupName string
mqClient *MqClient
offsetTable map[MessageQueue]int64
offsetTableLock sync.RWMutex
}
func (self *RemoteOffsetStore) readOffset(mq *MessageQueue, readType int) int64 {
switch readType {
case MEMORY_FIRST_THEN_STORE:
case READ_FROM_MEMORY:
self.offsetTableLock.RLock()
offset, ok := self.offsetTable[*mq]
self.offsetTableLock.RUnlock()
if ok {
return offset
} else if readType == READ_FROM_MEMORY {
return -1
}
case READ_FROM_STORE:
offset, err := self.fetchConsumeOffsetFromBroker(mq)
if err != nil {
glog.Error(err)
return -1
}
self.updateOffset(mq, offset, false)
return offset
}
return -1
}
func (self *RemoteOffsetStore) fetchConsumeOffsetFromBroker(mq *MessageQueue) (int64, error) {
brokerAddr, _, found := self.mqClient.findBrokerAddressInSubscribe(mq.brokerName, 0, false)
if !found {
if err := self.mqClient.updateTopicRouteInfoFromNameServerByTopic(mq.topic); err != nil {
return 0, err
}
brokerAddr, _, found = self.mqClient.findBrokerAddressInSubscribe(mq.brokerName, 0, false)
}
if found {
requestHeader := &QueryConsumerOffsetRequestHeader{}
requestHeader.Topic = mq.topic
requestHeader.QueueId = mq.queueId
requestHeader.ConsumerGroup = self.groupName
return self.mqClient.queryConsumerOffset(brokerAddr, requestHeader, 3000)
}
return 0, errors.New("fetch consumer offset error")
}
func (self *RemoteOffsetStore) persist(mq *MessageQueue) {
offset, ok := self.offsetTable[*mq]
if ok {
err := self.updateConsumeOffsetToBroker(mq, offset)
if err != nil {
glog.Error(err)
}
}
}
type UpdateConsumerOffsetRequestHeader struct {
consumerGroup string
topic string
queueId int32
commitOffset int64
}
func (self *RemoteOffsetStore) updateConsumeOffsetToBroker(mq *MessageQueue, offset int64) error {
addr, found, _ := self.mqClient.findBrokerAddressInSubscribe(mq.brokerName, 0, false)
if !found {
if err := self.mqClient.updateTopicRouteInfoFromNameServerByTopic(mq.topic); err != nil {
return err
}
addr, found, _ = self.mqClient.findBrokerAddressInSubscribe(mq.brokerName, 0, false)
}
if found {
requestHeader := &UpdateConsumerOffsetRequestHeader{
consumerGroup: self.groupName,
topic: mq.topic,
queueId: mq.queueId,
commitOffset: offset,
}
self.mqClient.updateConsumerOffsetOneway(addr, requestHeader, 5*1000)
return nil
}
return errors.New("not found broker")
}
func (self *RemoteOffsetStore) updateOffset(mq *MessageQueue, offset int64, increaseOnly bool) {
if mq != nil {
self.offsetTableLock.RLock()
offsetOld, ok := self.offsetTable[*mq]
self.offsetTableLock.RUnlock()
if !ok {
self.offsetTableLock.Lock()
self.offsetTable[*mq] = offset
self.offsetTableLock.Unlock()
} else {
if increaseOnly {
atomic.AddInt64(&offsetOld, offset)
self.offsetTableLock.Lock()
self.offsetTable[*mq] = offsetOld
self.offsetTableLock.Unlock()
} else {
self.offsetTableLock.Lock()
self.offsetTable[*mq] = offset
self.offsetTableLock.Unlock()
}
}
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。