打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服务

开通VIP
聊聊rocketmq
本文主要研究一下rocketmq-client-go的pushConsumer
pushConsumer
rocketmq-client-go-v2.0.0/consumer/push_consumer.go
type pushConsumer struct { *defaultConsumer queueFlowControlTimes int queueMaxSpanFlowControlTimes int consumeFunc utils.Set submitToConsume func(*processQueue, *primitive.MessageQueue) subscribedTopic map[string]string interceptor primitive.Interceptor queueLock *QueueLock done chan struct{} closeOnce sync.Once}pushConsumer定义了queueFlowControlTimes、queueMaxSpanFlowControlTimes、consumeFunc、submitToConsume、subscribedTopic、interceptor、queueLock、done、closeOnce属性
NewPushConsumer
rocketmq-client-go-v2.0.0/consumer/push_consumer.go
func NewPushConsumer(opts ...Option) (*pushConsumer, error) { defaultOpts := defaultPushConsumerOptions() for _, apply := range opts { apply(&defaultOpts) } srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs) if err != nil { return nil, errors.Wrap(err, "new Namesrv failed.") } if !defaultOpts.Credentials.IsEmpty() { srvs.SetCredentials(defaultOpts.Credentials) } defaultOpts.Namesrv = srvs if defaultOpts.Namespace != "" { defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName } dc := &defaultConsumer{ client: internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil), consumerGroup: defaultOpts.GroupName, cType: _PushConsume, state: int32(internal.StateCreateJust), prCh: make(chan PullRequest, 4), model: defaultOpts.ConsumerModel, consumeOrderly: defaultOpts.ConsumeOrderly, fromWhere: defaultOpts.FromWhere, allocate: defaultOpts.Strategy, option: defaultOpts, namesrv: srvs, } p := &pushConsumer{ defaultConsumer: dc, subscribedTopic: make(map[string]string, 0), queueLock: newQueueLock(), done: make(chan struct{}, 1), consumeFunc: utils.NewSet(), } dc.mqChanged = p.messageQueueChanged if p.consumeOrderly { p.submitToConsume = p.consumeMessageOrderly } else { p.submitToConsume = p.consumeMessageCurrently } p.interceptor = primitive.ChainInterceptors(p.option.Interceptors...) return p, nil}NewPushConsumer方法实例化defaultConsumer及pushConsumer
Start
rocketmq-client-go-v2.0.0/consumer/push_consumer.go
func (pc *pushConsumer) Start() error { var err error pc.once.Do(func() { rlog.Info("the consumer start beginning", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, "messageModel": pc.model, "unitMode": pc.unitMode, }) atomic.StoreInt32(&pc.state, int32(internal.StateStartFailed)) pc.validate() err = pc.client.RegisterConsumer(pc.consumerGroup, pc) if err != nil { rlog.Error("the consumer group has been created, specify another one", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, }) err = ErrCreated return } err = pc.defaultConsumer.start() if err != nil { return } go func() { // todo start clean msg expired for { select { case pr := <-pc.prCh: go func() { pc.pullMessage(&pr) }() case <-pc.done: rlog.Info("push consumer close pullConsumer listener.", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, }) return } } }() go primitive.WithRecover(func() { // initial lock. if !pc.consumeOrderly { return } time.Sleep(1000 * time.Millisecond) pc.lockAll() lockTicker := time.NewTicker(pc.option.RebalanceLockInterval) defer lockTicker.Stop() for { select { case <-lockTicker.C: pc.lockAll() case <-pc.done: rlog.Info("push consumer close tick.", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, }) return } } }) }) if err != nil { return err } pc.client.UpdateTopicRouteInfo() for k := range pc.subscribedTopic { _, exist := pc.topicSubscribeInfoTable.Load(k) if !exist { pc.client.Shutdown() return fmt.Errorf("the topic=%s route info not found, it may not exist", k) } } pc.client.CheckClientInBroker() pc.client.SendHeartbeatToAllBrokerWithLock() pc.client.RebalanceImmediately() return err}Start方法执行pc.client.RegisterConsumer及pc.defaultConsumer.start(),然后异步执行pc.pullMessage(&pr);对于consumeOrderly则先执行pc.lockAll(),再通过time.NewTicker创建lockTicker定时执行pc.lockAll()(非consumeOrderly则直接返回);之后执行pc.client.UpdateTopicRouteInfo()、pc.client.CheckClientInBroker()、pc.client.SendHeartbeatToAllBrokerWithLock()及pc.client.RebalanceImmediately()
Shutdown
rocketmq-client-go-v2.0.0/consumer/push_consumer.go
func (pc *pushConsumer) Shutdown() error { var err error pc.closeOnce.Do(func() { close(pc.done) pc.client.UnregisterConsumer(pc.consumerGroup) err = pc.defaultConsumer.shutdown() }) return err}Shutdown方法通过pc.closeOnce保证只执行一次:先close(pc.done)通知Start及pullMessage中监听pc.done的goroutine退出,再执行pc.client.UnregisterConsumer及pc.defaultConsumer.shutdown()
Subscribe
rocketmq-client-go-v2.0.0/consumer/push_consumer.go
func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector, f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)) error { if atomic.LoadInt32(&pc.state) != int32(internal.StateCreateJust) { return errors.New("subscribe topic only started before") } if pc.option.Namespace != "" { topic = pc.option.Namespace + "%" + topic } data := buildSubscriptionData(topic, selector) pc.subscriptionDataTable.Store(topic, data) pc.subscribedTopic[topic] = "" pc.consumeFunc.Add(&PushConsumerCallback{ f: f, topic: topic, }) return nil}Subscribe方法先通过buildSubscriptionData构建data,之后执行pc.subscriptionDataTable.Store(topic, data)及pc.consumeFunc.Add
pullMessage
rocketmq-client-go-v2.0.0/consumer/push_consumer.go
func (pc *pushConsumer) pullMessage(request *PullRequest) { rlog.Debug("start a new Pull Message task for PullRequest", map[string]interface{}{ rlog.LogKeyPullRequest: request.String(), })
var sleepTime time.Duration pq := request.pq go primitive.WithRecover(func() { for { select { case <-pc.done: rlog.Info("push consumer close pullMessage.", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, }) return default: pc.submitToConsume(request.pq, request.mq) } } }) for { NEXT: select { case <-pc.done: rlog.Info("push consumer close message handle.", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, }) return default: } if pq.IsDroppd() { rlog.Debug("the request was dropped, so stop task", map[string]interface{}{ rlog.LogKeyPullRequest: request.String(), }) return } if sleepTime > 0 { rlog.Debug(fmt.Sprintf("pull MessageQueue: %d sleep %d ms for mq: %v", request.mq.QueueId, sleepTime/time.Millisecond, request.mq), nil) time.Sleep(sleepTime) } // reset time sleepTime = pc.option.PullInterval pq.lastPullTime = time.Now() err := pc.makeSureStateOK() if err != nil { rlog.Warning("consumer state error", map[string]interface{}{ rlog.LogKeyUnderlayError: err.Error(), }) sleepTime = _PullDelayTimeWhenError goto NEXT } if pc.pause { rlog.Debug(fmt.Sprintf("consumer [%s] of [%s] was paused, execute pull request [%s] later", pc.option.InstanceName, pc.consumerGroup, request.String()), nil) sleepTime = _PullDelayTimeWhenSuspend goto NEXT } cachedMessageSizeInMiB := int(pq.cachedMsgSize / Mb) if pq.cachedMsgCount > pc.option.PullThresholdForQueue { if pc.queueFlowControlTimes%1000 == 0 { rlog.Warning("the cached message count exceeds the threshold, so do flow control", map[string]interface{}{ "PullThresholdForQueue": pc.option.PullThresholdForQueue, "minOffset": pq.Min(), "maxOffset": pq.Max(), "count": pq.msgCache, "size(MiB)": cachedMessageSizeInMiB, "flowControlTimes": pc.queueFlowControlTimes, rlog.LogKeyPullRequest: request.String(), }) } pc.queueFlowControlTimes++ sleepTime = _PullDelayTimeWhenFlowControl goto NEXT } if cachedMessageSizeInMiB > pc.option.PullThresholdSizeForQueue { if pc.queueFlowControlTimes%1000 == 0 { 
rlog.Warning("the cached message size exceeds the threshold, so do flow control", map[string]interface{}{ "PullThresholdSizeForQueue": pc.option.PullThresholdSizeForQueue, "minOffset": pq.Min(), "maxOffset": pq.Max(), "count": pq.msgCache, "size(MiB)": cachedMessageSizeInMiB, "flowControlTimes": pc.queueFlowControlTimes, rlog.LogKeyPullRequest: request.String(), }) } pc.queueFlowControlTimes++ sleepTime = _PullDelayTimeWhenFlowControl goto NEXT } if !pc.consumeOrderly { if pq.getMaxSpan() > pc.option.ConsumeConcurrentlyMaxSpan { if pc.queueMaxSpanFlowControlTimes%1000 == 0 { rlog.Warning("the queue's messages span too long, so do flow control", map[string]interface{}{ "ConsumeConcurrentlyMaxSpan": pc.option.ConsumeConcurrentlyMaxSpan, "minOffset": pq.Min(), "maxOffset": pq.Max(), "maxSpan": pq.getMaxSpan(), "flowControlTimes": pc.queueFlowControlTimes, rlog.LogKeyPullRequest: request.String(), }) } sleepTime = _PullDelayTimeWhenFlowControl goto NEXT } } else { if pq.IsLock() { if !request.lockedFirst { offset := pc.computePullFromWhere(request.mq) brokerBusy := offset < request.nextOffset rlog.Info("the first time to pull message, so fix offset from broker, offset maybe changed", map[string]interface{}{ rlog.LogKeyPullRequest: request.String(), rlog.LogKeyValueChangedFrom: request.nextOffset, rlog.LogKeyValueChangedTo: offset, "brokerBusy": brokerBusy, }) if brokerBusy { rlog.Info("[NOTIFY_ME] the first time to pull message, but pull request offset larger than "+ "broker consume offset", map[string]interface{}{"offset": offset}) } request.lockedFirst = true request.nextOffset = offset } } else { rlog.Info("pull message later because not locked in broker", map[string]interface{}{ rlog.LogKeyPullRequest: request.String(), }) sleepTime = _PullDelayTimeWhenError goto NEXT } } v, exist := pc.subscriptionDataTable.Load(request.mq.Topic) if !exist { rlog.Info("find the consumer's subscription failed", map[string]interface{}{ rlog.LogKeyPullRequest: request.String(), }) 
sleepTime = _PullDelayTimeWhenError goto NEXT } beginTime := time.Now() var ( commitOffsetEnable bool commitOffsetValue int64 subExpression string ) if pc.model == Clustering { commitOffsetValue = pc.storage.read(request.mq, _ReadFromMemory) if commitOffsetValue > 0 { commitOffsetEnable = true } } sd := v.(*internal.SubscriptionData) classFilter := sd.ClassFilterMode if pc.option.PostSubscriptionWhenPull && classFilter { subExpression = sd.SubString } sysFlag := buildSysFlag(commitOffsetEnable, true, subExpression != "", classFilter) pullRequest := &internal.PullMessageRequestHeader{ ConsumerGroup: pc.consumerGroup, Topic: request.mq.Topic, QueueId: int32(request.mq.QueueId), QueueOffset: request.nextOffset, MaxMsgNums: pc.option.PullBatchSize, SysFlag: sysFlag, CommitOffset: commitOffsetValue, SubExpression: _SubAll, ExpressionType: string(TAG), SuspendTimeoutMillis: 20 * time.Second, } // //if data.ExpType == string(TAG) { // pullRequest.SubVersion = 0 //} else { // pullRequest.SubVersion = data.SubVersion //} brokerResult := pc.defaultConsumer.tryFindBroker(request.mq) if brokerResult == nil { rlog.Warning("no broker found for mq", map[string]interface{}{ rlog.LogKeyPullRequest: request.mq.String(), }) sleepTime = _PullDelayTimeWhenError goto NEXT } if brokerResult.Slave { pullRequest.SysFlag = clearCommitOffsetFlag(pullRequest.SysFlag) } result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest) if err != nil { rlog.Warning("pull message from broker error", map[string]interface{}{ rlog.LogKeyBroker: brokerResult.BrokerAddr, rlog.LogKeyUnderlayError: err.Error(), }) sleepTime = _PullDelayTimeWhenError goto NEXT } if result.Status == primitive.PullBrokerTimeout { rlog.Warning("pull broker timeout", map[string]interface{}{ rlog.LogKeyBroker: brokerResult.BrokerAddr, }) sleepTime = _PullDelayTimeWhenError goto NEXT } switch result.Status { case primitive.PullFound: rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d found 
messages.", request.mq.Topic, request.mq.QueueId), nil) prevRequestOffset := request.nextOffset request.nextOffset = result.NextBeginOffset rt := time.Now().Sub(beginTime) / time.Millisecond increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt)) pc.processPullResult(request.mq, result, sd) msgFounded := result.GetMessageExts() firstMsgOffset := int64(math.MaxInt64) if msgFounded != nil && len(msgFounded) != 0 { firstMsgOffset = msgFounded[0].QueueOffset increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded)) pq.putMessage(msgFounded...) } if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset { rlog.Warning("[BUG] pull message result maybe data wrong", map[string]interface{}{ "nextBeginOffset": result.NextBeginOffset, "firstMsgOffset": firstMsgOffset, "prevRequestOffset": prevRequestOffset, }) } case primitive.PullNoNewMsg: rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d no more msg, current offset: %d, next offset: %d", request.mq.Topic, request.mq.QueueId, pullRequest.QueueOffset, result.NextBeginOffset), nil) case primitive.PullNoMsgMatched: request.nextOffset = result.NextBeginOffset pc.correctTagsOffset(request) case primitive.PullOffsetIllegal: rlog.Warning("the pull request offset illegal", map[string]interface{}{ rlog.LogKeyPullRequest: request.String(), "result": result.String(), }) request.nextOffset = result.NextBeginOffset pq.WithDropped(true) time.Sleep(10 * time.Second) pc.storage.update(request.mq, request.nextOffset, false) pc.storage.persist([]*primitive.MessageQueue{request.mq}) pc.processQueueTable.Delete(request.mq) rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil) default: rlog.Warning(fmt.Sprintf("unknown pull status: %v", result.Status), nil) sleepTime = _PullDelayTimeWhenError } 
}}pullMessage方法会创建internal.PullMessageRequestHeader,之后通过pc.defaultConsumer.tryFindBroker获取brokerResult,之后执行pc.client.PullMessage获取result;对于result.Status为primitive.PullFound执行pc.processPullResult、pq.putMessage提交到processQueue;pc.submitToConsume(request.pq, request.mq)对于p.consumeOrderly执行的是p.consumeMessageOrderly,否则执行的是p.consumeMessageCurrently,它们都会执行pc.consumeInner
consumeInner
rocketmq-client-go-v2.0.0/consumer/push_consumer.go
func (pc *pushConsumer) consumeInner(ctx context.Context, subMsgs []*primitive.MessageExt) (ConsumeResult, error) { if len(subMsgs) == 0 { return ConsumeRetryLater, errors.New("msg list empty") } f, exist := pc.consumeFunc.Contains(subMsgs[0].Topic) // fix lost retry message if !exist && strings.HasPrefix(subMsgs[0].Topic, internal.RetryGroupTopicPrefix) { f, exist = pc.consumeFunc.Contains(subMsgs[0].GetProperty(primitive.PropertyRetryTopic)) } if !exist { return ConsumeRetryLater, fmt.Errorf("the consume callback missing for topic: %s", subMsgs[0].Topic) } callback, ok := f.(*PushConsumerCallback) if !ok { return ConsumeRetryLater, fmt.Errorf("the consume callback assert failed for topic: %s", subMsgs[0].Topic) } if pc.interceptor == nil { return callback.f(ctx, subMsgs...) } else { var container ConsumeResultHolder err := pc.interceptor(ctx, subMsgs, &container, func(ctx context.Context, req, reply interface{}) error { msgs := req.([]*primitive.MessageExt) r, e := callback.f(ctx, msgs...) realReply := reply.(*ConsumeResultHolder) realReply.ConsumeResult = r msgCtx, _ := primitive.GetConsumerCtx(ctx) msgCtx.Success = realReply.ConsumeResult == ConsumeSuccess if realReply.ConsumeResult == ConsumeSuccess { msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn) } else { msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn) } return e }) return container.ConsumeResult, err }}consumeInner方法先通过pc.consumeFunc.Contains按topic查找回调(对于以internal.RetryGroupTopicPrefix开头的重试topic,未找到时再按primitive.PropertyRetryTopic属性查找一次),断言为*PushConsumerCallback后执行callback.f;若pc.interceptor不为nil,则经由pc.interceptor调用callback.f
小结
pushConsumer是对pull模式的封装,拉到消息之后若consumeOrderly则执行consumeMessageOrderly,否则执行的是consumeMessageCurrently,它们内部调用了consumeInner,会触发PushConsumerCallback回调
doc
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
手把手教姐姐写消息队列
beego文件上传服务器,Beego
万字详文告诉你如何做 Code Review
Diving deep into net/http : A look at http.RoundTripper
【文末送书】B站点赞功能的思考与简单实现
配置wsgi运行环境
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服