打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
ActiveMQ --- 高级篇

在前两篇文章中说了入门和与spring以及springboot框架的整合,本文来聊聊activemq更深层次的东西,包括传输协议、消息存储、持久化、集群、异步投递、延迟投递、定时投递、重试机制、死信队列和防止重复消费问题等。

传输协议

默认使用tcp协议,那么可以改吗?如果可以,怎么改?
1、能否改?
可以改,activemq支持很多协议,除了默认的TCP,还有SSL、UDP、NIO、Http(s)、VM等。在项目中,一般使用NIO,性能更好。使用tcp协议时的端口是61616,url后面还有可选参数,可以参考官网的说明。TCP协议的优点是可靠性高,稳定性强,字节流方式传递,效率高,应用广泛,支持任何平台。NIO适合有大量的client去连接broker的情况,提供更好的性能,连接的url形式:nio://hostname:port?key=value
2、如何改?
在activemq的安装目录的conf目录下,有个activemq.xml配置文件,在transportConnectors标签内就可以配置协议。

image.png


在这里加上nio的配置:

<transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true"/>

这样就配置好了,然后将生产者和消费者中连接的url都改成

nio://192.168.X.XX:61618

这样就可以成功使用nio协议了。
3、nio适配多协议:
其实说nio协议不太准确,应该是说使用以tcp协议为基础的nio网络模型。也就是说,activemq默认的那些协议,底层用的都是BIO,经过上面这样配置,就让tcp协议底层使用nio了。那么怎么让其他协议底层也用nio呢?配置文件中默认的其他四种协议,如amqp、stomp、mqtt和ws协议。只需要把之前配置的nio删掉,然后在配置文件中做如下配置:

<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000&wireFormat.maxFrameSize=104857600&org.apache.activemq.transport.nio.SelectManager.corePoolSize=20&org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50"/>

最后将代码中的端口改成61608即可。这时即使将连接url开头的nio换成tcp,也可以在61608端口正确运行

保证消息可靠性 --- 持久化

在MQ的入门篇中讲到过保证MQ的可靠性就要从持久、事务和签收三方面去考虑,这三个都是MQ自带的可配的参数。这里要说的持久化,和redis中持久化概念差不多,持久化到硬盘或者数据库,即使MQ挂了消息也不会丢失。

1、activemq的持久化机制有哪些?
有JDBC(工作中常用的机制)、AMQ(版本5.3之前的存储机制,现在不用了)、KahaDB(版本5.4开始的默认机制)和Level DB。不论用哪种,存储消息的逻辑都是一致的。

2、存储消息的逻辑是怎样的?
生产者发送消息后,消息中心首先将消息进行持久化,然后再发送给接收者,发送成功则删除持久化的消息,失败就继续发送。

3、KahaDB机制介绍(4+1结构,4个文件一把锁):
类似于redis的aof方式,基于日志文件,记录消息。5.4开始的版本默认持久化机制就是这个,在activemq的配置文件activemq.xml中有如下配置:

kahaDB


通过这段配置可以知道,在activemq的安装目录下有个data/kahadb目录,打开目录,有如下文件:

kahadb目录


它们的作用分别是:

  • db-1.log:这是存储消息数据的日志文件,大小固定,如果存储满了,数字就会递增,比如db-1.log存储满了,就会生成一个db-2.log来存储消息数据。

  • db-data:这个文件保存了BTree索引,就是保存了db-1.log文件的索引。

  • db-free:其实还有个上图没显示的文件,就是db-free,这个是记录db-data哪里是空闲的,是没有索引的,以便要建新索引时就建在那里,可以充分利用db-data文件。

  • db-redo:这又是对上面三个文件的备份,如果上面三个文件出现意外数据丢失了,可以通过db-redo恢复。

  • lock:对记录日志和索引的时候加锁用的。

4、levelDB机制介绍:
它是activemq5.8开始引入的持久化机制,将来要替代kahadb的东西。和kahadb也挺相似,也是基于日志文件进行存储。要使用的话只需要将activemq.xml配置文件中的持久化配置改成这样即可:

<persistenceAdapter>
<levelDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>

