打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
聊聊rocketmq的LitePullConsumer

本文主要研究一下rocketmq的LitePullConsumer

LitePullConsumer

rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java

public interface LitePullConsumer {    void start() throws MQClientException;    void shutdown();    void subscribe(final String topic, final String subExpression) throws MQClientException;    void subscribe(final String topic, final MessageSelector selector) throws MQClientException;    void unsubscribe(final String topic);    void assign(Collection<MessageQueue> messageQueues);    List<MessageExt> poll();    List<MessageExt> poll(long timeout);    void seek(MessageQueue messageQueue, long offset) throws MQClientException;    void pause(Collection<MessageQueue> messageQueues);    void resume(Collection<MessageQueue> messageQueues);    boolean isAutoCommit();    void setAutoCommit(boolean autoCommit);    Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException;    Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException;    void commitSync();    Long committed(MessageQueue messageQueue) throws MQClientException;    void registerTopicMessageQueueChangeListener(String topic,        TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException;}
  • LitePullConsumer接口定义了start、shutdown、subscribe、unsubscribe、assign、poll、seek、pause、resume、isAutoCommit、setAutoCommit、fetchMessageQueues、offsetForTimestamp、commitSync、committed、registerTopicMessageQueueChangeListener方法

DefaultLitePullConsumer

rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java

public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {    private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;    private String consumerGroup;    private long brokerSuspendMaxTimeMillis = 1000 * 20;    private long consumerTimeoutMillisWhenSuspend = 1000 * 30;    private long consumerPullTimeoutMillis = 1000 * 10;    private MessageModel messageModel = MessageModel.CLUSTERING;    private MessageQueueListener messageQueueListener;    private OffsetStore offsetStore;    private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();    private boolean unitMode = false;    private boolean autoCommit = true;    private int pullThreadNums = 20;    private long autoCommitIntervalMillis = 5 * 1000;    private int pullBatchSize = 10;    private long pullThresholdForAll = 10000;    private int consumeMaxSpan = 2000;    private int pullThresholdForQueue = 1000;    private int pullThresholdSizeForQueue = 100;    private long pollTimeoutMillis = 1000 * 5;    private long topicMetadataCheckIntervalMillis = 30 * 1000;    public DefaultLitePullConsumer() {        this(null, MixAll.DEFAULT_CONSUMER_GROUP, null);    }    public DefaultLitePullConsumer(final String consumerGroup) {        this(null, consumerGroup, null);    }    public DefaultLitePullConsumer(RPCHook rpcHook) {        this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);    }    public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) {        this(null, consumerGroup, rpcHook);    }    public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {        this.namespace = namespace;        this.consumerGroup = consumerGroup;        defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);    }    @Override    public void start() throws MQClientException {        this.defaultLitePullConsumerImpl.start();    }    @Override    public void shutdown() {        this.defaultLitePullConsumerImpl.shutdown();    }    @Override    public void subscribe(String topic, String subExpression) throws MQClientException {        this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression);    }    @Override    public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {        this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), messageSelector);    }    @Override    public void unsubscribe(String topic) {        this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic));    }    @Override    public void assign(Collection<MessageQueue> messageQueues) {        defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues));    }    @Override    public List<MessageExt> poll() {        return defaultLitePullConsumerImpl.poll(this.getPollTimeoutMillis());    }    @Override    public List<MessageExt> poll(long timeout) {        return defaultLitePullConsumerImpl.poll(timeout);    }    @Override    public void seek(MessageQueue messageQueue, long offset) throws MQClientException {        this.defaultLitePullConsumerImpl.seek(queueWithNamespace(messageQueue), offset);    }    @Override    public void pause(Collection<MessageQueue> messageQueues) {        this.defaultLitePullConsumerImpl.pause(queuesWithNamespace(messageQueues));    }    @Override    public void resume(Collection<MessageQueue> messageQueues) {        this.defaultLitePullConsumerImpl.resume(queuesWithNamespace(messageQueues));    }    @Override    public Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException {        return this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic));    }    @Override    public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException {        return this.defaultLitePullConsumerImpl.searchOffset(queueWithNamespace(messageQueue), timestamp);    }    @Override    public void registerTopicMessageQueueChangeListener(String topic,        TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException {        this.defaultLitePullConsumerImpl.registerTopicMessageQueueChangeListener(withNamespace(topic), topicMessageQueueChangeListener);    }    @Override    public void commitSync() {        this.defaultLitePullConsumerImpl.commitSync();    }    @Override    public Long committed(MessageQueue messageQueue) throws MQClientException {        return this.defaultLitePullConsumerImpl.committed(messageQueue);    }    @Override    public boolean isAutoCommit() {        return autoCommit;    }    @Override    public void setAutoCommit(boolean autoCommit) {        this.autoCommit = autoCommit;    }    //......}
  • DefaultLitePullConsumer继承了ClientConfig,实现了LitePullConsumer接口,其构造器会创建DefaultLitePullConsumerImpl,LitePullConsumer接口定义的方法,其内部实现都委托给了DefaultLitePullConsumerImpl

小结

rocketmq6.0引入了LitePullConsumer,解决Add lite pull consumer support for RocketMQ #1388,提供了如下功能:

(1) Support consume messages in subscribe way with auto rebalance.(2) Support consume messages in assign way with no auto rebalance support.(3) Add seek/commit offset for a specified message queue.

doc

  • Add lite pull consumer support for RocketMQ #1388
  • [ISSUE #1388]Add lite pull consumer support for RocketMQ #1386
  • LitePullConsumer
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
RocketMQ源码解析:手把手教老婆看懂DefaultMQProducer
RocketMQ顺序消息消费
Android中handler的用法总结
RocketMQ的发送模式和消费模式
聊聊rocketmq的RemotingTooMuchRequestException
高德地图api之location定位服务 | 学步园
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服