打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
RocketMQ顺序消息消费

1. 应用场景

消息队列中消息之间有先后的依赖关系,后一条消息的处理依赖于前一条消息的处理结果,那么比较适用于顺序消息消费

2. 分析

RocketMQ

此图可以看出Rocketmq中一个topic和它的mq之间的关系是一对多的关系,客户端向broker中发送消息是根据topic发送的,而消费方消费时也是按照topic来消费的,那么我们怎么保证消息之间的顺序性呢?首先,需要保证顺序的消息要发送到同一个messagequeue中;其次,一个messagequeue只能被一个消费者消费,这点是由消息队列的分配机制来保证的;最后,一个消费者内部对一个mq的消费要保证是有序的。
我们要做到生产者 - messagequeue - 消费者之间是一对一对一的关系。

3. 具体实现

3.1 消息生产者

Message message = new Message(topic, tags, key, body.getBytes());boolean result;try {    SendResult sendResult = getConfigBean().getMqProducer().send(message,queueSelector,args);    LoggerUtil.log(Level.INFO,"rocket message product result : " + sendResult.toString());    result = true;}catch (MQClientException e) {    throw new MQException("客户端调用状态异常", e);}catch (RemotingException | InterruptedException | MQBrokerException e) {    throw new MQException("远程调用异常", e);}

相比较普通消息的消费,顺序消费在向broker发送消息的时候要指定MessageQueueSelector,此接口RocketMQ提供了三种实现SelectMessageQueueByRandoom,SelectMessageQueueByHash,SelectMessageQueueByMachineRoom,可由调用方自行根据业务实现。指定将消息发送到对应的队列中去;

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {    MessageQueue mq = null;    try {        mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);    }    catch (Throwable e) {        throw new MQClientException("select message queue throwed exception.", e);    }    if (mq != null) {        return this.sendKernelImpl(msg, mq, communicationMode, sendCallback);    }    else {        throw new MQClientException("select message queue return null.", null);    }}

而普通消息的发送,客户端调用方无需指定队列,MQ会轮询topic下面的MessageQueue发送消息,代码如下

/** * 如果lastBrokerName不为null,则寻找与其不同的MessageQueue */public MessageQueue selectOneMessageQueue(final String lastBrokerName) {    if (lastBrokerName != null) {        int index = this.sendWhichQueue.getAndIncrement();        for (int i = 0; i < this.messageQueueList.size(); i++) {            int pos = Math.abs(index++) % this.messageQueueList.size();            MessageQueue mq = this.messageQueueList.get(pos);            if (!mq.getBrokerName().equals(lastBrokerName)) {                return mq;            }        }        return null;    }    else {        int index = this.sendWhichQueue.getAndIncrement();        int pos = Math.abs(index) % this.messageQueueList.size();        return this.messageQueueList.get(pos);    }}

3.2 消息消费者

消息消费方与普通消息消费只有一个地方不同,在consumer中注册MessageListenerOrderly,而不是MessageListenerConcurrently