没错,就是将配置节点中的kahaDB换成levelDB即可。

5、JDBC持久化介绍:
有了kahadb和leveldb,为什么还要有jdbc持久化机制呢?因为kahadb和leveldb还是基于activemq的,还是依赖于activemq自身。而jdbc持久化,就是将消息存储到数据库中。使用jdbc持久化步骤如下:

  • 将MySQL驱动包加到activemq安装目录的lib目录里面;

  • 在activemq.xml中最相关持久化配置,将kahadb换成jdbc,如下:

<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true" />
</persistenceAdapter>

mysql-ds就是数据源,接下来就要配置这个数据源。

  • 配置数据源:

<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://192.168.x.xxx:3306/activemq_persistence?relaxAutoCommit=true"/>
<property name="username" value="xxx"/>
<property name="password" value="xxx"/>
<property name="maxTotal" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>

首先你得在mysql中新建一个名为activemq_persistence的数据库。连接池用的是dbcp2,如果要用其他的,就需要把jar包添加到activemq安装目录下的lib目录下。那么这段配置在哪里呢?就是配置在activemq.xml的broker标签之后,import标签之前。如下图:

配置数据源


做好如上配置,运行MQ生产者(queue)生产消息,就会发现你连接的数据库中多了三张表,如下:

activemq


下面来说说这三张表的作用。

  • ACTIVEMQ_ACKS:如果是发布订阅模式,订阅者和服务器的订阅关系存储在这里。

  • ACTIVEMQ_LOCK:这张表主要是在集群的时候使用的。

  • ACTIVEMQ_MSGS:存储消息的表,消息的目的地,发送者主键,消息发送到顺序,过期时间等都存在这张表中。

    ACTIVEMQ_MSGS


    然后运行消费者消费消息,那么表中的数据就会自动被删除。需要注意的是,要将消息保存到数据库,得设置deliveryMode为持久化。springboot默认开启了,如需关闭,在生产者配置文件中像下面这样配置即可。

jms:
template:
delivery-mode: non_persistent

上面说了queue,下面再来看看topic。注意发布订阅模式一定先启动消费者!还有,如果是spring与activemq整合的项目,在spring配置文件配置activemq的监听程序的时候,要设置clientId等,监听程序的变成下面这样:

<!-- 5、配置监听程序 -->
<bean id="jmsListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsFactory"/>
<property name="destination" ref="destinationTopic"/>
<property name="messageListener" ref="myMessageListener"/>
<property name="clientId" value="111"/>
<property name="subscriptionDurable" value="true"/>
<property name="pubSubDomain" value="true"/>
</bean>

下面三行配置是新加的,要做持久化就必不可少。

开发有坑:

  • 如果只生成了两张表,没有ACKS那张表,是因为数据库编码的问题,新建数据库的时候可能用了utf8b64之类的编码,官网建议编码为latin1 或者ASCII。

  • 如果报错BeanFactory not initialized or already closed,这时因为操作系统的机器名有“_”符号,改机器名重启即可。

  • 如果成功生成了三张表,然后访问8161的管理后台,首页可以进去,但是点击queue等菜单的时候就报503的错误,此时可以用./activemq console方式启动,如果发现控制台有如下info日志信息:

    info信息


    出现这种问题的原因就是你的mysql驱动包版本太高了,换个低版本的就好。实测activemq 5.15.5搭配mysql8.x的jar包会有此问题,换成5.x的jar包即可。

6、JDBC&Journal:
上面用JDBC实现了持久化,Journal呢就是一种缓存,也就是说,加上Journal,JDBC持久化速度将会更快!生产者生产消息,会存到数据库,消费者消费了这条消息,又要从数据库删除,会造成频繁地读库和写库,性能较低。有了journal,生产者生产的消息就会存到journal文件中,如果在journal还未来得及同步消息到DB的时候,消费者就已经将消息消费了,那么这条消息就不用同步到数据库了,这样就可以减轻数据库的压力。

  • 配置方法:将persistenceAdapter标签换成persistenceFactory并配置成如下即可:

<persistenceFactory>
<journalPersistenceAdapterFactory
journalLogFiles="4"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#mysql-ds"
dataDirectory="activemq-data" />
</persistenceFactory>
保证高可用 --- 集群

