打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
6-RabbitMQ工作模式-Routing路由模式

Routing路由模式

1. 模式说明

路由模式特点:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
1556029284397

图解:

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
  • C1:消费者,其所在队列指定了需要routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

2. 案例

在编码上与 Publish/Subscribe发布与订阅模式 的区别是交换机的类型为:Direct,还有队列绑定交换机的时候需要指定routing key。

在写案例之前,我们首先定义一下需求:

  • 生产者:发送两条消息,一条消息的用于插入数据,另一条消息用于更新数据
  • 消费者1:接收插入数据的消息,进行数据插入
  • 消费者2:接收更新数据的消息,进行数据更新

1)生产者

package com.lijw.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Aron.li
 * @date 2022/3/3 8:16
 */

public class Producer_Routing {

    //交换机名称
    static final String DIRECT_EXCHAGE = 'direct_exchange';
    //队列名称
    static final String DIRECT_QUEUE_INSERT = 'direct_queue_insert';
    //队列名称
    static final String DIRECT_QUEUE_UPDATE = 'direct_queue_update';

    public static void main(String[] args) throws IOException, TimeoutException {

        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2. 设置参数
        factory.setHost('127.0.0.1'); // ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost('/test'); //虚拟机 默认值 /
        factory.setUsername('libai'); // 用户名 默认 guest
        factory.setPassword('libai'); //密码 默认值 guest

        //3. 创建连接 Connection
        Connection connection = factory.newConnection();

        //4. 创建Channel
        Channel channel = connection.createChannel();

        //5. 创建交换机
        /*
           exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
           参数:
            1. exchange:交换机名称
            2. type:交换机类型
                DIRECT('direct'):定向
                FANOUT('fanout'):扇形(广播),发送消息到每一个与之绑定队列。
                TOPIC('topic') 通配符的方式
                HEADERS('headers') 参数匹配

            3. durable:是否持久化
            4. autoDelete:自动删除
            5. internal:内部使用。 一般false
            6. arguments:参数
        */

        channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT, truefalsefalsenull);

        // 6.声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */

        channel.queueDeclare(DIRECT_QUEUE_INSERT, truefalsefalsenull);
        channel.queueDeclare(DIRECT_QUEUE_UPDATE, truefalsefalsenull);

        // 7. 绑定队列和交换机
        /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                1. queue:队列名称
                2. exchange:交换机名称
                3. routingKey:路由键,绑定规则
                    如果交换机的类型为fanout ,routingKey设置为''
         */

        channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, 'insert');
        channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, 'update');

        //8. 发送消息至交换机,由交换机分发消息
        // 发送信息
        String message = '新增了商品。路由模式;routing key 为 insert ' ;
        /**
         * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
         * 参数2:路由key,简单模式可以传递队列名称
         * 参数3:消息其它属性
         * 参数4:消息内容
         */

        channel.basicPublish(DIRECT_EXCHAGE, 'insert'null, message.getBytes());
        System.out.println('已发送消息:' + message);

        // 发送信息
        message = '修改了商品。路由模式;routing key 为 update' ;
        /**
         * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
         * 参数2:路由key,简单模式可以传递队列名称
         * 参数3:消息其它属性
         * 参数4:消息内容
         */

        channel.basicPublish(DIRECT_EXCHAGE, 'update'null, message.getBytes());
        System.out.println('已发送消息:' + message);

        //9. 释放资源
        channel.close();
        connection.close();

    }
}

执行发送消息:

发送消息之后,我们来看看声明好的交换机:

2)消费者1:专门接收 insert 的消息

package com.lijw.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Aron.li
 * @date 2022/3/2 16:16
 */

public class Consumer_Routing1 {

    //队列名称
    static final String DIRECT_QUEUE_INSERT = 'direct_queue_insert';

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost('127.0.0.1'); // ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost('/test'); //虚拟机 默认值 /
        factory.setUsername('libai'); // 用户名 默认 guest
        factory.setPassword('libai'); //密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        参数:
            1. queue:队列名称
            2. durable:是否持久化,当mq重启之后,还在
            3. exclusive:
                * 是否独占。只能有一个消费者监听这队列
                * 当Connection关闭时,是否删除队列
            4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
            5. arguments:参数。

         */

        channel.queueDeclare(DIRECT_QUEUE_INSERT, truefalsefalsenull);

        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认
            3. callback:回调对象

         */

        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回调方法,当收到消息后,会自动执行该方法
                1. consumerTag:标识
                2. envelope:获取一些信息,交换机,路由key...
                3. properties:配置信息
                4. body:数据
             */

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println('接收队列的数据 body: ' + new String(body));
            }
        };
        channel.basicConsume(DIRECT_QUEUE_INSERT,true,consumer);

        //不需要关闭资源,因为消费者需要持续监听队列信息
    }
}

3)消费者2:专门接收 update 的消息

package com.lijw.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Aron.li
 * @date 2022/3/2 16:16
 */

public class Consumer_Routing2 {

    //队列名称
    static final String DIRECT_QUEUE_UPDATE = 'direct_queue_update';

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost('127.0.0.1'); // ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost('/test'); //虚拟机 默认值 /
        factory.setUsername('libai'); // 用户名 默认 guest
        factory.setPassword('libai'); //密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        参数:
            1. queue:队列名称
            2. durable:是否持久化,当mq重启之后,还在
            3. exclusive:
                * 是否独占。只能有一个消费者监听这队列
                * 当Connection关闭时,是否删除队列
            4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
            5. arguments:参数。

         */

        channel.queueDeclare(DIRECT_QUEUE_UPDATE, truefalsefalsenull);

        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认
            3. callback:回调对象
         */

        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回调方法,当收到消息后,会自动执行该方法
                1. consumerTag:标识
                2. envelope:获取一些信息,交换机,路由key...
                3. properties:配置信息
                4. body:数据
             */

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println('接收队列的数据 body: ' + new String(body));
            }
        };
        channel.basicConsume(DIRECT_QUEUE_UPDATE,true,consumer);

        //不需要关闭资源,因为消费者需要持续监听队列信息
    }
}

3. 测试

启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果。

  • 消费者1 收到了 insert 的消息
  • 消费者2 收到了 update 的消息

4. 小结

Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
rabbitmq 实现延迟队列的两种方式
rabbitmq单机多实例集群搭建
RabbitMQ之推送消息
RabbitMQ六种通信模式介绍
rabbitmq介绍以及初步使用
RabbitMQ的几种典型使用场景
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服