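When messages in a queue depend on one another, so that processing a later message relies on the result of processing an earlier one, ordered message consumption is the appropriate choice.
As the figure shows, a topic in RocketMQ has a one-to-many relationship with its message queues (MQs). Clients send messages to the broker by topic, and consumers also consume by topic. So how is the order between messages guaranteed? First, messages that must stay in order have to be sent to the same MessageQueue. Second, a MessageQueue can be consumed by only one consumer (within a consumer group), which the queue allocation mechanism guarantees. Finally, within a single consumer, consumption of one MQ must itself be ordered.
In other words, the producer, the MessageQueue, and the consumer need to be in a one-to-one-to-one relationship.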
Message message = new Message(topic, tags, key, body.getBytes());
boolean result;
try {
    // Pass a MessageQueueSelector so that related messages go to the same queue
    SendResult sendResult = getConfigBean().getMqProducer().send(message, queueSelector, args);
    LoggerUtil.log(Level.INFO, "rocket message product result : " + sendResult.toString());
    result = true;
} catch (MQClientException e) {
    throw new MQException("client invocation state error", e);
} catch (RemotingException | InterruptedException | MQBrokerException e) {
    throw new MQException("remote invocation error", e);
}
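Compared with sending ordinary messages, sending ordered messages to the broker requires specifying a MessageQueueSelector. RocketMQ ships three implementations of this interface, SelectMessageQueueByRandoom, SelectMessageQueueByHash, and SelectMessageQueueByMachineRoom, and callers can also implement their own according to business needs. The selector decides which queue each message is sent to: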
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
    MessageQueue mq = null;
    try {
        // Delegate the queue choice to the caller-supplied selector
        mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
    } catch (Throwable e) {
        throw new MQClientException("select message queue throwed exception.", e);
    }

    if (mq != null) {
        return this.sendKernelImpl(msg, mq, communicationMode, sendCallback);
    } else {
        throw new MQClientException("select message queue return null.", null);
    }
}
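To make the selector idea concrete, here is a minimal sketch of the mapping a custom selector might perform: a business key (for example an order ID) is hashed to a queue index, so all messages with the same key land in the same queue. It deliberately avoids the RocketMQ API so it runs on the JDK alone; the class name `OrderQueueSelectorSketch` and its methods are hypothetical, and the queues are represented by plain strings.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: maps a business key to a queue, as a custom selector might. */
final class OrderQueueSelectorSketch {

    /** Returns a stable index in [0, queueCount) for the given sharding key. */
    static int selectIndex(Object shardingKey, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod never yields a negative index, even if hashCode() is Integer.MIN_VALUE
        return Math.floorMod(shardingKey.hashCode(), queueCount);
    }

    /** Picks the queue for a key from a list of queues (any element type). */
    static <Q> Q select(List<Q> queues, Object shardingKey) {
        return queues.get(selectIndex(shardingKey, queues.size()));
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("queue-0", "queue-1", "queue-2", "queue-3");
        long orderId = 10086L;
        // Both calls use the same key, so they pick the same queue.
        System.out.println(select(queues, orderId));
        System.out.println(select(queues, orderId));
    }
}
```

In a real producer the same computation would presumably sit inside a MessageQueueSelector implementation, with the key passed through the `arg` parameter of `send`. Note that the mapping changes if the number of queues under the topic changes.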
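For ordinary messages, by contrast, the caller does not specify a queue; the client sends to the MessageQueues under the topic in round-robin fashion. The code is as follows: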
/**
 * If lastBrokerName is not null, look for a MessageQueue on a different broker.
 */
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName != null) {
        int index = this.sendWhichQueue.getAndIncrement();
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int pos = Math.abs(index++) % this.messageQueueList.size();
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return null;
    } else {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        return this.messageQueueList.get(pos);
    }
}
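The round-robin idea can be tried out in isolation with the simplified sketch below. It is not RocketMQ's code: the class `RoundRobinSketch` is hypothetical, each queue is reduced to the name of the broker that hosts it, and the method returns an index (or -1) rather than a MessageQueue. It also uses `Math.floorMod` instead of `Math.abs(...) % size`, because `Math.abs(Integer.MIN_VALUE)` is still negative and would produce a negative position once the counter wraps around.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified round-robin queue picker. */
final class RoundRobinSketch {
    // One entry per queue: the name of the broker hosting that queue
    private final List<String> brokerNames;
    private final AtomicInteger counter = new AtomicInteger(0);

    RoundRobinSketch(List<String> brokerNames) {
        this.brokerNames = brokerNames;
    }

    /**
     * Returns the index of the next queue in round-robin order. If lastBrokerName
     * is not null, queues on that broker are skipped; returns -1 if none remain.
     */
    int selectOne(String lastBrokerName) {
        int size = brokerNames.size();
        int start = counter.getAndIncrement();
        for (int i = 0; i < size; i++) {
            // floorMod keeps the position non-negative even after integer overflow
            int pos = Math.floorMod(start + i, size);
            if (lastBrokerName == null || !brokerNames.get(pos).equals(lastBrokerName)) {
                return pos;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        RoundRobinSketch picker =
            new RoundRobinSketch(Arrays.asList("broker-a", "broker-a", "broker-b", "broker-b"));
        System.out.println(picker.selectOne(null));       // plain round-robin
        System.out.println(picker.selectOne(null));
        System.out.println(picker.selectOne("broker-a")); // avoid the broker that just failed
    }
}
```

Because consecutive sends rotate across queues, two related messages will usually end up in different queues, which is why ordinary sending cannot preserve order.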
On the consumer side there is only one difference from ordinary consumption: register a MessageListenerOrderly on the consumer instead of a MessageListenerConcurrently.
this.consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
    msgs.stream().forEach(messageExt -> {
        // TODO consume message
    });
    return ConsumeOrderlyStatus.SUCCESS;
});
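Let's look further at how RocketMQ handles this. When ConsumeMessageOrderlyService starts in clustering mode, it launches a single-threaded scheduled task, with an initial delay of one second and an interval of 20 seconds, that calls the lockAll() method of rebalanceImpl.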
public void start() {
    // Start the periodic queue-locking task
    if (MessageModel.CLUSTERING
            .equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                ConsumeMessageOrderlyService.this.lockMQPeriodically();
            }
        }, 1000 * 1, ProcessQueue.RebalanceLockInterval, TimeUnit.MILLISECONDS);
    }
}
This method locks the relevant MessageQueues on the relevant brokers and records on the ProcessQueue of each MessageQueue whether it is locked. This flag is used below:
class ConsumeRequest implements Runnable {
    private final ProcessQueue processQueue;
    private final MessageQueue messageQueue;

    public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
        this.processQueue = processQueue;
        this.messageQueue = messageQueue;
    }

    @Override
    public void run() {
        if (this.processQueue.isDroped()) {
            log.warn("run, the message queue not be able to consume, because it's dropped. {}",
                this.messageQueue);
            return;
        }

        // Within the current consumer, consume the same queue serially
        final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
        synchronized (objLock) {
            // Across the consumer cluster, consume the same queue serially
            if (MessageModel.BROADCASTING
                    .equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                final long beginTime = System.currentTimeMillis();
                for (boolean continueConsume = true; continueConsume;) {
                    if (this.processQueue.isDroped()) {
                        log.warn("the message queue not be able to consume, because it's dropped. {}",
                            this.messageQueue);
                        break;
                    }

                    if (MessageModel.CLUSTERING
                            .equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                            && !this.processQueue.isLocked()) {
                        log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue,
                            this.processQueue, 10);
                        break;
                    }

                    if (MessageModel.CLUSTERING
                            .equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                            && this.processQueue.isLockExpired()) {
                        log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue,
                            this.processQueue, 10);
                        break;
                    }

                    // When there are fewer threads than queues, keep individual queues from starving
                    long interval = System.currentTimeMillis() - beginTime;
                    if (interval > MaxTimeConsumeContinuously) {
                        // Resume consuming after 10 ms
                        ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue,
                            messageQueue, 10);
                        break;
                    }

                    final int consumeBatchSize =
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

                    List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                    if (!msgs.isEmpty()) {
                        final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

                        ConsumeOrderlyStatus status = null;

                        // Run the before-consume hook
                        ConsumeMessageContext consumeMessageContext = null;
                        if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                            consumeMessageContext = new ConsumeMessageContext();
                            consumeMessageContext.setConsumerGroup(
                                ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
                            consumeMessageContext.setMq(messageQueue);
                            consumeMessageContext.setMsgList(msgs);
                            consumeMessageContext.setSuccess(false);
                            ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl
                                .executeHookBefore(consumeMessageContext);
                        }

                        long beginTimestamp = System.currentTimeMillis();

                        // Lock the process queue while the listener runs
                        try {
                            this.processQueue.getLockConsume().lock();
                            if (this.processQueue.isDroped()) {
                                log.warn(
                                    "consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                    this.messageQueue);
                                break;
                            }

                            status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                        } catch (Throwable e) {
                            log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                                RemotingHelper.exceptionSimpleDesc(e),
                                ConsumeMessageOrderlyService.this.consumerGroup,
                                msgs,
                                messageQueue);
                        } finally {
                            this.processQueue.getLockConsume().unlock();
                        }

                        // Log abnormal return codes
                        if (null == status
                                || ConsumeOrderlyStatus.ROLLBACK == status
                                || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                            log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
                                ConsumeMessageOrderlyService.this.consumerGroup,
                                msgs,
                                messageQueue);
                        }

                        long consumeRT = System.currentTimeMillis() - beginTimestamp;

                        // If the listener threw an exception or returned null, suspend the queue
                        if (null == status) {
                            status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                        }

                        // Run the after-consume hook
                        if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                            consumeMessageContext.setStatus(status.toString());
                            consumeMessageContext.setSuccess(ConsumeOrderlyStatus.SUCCESS == status
                                || ConsumeOrderlyStatus.COMMIT == status);
                            ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl
                                .executeHookAfter(consumeMessageContext);
                        }

                        // Record statistics
                        ConsumeMessageOrderlyService.this.getConsumerStatsManager().incConsumeRT(
                            ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

                        continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status,
                            context, this);
                    } else {
                        continueConsume = false;
                    }
                }
            }
            // The lock for this queue is not held; consume later
            else {
                if (this.processQueue.isDroped()) {
                    log.warn("the message queue not be able to consume, because it's dropped. {}",
                        this.messageQueue);
                    return;
                }

                ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue,
                    this.processQueue, 100);
            }
        }
    }

    public ProcessQueue getProcessQueue() {
        return processQueue;
    }

    public MessageQueue getMessageQueue() {
        return messageQueue;
    }
}
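The first guarantee in ConsumeRequest, serial consumption of one queue inside a single consumer, rests on a lock object per queue. The sketch below illustrates that pattern with the JDK only. It is an assumption about the general shape, not RocketMQ's implementation: the class `QueueLockSketch`, its string queue keys, and the `runDemo` helper are all hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical sketch of a per-queue lock table for serial consumption. */
final class QueueLockSketch {
    private final ConcurrentMap<String, Object> lockTable = new ConcurrentHashMap<>();

    /** Returns the one lock object associated with a queue, creating it on first use. */
    Object fetchLockObject(String queueKey) {
        return lockTable.computeIfAbsent(queueKey, k -> new Object());
    }

    /** Runs a batch while holding the queue's lock, so batches of one queue never overlap. */
    void consumeSerially(String queueKey, Runnable batch) {
        synchronized (fetchLockObject(queueKey)) {
            batch.run();
        }
    }

    /** Several threads update an unsynchronized counter, but only under the queue lock. */
    static int runDemo(int threads, int perThread) {
        QueueLockSketch locks = new QueueLockSketch();
        int[] counter = {0}; // deliberately not thread-safe on its own
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    locks.consumeSerially("TopicA@broker-a@0", () -> counter[0]++);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 1000));
    }
}
```

Different queues get different lock objects, so a consumer can still process several queues in parallel; only work on the same queue is serialized. The cluster-wide guarantee is a separate matter and depends on the broker-side lock renewed by the scheduled task described earlier.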