互联网技术,总离不开可靠性和高可用这些词汇。可靠性就是如何保证数据不丢失,也就是上面讲的持久化;高可用就是当服务器挂了还要能继续正常提供服务,所以就是集群技术。

1、ActiveMQ如何保证高可用?
官网推荐的就是基于Zookeeper和Replicated levelDB Store搭建ActiveMQ主从复制集群,避免单点故障。官网链接:http://activemq.apache.org/replicated-leveldb-store。原理图:

集群


这张图的意思就是,至少会有三个mq的节点,一主二从,它们有着相同的数据。一主二从形成一个整体,客户端并不知道集群的存在,客户端连接的是master。zookeeper就在后面默默地监视着mq集群,一旦发现master挂了,立即从两个slave中选举出新的master。

2、配置集群的步骤:

  • 配置zookeeper集群,参考我zookeeper的文章,此处不再赘述。

  • 集群配置表:为了不混淆,列出如下配置表,zk集群和AMQ集群就按照下表配置。

|主机|zk端口|AMQ集群bind端口|AMQ消息TCP端口|AMQ控制台端口|AMQ安装目录|
|:-:|:-:|:-:|:-:|:-:|:-:|
|0.103|2181|63631|61616|8161|/mq_node01|
|0.103|2182|63632|61617|8162|/mq_node02|
|0.103|2183|63633|61618|8163|/mq_node03|

  • 添加host映射:vim/etc/hosts,添加映射,等下配置集群时要用到:

    host映射
  • 拷贝三份AMQ,分别命名为mq_node01,mq_node02和mq_node03。然后进入conf目录下vim配置文件jetty.xml,将控制台端口分别改为8161,8162和8163。

    修改控制台端口
  • 将三个节点的brokerName改成一致,默认是localhost,我这里都改成zhusl。分别修改三个节点conf目录下的activemq.xml:

    修改brokerName
  • 三个节点的持久化配置:

<persistenceAdapter>
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:63631"
zkAddress="localhost:2181,localhost:2182,localhost:2183"
hostname="zhusl-server"
sync="local_disk"
zkPath="/activemq/leveldb-store"/>
</persistenceAdapter>

这是其中一个节点的配置,我们要关注的有bind,这个是集群的bind端口,按照上面表中列的,一个63631,一个63632,一个63633;还有zkAddress,这是zookeeper的端口,按照上面表中的来,将三个端口配上;最后一个是hostname,这个就是刚才配的host映射。所以三个节点配置不同的地方就是bind端口。

  • 修改消息TCP协议端口:

<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>

将三个节点的这里,分别改成61616,61617和61618。

  • 启动zookeeper集群

  • 启动mq_node01、mq_node02和mq_node03

  • 检查zookeeper和activemq成功启动的数量:

ps -ef | grep zookeeper | grep -v grep | wc -l
ps -ef | grep activemq | grep -v grep | wc -l

如果执行这两条命令返回的都是3,那说明都启动成功了。

  • 用zkCli.sh连接一台zk,查看状态:

./zkCli.sh -server 127.0.0.1:2181

连接后查看节点,若如下图所示,说明activemq成功地注册到zookeeper上了。

activemq成功注册到zookeeper上
  • 查看activemq集群中的master
    直接在zkCli中用get命令查看,如下图:

    查看activemq的master

如果elected为null,那么该节点就是slave,有值,那就是master,上图中63631端口的就是master。至此,activemq集群就搭建完了。

  • 在代码中使用集群:以前在代码中是如下图一样配置的(这段代码是在ActiveMQ入门篇中写过的):

    非集群在代码中的使用

即连接的url是tcp://192.168.0.103:61616,现在集群了,写法应该改成:

private static final String URL = 
"failover:(tcp://192.168.0.103:61616,tcp://192.168.0.103:61617,tcp://192.168.0.103:61618)?randomize=false";

改成这样之后(生产者也是将url改成如上形式即可),运行代码,控制台显示成功,并且发现队列中就会多了一个消费者“queue_test”。

控制台显示启动成功
管理后台

