当前仓库属于关闭状态,部分功能使用受限,详情请查阅 仓库状态说明
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
store.go 3.54 KB
一键复制 编辑 原始数据 按行查看 历史
Gogs 提交于 2016-07-19 13:29 . fix go build -race warings
package rocketmq
import (
"errors"
"github.com/golang/glog"
"sync"
"sync/atomic"
)
const (
MEMORY_FIRST_THEN_STORE = 0
READ_FROM_MEMORY = 1
READ_FROM_STORE = 2
)
type OffsetStore interface {
//load() error
updateOffset(mq *MessageQueue, offset int64, increaseOnly bool)
readOffset(mq *MessageQueue, flag int) int64
//persistAll(mqs []MessageQueue)
//persist(mq MessageQueue)
//removeOffset(mq MessageQueue)
//cloneOffsetTable(topic string) map[MessageQueue]int64
}
type RemoteOffsetStore struct {
groupName string
mqClient *MqClient
offsetTable map[MessageQueue]int64
offsetTableLock sync.RWMutex
}
func (self *RemoteOffsetStore) readOffset(mq *MessageQueue, readType int) int64 {
switch readType {
case MEMORY_FIRST_THEN_STORE:
case READ_FROM_MEMORY:
self.offsetTableLock.RLock()
offset, ok := self.offsetTable[*mq]
self.offsetTableLock.RUnlock()
if ok {
return offset
} else if readType == READ_FROM_MEMORY {
return -1
}
case READ_FROM_STORE:
offset, err := self.fetchConsumeOffsetFromBroker(mq)
if err != nil {
glog.Error(err)
return -1
}
self.updateOffset(mq, offset, false)
return offset
}
return -1
}
func (self *RemoteOffsetStore) fetchConsumeOffsetFromBroker(mq *MessageQueue) (int64, error) {
brokerAddr, _, found := self.mqClient.findBrokerAddressInSubscribe(mq.brokerName, 0, false)
if !found {
if err := self.mqClient.updateTopicRouteInfoFromNameServerByTopic(mq.topic); err != nil {
return 0, err
}
brokerAddr, _, found = self.mqClient.findBrokerAddressInSubscribe(mq.brokerName, 0, false)
}
if found {
requestHeader := &QueryConsumerOffsetRequestHeader{}
requestHeader.Topic = mq.topic
requestHeader.QueueId = mq.queueId
requestHeader.ConsumerGroup = self.groupName
return self.mqClient.queryConsumerOffset(brokerAddr, requestHeader, 3000)
}
return 0, errors.New("fetch consumer offset error")
}
func (self *RemoteOffsetStore) persist(mq *MessageQueue) {
offset, ok := self.offsetTable[*mq]
if ok {
err := self.updateConsumeOffsetToBroker(mq, offset)
if err != nil {
glog.Error(err)
}
}
}
type UpdateConsumerOffsetRequestHeader struct {
consumerGroup string
topic string
queueId int32
commitOffset int64
}
func (self *RemoteOffsetStore) updateConsumeOffsetToBroker(mq *MessageQueue, offset int64) error {
addr, found, _ := self.mqClient.findBrokerAddressInSubscribe(mq.brokerName, 0, false)
if !found {
if err := self.mqClient.updateTopicRouteInfoFromNameServerByTopic(mq.topic); err != nil {
return err
}
addr, found, _ = self.mqClient.findBrokerAddressInSubscribe(mq.brokerName, 0, false)
}
if found {
requestHeader := &UpdateConsumerOffsetRequestHeader{
consumerGroup: self.groupName,
topic: mq.topic,
queueId: mq.queueId,
commitOffset: offset,
}
self.mqClient.updateConsumerOffsetOneway(addr, requestHeader, 5*1000)
return nil
}
return errors.New("not found broker")
}
func (self *RemoteOffsetStore) updateOffset(mq *MessageQueue, offset int64, increaseOnly bool) {
if mq != nil {
self.offsetTableLock.RLock()
offsetOld, ok := self.offsetTable[*mq]
self.offsetTableLock.RUnlock()
if !ok {
self.offsetTableLock.Lock()
self.offsetTable[*mq] = offset
self.offsetTableLock.Unlock()
} else {
if increaseOnly {
atomic.AddInt64(&offsetOld, offset)
self.offsetTableLock.Lock()
self.offsetTable[*mq] = offsetOld
self.offsetTableLock.Unlock()
} else {
self.offsetTableLock.Lock()
self.offsetTable[*mq] = offset
self.offsetTableLock.Unlock()
}
}
}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化