this.consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {    msgs.stream().forEach(            messageExt -> {                //TODO consume message            });    return ConsumeOrderlyStatus.SUCCESS;});

我们进一步看下RocketMQ是怎么处理的ConsumeMessageOrderlyService在启动的时候,如果是集群模式下会启动一个单线程的定时调度任务,延迟一秒,时间间隔为20秒,执行rebalanceImpl的lockAll()方法。

public void start() {    // 启动定时lock队列服务    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl        .messageModel())) {        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {            @Override            public void run() {                ConsumeMessageOrderlyService.this.lockMQPeriodically();            }        }, 1000 * 1, ProcessQueue.RebalanceLockInterval, TimeUnit.MILLISECONDS);    }}

这个方法会锁定相关broker下面的相关的messagequeue,对message对应的processQueue设置是否锁定,这个地方下面会用到

class ConsumeRequest implements Runnable {        private final ProcessQueue processQueue;        private final MessageQueue messageQueue;        public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {            this.processQueue = processQueue;            this.messageQueue = messageQueue;        }        @Override        public void run() {            if (this.processQueue.isDroped()) {                log.warn("run, the message queue not be able to consume, because it's dropped. {}",                    this.messageQueue);                return;            }            // 保证在当前Consumer内,同一队列串行消费            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);            synchronized (objLock) {                // 保证在Consumer集群,同一队列串行消费                if (MessageModel.BROADCASTING                    .equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())                        || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {                    final long beginTime = System.currentTimeMillis();                    for (boolean continueConsume = true; continueConsume;) {                        if (this.processQueue.isDroped()) {                            log.warn("the message queue not be able to consume, because it's dropped. {}",                                this.messageQueue);                            break;                        }                        if (MessageModel.CLUSTERING                            .equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl                                .messageModel())                                && !this.processQueue.isLocked()) {                            log.warn("the message queue not locked, so consume later, {}", this.messageQueue);                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue,                                this.processQueue, 10);                            break;                        }                        if (MessageModel.CLUSTERING                            .equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl                                .messageModel())                                && this.processQueue.isLockExpired()) {                            log.warn("the message queue lock expired, so consume later, {}",                                this.messageQueue);                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue,                                this.processQueue, 10);                            break;                        }                        // 在线程数小于队列数情况下,防止个别队列被饿死                        long interval = System.currentTimeMillis() - beginTime;                        if (interval > MaxTimeConsumeContinuously) {                            // 过10ms后再消费                            ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue,                                messageQueue, 10);                            break;                        }                        final int consumeBatchSize =                                ConsumeMessageOrderlyService.this.defaultMQPushConsumer                                    .getConsumeMessageBatchMaxSize();                        List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);                        if (!msgs.isEmpty()) {                            final ConsumeOrderlyContext context =                                    new ConsumeOrderlyContext(this.messageQueue);                            ConsumeOrderlyStatus status = null;                            // 执行Hook                            ConsumeMessageContext consumeMessageContext = null;                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {                                consumeMessageContext = new ConsumeMessageContext();                                consumeMessageContext                                    .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer                                        .getConsumerGroup());                                consumeMessageContext.setMq(messageQueue);                                consumeMessageContext.setMsgList(msgs);                                consumeMessageContext.setSuccess(false);                                ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl                                    .executeHookBefore(consumeMessageContext);                            }                            long beginTimestamp = System.currentTimeMillis();                            //处理队列加锁                            try {                                this.processQueue.getLockConsume().lock();                                if (this.processQueue.isDroped()) {                                    log.warn(                                        "consumeMessage, the message queue not be able to consume, because it's dropped. {}",                                        this.messageQueue);                                    break;                                }                                status =                                        messageListener.consumeMessage(Collections.unmodifiableList(msgs),                                            context);                            }                            catch (Throwable e) {                                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",//                                    RemotingHelper.exceptionSimpleDesc(e),//                                    ConsumeMessageOrderlyService.this.consumerGroup,//                                    msgs,//                                    messageQueue);                            }                            finally {                                this.processQueue.getLockConsume().unlock();                            }                            // 针对异常返回代码打印日志                            if (null == status //                                    || ConsumeOrderlyStatus.ROLLBACK == status//                                    || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {                                log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",//                                    ConsumeMessageOrderlyService.this.consumerGroup,//                                    msgs,//                                    messageQueue);                            }                            long consumeRT = System.currentTimeMillis() - beginTimestamp;                            // 用户抛出异常或者返回null,都挂起队列                            if (null == status) {                                status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;                            }                            // 执行Hook                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {                                consumeMessageContext.setStatus(status.toString());                                consumeMessageContext.setSuccess(ConsumeOrderlyStatus.SUCCESS == status                                        || ConsumeOrderlyStatus.COMMIT == status);                                ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl                                    .executeHookAfter(consumeMessageContext);                            }                            // 记录统计信息                            ConsumeMessageOrderlyService.this.getConsumerStatsManager().incConsumeRT(                                ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(),                                consumeRT);                            continueConsume =                                    ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status,                                        context, this);                        }                        else {                            continueConsume = false;                        }                    }                }                // 没有拿到当前队列的锁,稍后再消费                else {                    if (this.processQueue.isDroped()) {                        log.warn("the message queue not be able to consume, because it's dropped. {}",                            this.messageQueue);                        return;                    }                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue,                        this.processQueue, 100);                }            }        }        public ProcessQueue getProcessQueue() {            return processQueue;        }        public MessageQueue getMessageQueue() {            return messageQueue;        }    }

4. 问题

4.1 降低了吞吐量

4.2 前一条消息消费出现问题,后续的处理流程会阻塞

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
分布式消息队列 RocketMQ 源码分析——Message 顺序发送与消费
RocketMQ Push消息给消费者 解析——图解、源码级解析
探索RocketMQ的重复消费和乱序问题
分布式事务之解决方案(可靠消息最终一致性)
分布式事务之解决方案(最大努力通知)
RocketMQ第七章:手把手教老婆实现-广播消息生产者和消费者
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服