3、集群可用性测试:

  • 检查是否只有master能被连接。首先执行lsof -i:816X,X分别代表1、2、3。

    查看端口

只有8161才有线程在使用,说明8161就是master的管理后台的端口,然后直接用浏览器访问。

控制台


然后也可以试着访问8162和8163,发现是访问不了的,这就达到了要求,只有master才能被连接。

  • kill掉8161对应的线程,即activemq集群的master,看看能否自动选出新的master。

    kill掉master


    浏览器再次访问8161端口的管理后台,发现无法访问此网站了。再尝试着访问8162和8163,发现8162可以访问。

    8162可以正常访问

说明8162就是选出来的新master。同样也可以去zookeeper中用get命令验证。然后我们执行代码,控制台的输出如下:

控制台输出

成功连接到61617了,说明集群是可用的。至此,集群可用性验证完毕。

常考面试题

1、如何保证消息中间件的高可用性?
答:zookeeper + replicated-leveldb-store主从集群

2、什么是异步投递Async Sends?
答:activemq支持同步和异步两种消息投递模式,默认是异步投递,原因是如果消费者消费能力有限,而使用同步的话,会严重影响mq的性能,所以默认是异步。可以明确指定使用同步方式,或者在没有使用事务的前提下发送持久化消息,也会进行同步发送。使用异步可以最大化生产者的发送效率,但是如果消费者消费能力跟不上,可能会造成消息的积压,也不能保证消息发送成功,所以使用异步需要承担在发送失败的情况下有少量的消息丢失的风险。关闭异步投递的方式:

// 方式一:在url中加上参数
private static final String URL = "tcp://192.168.0.103:61616?jms.useAsyncSend=false";

// 方式二:ActiveMQConnectionFactory 设置相关属性
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
factory.setUseAsyncSend(false);

3、异步发送时如何判断发送成功?
答:消息丢失出现的原因:异步投递的时候只要调用send方法生产者就会认为消息成功发送至MQ,如果MQ突然宕机,此时刚生产出来还未发送到MQ的消息就会丢失。所以生产者发送后需要接收回调来判断是否发送成功,这个回调需要自己写。此时生产者的写法如下:

public class ProductorAsync {
private static final String URL = "tcp://192.168.0.103:61617?jms.useAsyncSend=false";
private static final String QUEUE_NAME = "queue_test";
private static final String TOPIC_NAME = "topic_test";

public static void main(String[] args) throws Exception {
// 1. 创建factory工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 异步投递
factory.setUseAsyncSend(true);
// 2. 创建connection连接
Connection connection = factory.createConnection();
connection.start();
// 3. 创建session
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 4. 创建目的地queue
Queue queue = session.createQueue(QUEUE_NAME);
// producer换成ActiveMQMessageProducer
ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue);
// 5. 生产消息
for (int i = 1; i <= 3; i++) {
TextMessage message = session.createTextMessage("queue" + 1);
// 给消息设置上id
message.setJMSMessageID(UUID.randomUUID().toString());
// 获取自己设置的id
String messageId = message.getJMSMessageID();
// 6. 使用异步回调方式将消息发送到MQ
activeMQMessageProducer.send(message, new AsyncCallback() {
// 发送成功执行这个方法
@Override
public void onSuccess() {
System.out.println("id为:" + messageId + " 的消息发送成功!");
}

// 发送失败执行这个方法
@Override
public void onException(JMSException e) {
System.out.println("id为:" + messageId + " 的消息发送失败!");
}
});
}
// 7. 关闭资源(顺着申请,倒着关闭)
activeMQMessageProducer.close();
session.commit();
session.close();
connection.close();
System.out.println("发送到MQ完成!");
}
}

4、什么是延迟投递和定时投递?
答:延迟和定时的概念我们都理解,下面就来说说怎么用。要开启延迟和定时投递,

  • 首先需要在activemq的配置文件activemq.xml中进行如下配置:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="zhusl" dataDirectory="${activemq.data}" schedulerSupport="true">

即在broker标签后面加上schedulerSupport="true"属性。

  • 然后生产者与之前有所不同,如下(有区别的地方均加了注释):

