路由模式特点:
RoutingKey
(路由key)RoutingKey
。Routing Key
进行判断,只有队列的Routingkey
与消息的 Routing key
完全一致,才会接收到消息图解:
在编码上与 Publish/Subscribe发布与订阅模式
的区别是交换机的类型为:Direct,还有队列绑定交换机的时候需要指定routing key。
在写案例之前,我们首先定义一下需求:
package com.lijw.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Aron.li
* @date 2022/3/3 8:16
*/
public class Producer_Routing {
//交换机名称
static final String DIRECT_EXCHAGE = 'direct_exchange';
//队列名称
static final String DIRECT_QUEUE_INSERT = 'direct_queue_insert';
//队列名称
static final String DIRECT_QUEUE_UPDATE = 'direct_queue_update';
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost('127.0.0.1'); // ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost('/test'); //虚拟机 默认值 /
factory.setUsername('libai'); // 用户名 默认 guest
factory.setPassword('libai'); //密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建交换机
/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT('direct'):定向
FANOUT('fanout'):扇形(广播),发送消息到每一个与之绑定队列。
TOPIC('topic') 通配符的方式
HEADERS('headers') 参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/
channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT, true, false, false, null);
// 6.声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
// 7. 绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为''
*/
channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, 'insert');
channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, 'update');
//8. 发送消息至交换机,由交换机分发消息
// 发送信息
String message = '新增了商品。路由模式;routing key 为 insert ' ;
/**
* 参数1:交换机名称,如果没有指定则使用默认Default Exchage
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish(DIRECT_EXCHAGE, 'insert', null, message.getBytes());
System.out.println('已发送消息:' + message);
// 发送信息
message = '修改了商品。路由模式;routing key 为 update' ;
/**
* 参数1:交换机名称,如果没有指定则使用默认Default Exchage
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish(DIRECT_EXCHAGE, 'update', null, message.getBytes());
System.out.println('已发送消息:' + message);
//9. 释放资源
channel.close();
connection.close();
}
}
执行发送消息:
发送消息之后,我们来看看声明好的交换机:
package com.lijw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Aron.li
* @date 2022/3/2 16:16
*/
public class Consumer_Routing1 {
//队列名称
static final String DIRECT_QUEUE_INSERT = 'direct_queue_insert';
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost('127.0.0.1'); // ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost('/test'); //虚拟机 默认值 /
factory.setUsername('libai'); // 用户名 默认 guest
factory.setPassword('libai'); //密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println('接收队列的数据 body: ' + new String(body));
}
};
channel.basicConsume(DIRECT_QUEUE_INSERT,true,consumer);
//不需要关闭资源,因为消费者需要持续监听队列信息
}
}
package com.lijw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Aron.li
* @date 2022/3/2 16:16
*/
public class Consumer_Routing2 {
//队列名称
static final String DIRECT_QUEUE_UPDATE = 'direct_queue_update';
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost('127.0.0.1'); // ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost('/test'); //虚拟机 默认值 /
factory.setUsername('libai'); // 用户名 默认 guest
factory.setPassword('libai'); //密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println('接收队列的数据 body: ' + new String(body));
}
};
channel.basicConsume(DIRECT_QUEUE_UPDATE,true,consumer);
//不需要关闭资源,因为消费者需要持续监听队列信息
}
}
启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果。
Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。
联系客服