打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
RocketMQ第七章:手把手教老婆实现-广播消息生产者和消费者
技术活,该赏
关注+一键三连(点赞,评论,收藏)再看,养成好习惯

RocketMQ使用教程相关系列 目录



第一节:介绍

消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的

限制:

不支持顺序消息

消费者消费模式

消费者的消费模式分为两种

  1. 负载均衡模式:消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同
  2. 广播模式:消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的

第二节:广播消息-生产者和消息者步骤说明

广播消息生产者代码实现步骤

1.创建消息生产者producer,并制定生产者组名

2.指定Nameserver地址

3.启动producer

4.创建消息对象集合,指定主题Topic、Tag和消息体

5.发送集合消息

6.关闭生产者producer

注:与批量消息的生产者代码一模一样

广播消息消费者代码实现步骤

1.创建消费者Consumer,制定消费者组名

2.指定Nameserver地址

3.默认均衡轮询消费模式 改为广播模式

4.订阅主题Topic和Tag

5.设置回调函数,处理消息

6.启动消费者consumer

注意:消费者的 Topic 和 Tag 需要和生产者保持一致

第三节:广播消息生产者

public class Producer {

   public static void main(String[] args) throws Exception {
      // 1.创建消息生产者producer,并制定生产者组名
      DefaultMQProducer producer = new DefaultMQProducer("demo_producer_broadcasting_group");
      // 2.指定Nameserver地址
      producer.setNamesrvAddr("192.168.88.131:9876");
      // 3.启动producer
      producer.start();
      System.out.println("生产者启动");
      List<Message> msgs = new ArrayList<Message>();

      // 4.创建消息对象,指定主题Topic、Tag和消息体
      /**
       * 参数一:消息主题Topic
       * 参数二:消息Tag
       * 参数三:消息内容
       */
      for (int i = 0; i < 20; i++) {
         Message msg = new Message("Topic_broadcasting_demo", "Tag_broadcasting_demo",
               ("Hello 虚竹,这是广播消息" + i).getBytes());
         msgs.add(msg);
      }

      // 5.发送消息
      SendResult result = producer.send(msgs);
      // 发送状态
      SendStatus status = result.getSendStatus();

      System.out.println("发送结果:" + result);

      // 线程睡1秒
      TimeUnit.SECONDS.sleep(1);

      // 6.关闭生产者producer
      producer.shutdown();
      System.out.println("生产者关闭");
   }

}

效果:

第四节:广播消息消费者A 

public class ConsumerA {
   public static void main(String[] args) throws Exception {
      // 1.创建消费者Consumer,制定消费者组名
      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_producer_broadcasting_group");
      // 2.指定Nameserver地址
      consumer.setNamesrvAddr("192.168.88.131:9876");

      // 默认均衡轮询消费模式 改为广播模式
      consumer.setMessageModel(MessageModel.BROADCASTING);

      // 3.订阅主题Topic和Tag
      consumer.subscribe("Topic_broadcasting_demo", "*");

      // 4.设置回调函数,处理消息
      consumer.registerMessageListener(new MessageListenerConcurrently() {

         // 接受消息内容
         @Override
         public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
               System.out.println(
                     "A----consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
         }
      });
      // 5.启动消费者consumer
      consumer.start();

      System.out.println("消费者启动");
   }
}

效果:

第五节:广播消息消费者B

public class ConsumerB {
   public static void main(String[] args) throws Exception {
      // 1.创建消费者Consumer,制定消费者组名
      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_producer_broadcasting_group");
      // 2.指定Nameserver地址
      consumer.setNamesrvAddr("192.168.88.131:9876");

      // 默认均衡轮询消费模式 改为广播模式
      consumer.setMessageModel(MessageModel.BROADCASTING);

      // 3.订阅主题Topic和Tag
      consumer.subscribe("Topic_broadcasting_demo", "*");

      // 4.设置回调函数,处理消息
      consumer.registerMessageListener(new MessageListenerConcurrently() {

         // 接受消息内容
         @Override
         public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
               System.out.println(
                     "B----consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
         }
      });
      // 5.启动消费者consumer
      consumer.start();

      System.out.println("消费者启动");
   }
}

效果:

第六节:总结

消费模式默认为负载均衡模式
修改消费模式的方法是: consumer.setMessageModel();
设置为广播模式:consumer.setMessageModel(MessageModel.BROADCASTING);
设置负载均衡模式:consumer.setMessageModel(MessageModel.CLUSTERING);
 

参考:

顺序消息是否支持集群消费和广播消费?

https://help.aliyun.com/knowledge_detail/54357.html

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
定时消息通知 – 运维生存时间
小白学习Kafka(二):主题策略
谁再问我 Kafka,我把这 43 张图甩给他
Kafka系列1:Kafka概况
RocketMQ架构模块解析
Kafka分布式消息队列在python的应用
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服