public class ProductorSchedul {
private static final String URL = "tcp://192.168.0.103:61618";
private static final String QUEUE_NAME = "queue_test";

public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
Connection connection = factory.createConnection();
connection.start();

Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);

// 延迟投递的时间(3秒钟)
long delay = 3 * 1000;
// 重复投递的时间间隔(4秒一次)
long period = 4 * 1000;
// 重复投递的次数(5次)
int repeat = 5;

for (int i = 1; i <= 3; i++) {
TextMessage message = session.createTextMessage("queue" + 1);
// 设置延迟投递
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
// 设置重复投递的时间间隔
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
// 设置重复投递的次数
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);

producer.send(message);
}
producer.close();
session.commit();
session.close();
connection.close();
System.out.println("发送到MQ完成!");
}
}

先启动上面的生产者,然后启动消费者,就会发现控制台3秒钟之后,每个4秒会打印一次收到的消息,总共会打印5次。

5、说说消息重试机制。
答:会引起消息重发的情况有:

  • 消费端使用了事务,并且在session钟调用了rollback。

  • 消费端使用了事务,但是在commit之前程序退出了或者根本就没有commit。

  • 消费端使用了签收,并且在session中调用了recover。

以上情况会导致消息重发,默认重发策略是重发6次(是重发6次,第一次算正常消费,不算重发,所以总共可以消费7次,第8次才会消费不到)。如果重发了6次还是发送失败,MQ就会认为这个消息有毒(poison ack),不再进行发送,并且放入死信队列(DLQ)。如果不使用默认的6次,也可以自己在消费端设置,如下代码:

public class ConsumerRepeat {
private static final String URL = "tcp://192.168.0.103:61618";
private static final String QUEUE_NAME = "queue_test";

public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 设置重发策略
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
// 最多重发3次
redeliveryPolicy.setMaximumRedeliveries(3);
factory.setRedeliveryPolicy(redeliveryPolicy);

Connection connection = factory.createConnection();
connection.start();
// 创建session,开启事务
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
while (true){
TextMessage message = (TextMessage) consumer.receive(3000);
if (message != null)
System.out.println(message.getText());
else
break;
}

consumer.close();
// 不提交事务
//session.commit();
session.close();
connection.close();
}
}

上面的代码设置成了重发3次。先启动生产者,然后启动消费者,可以正常消费1次,重复消费3次,第5次的时候,消费不到消息,同时管理控制台多了一个死信队列。

死信队列

6、说说死信队列。
上面说到了,重发了设定次数还不正的消息就会进入到死信队列,所以死信队列就是用来存放失败消息的,开发人员可以在这个队列中查看出错的消息,然后进行人工干预。默认是所有的失败消息都放到同一个共享死信队列中,也可以设置业务相关的独立死信队列。在activemq.xml中进行如下配置即可:

<policyEntry queue="<" >
<deadLetterStrategy>
<!-- 给死信队列名字加上后缀,例如test队列中的消息重发失败后就会放到DLQ.test死信队列中 -->
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
<!-- 过期消息不放入死信队列中,默认是true,会放入 -->
<!-- 非持久消息放入死信队列,默认false,即不会放入 -->
<sharedDeadLetterStrategy processExpired="false" processNonPersistent="true"/>
<!-- 非持久消息放入死信队列,默认false,即不会放入 -->
</deadLetterStrategy>
</policyEntry>

7、如何保证消息不被重复消费?
像上面说的消息重发的时候,我们就可以重复消费消息。当然生产中肯定会避免上面说的三种造成消息重发的原因。但是由于网络等原因还是可能会造成消息重发,也会重复消费。保证消息不重复消费有以下两种办法:

  • 将消息存入数据库,利用主键唯一来防止重复消费。

  • 利用redis,给消息分配一个id,只要消费过的消息,就以

    的形式存入redis。消费者消费之前,就先拿id去redis中取,如果有数据,那就是消费过了

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
关于 MQ ,你必须知道的
【Active入门
ActiveMQ_部署及发送接收消息
ActiveMQ  延时投递
ActiveMQ使用笔记
成小胖学习 ActiveMQ · 基础篇
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服