打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
ActiveMQ  延时投递

1、如何保证消息中间件的高可用性?
答:zookeeper + replicated-leveldb-store主从集群

2、什么是异步投递Async Sends?
答:activemq支持同步和异步两种消息投递模式,默认是异步投递,原因是如果消费者消费能力有限,而使用同步的话,会严重影响mq的性能,所以默认是异步。可以明确指定使用同步方式,或者在没有使用事务的前提下发送持久化消息,也会进行同步发送。使用异步可以最大化生产者的发送效率,但是如果消费者消费能力跟不上,可能会造成消息的积压,也不能保证消息发送成功,所以使用异步需要承担在发送失败的情况下有少量的消息丢失的风险。关闭异步投递的方式:

// 方式一:在url中加上参数
private static final String URL = "tcp://192.168.0.103:61616?jms.useAsyncSend=false";

// 方式二:ActiveMQConnectionFactory 设置相关属性
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
factory.setUseAsyncSend(false);

3、异步发送时如何判断发送成功?
答:消息丢失出现的原因:异步投递的时候只要调用send方法生产者就会认为消息成功发送至MQ,如果MQ突然宕机,此时刚生产出来还未发送到MQ的消息就会丢失。所以生产者发送后需要接收回调来判断是否发送成功,这个回调需要自己写。此时生产者的写法如下:

public class ProductorAsync {
    private static final String URL = "tcp://192.168.0.103:61617?jms.useAsyncSend=false";
    private static final String QUEUE_NAME = "queue_test";
    private static final String TOPIC_NAME = "topic_test";

    public static void main(String[] args) throws Exception {
        // 1. 创建factory工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
        // 异步投递
        factory.setUseAsyncSend(true);
        // 2. 创建connection连接
        Connection connection = factory.createConnection();
        connection.start();
        // 3. 创建session
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        // 4. 创建目的地queue
        Queue queue = session.createQueue(QUEUE_NAME);
        // producer换成ActiveMQMessageProducer
        ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue);
        // 5. 生产消息
        for (int i = 1; i <= 3; i++) {
            TextMessage message = session.createTextMessage("queue" + 1);
            // 给消息设置上id
            message.setJMSMessageID(UUID.randomUUID().toString());
            // 获取自己设置的id
            String messageId = message.getJMSMessageID();
            // 6. 使用异步回调方式将消息发送到MQ
            activeMQMessageProducer.send(message, new AsyncCallback() {
                // 发送成功执行这个方法
                @Override
                public void onSuccess() {
                    System.out.println("id为:" + messageId + " 的消息发送成功!");
                }

                // 发送失败执行这个方法
                @Override
                public void onException(JMSException e) {
                    System.out.println("id为:" + messageId + " 的消息发送失败!");
                }
            });
        }
        // 7. 关闭资源(顺着申请,倒着关闭)
        activeMQMessageProducer.close();
        session.commit();
        session.close();
        connection.close();
        System.out.println("发送到MQ完成!");
    }
}

4、什么是延迟投递和定时投递?
答:延迟和定时的概念我们都理解,下面就来说说怎么用。要开启延迟和定时投递,

  • 首先需要在activemq的配置文件activemq.xml中进行如下配置:

 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="zhusl" dataDirectory="${activemq.data}" schedulerSupport="true">

即在broker标签后面加上schedulerSupport="true"属性。

  • 然后生产者与之前有所不同,如下(有区别的地方均加了注释):

public class ProductorSchedul {
    private static final String URL = "tcp://192.168.0.103:61618";
    private static final String QUEUE_NAME = "queue_test";

    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
        Connection connection = factory.createConnection();
        connection.start();

        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageProducer producer = session.createProducer(queue);

        // 延迟投递的时间(3秒钟)
        long delay = 3 * 1000;
        // 重复投递的时间间隔(4秒一次)
        long period = 4 * 1000;
        // 重复投递的次数(5次)
        int repeat = 5;

        for (int i = 1; i <= 3; i++) {
            TextMessage message = session.createTextMessage("queue" + 1);
            // 设置延迟投递
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
            // 设置重复投递的时间间隔
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
            // 设置重复投递的次数
            message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);

            producer.send(message);
        }
        producer.close();
        session.commit();
        session.close();
        connection.close();
        System.out.println("发送到MQ完成!");
    }
}

先启动上面的生产者,然后启动消费者,就会发现控制台3秒钟之后,每个4秒会打印一次收到的消息,总共会打印5次。

5、说说消息重试机制。
答:会引起消息重发的情况有:

  • 消费端使用了事务,并且在session钟调用了rollback。

  • 消费端使用了事务,但是在commit之前程序退出了或者根本就没有commit。

  • 消费端使用了签收,并且在session中调用了recover。

以上情况会导致消息重发,默认重发策略是重发6次(是重发6次,第一次算正常消费,不算重发,所以总共可以消费7次,第8次才会消费不到)。如果重发了6次还是发送失败,MQ就会认为这个消息有毒(poison ack),不再进行发送,并且放入死信队列(DLQ)。如果不使用默认的6次,也可以自己在消费端设置,如下代码:

public class ConsumerRepeat {
    private static final String URL = "tcp://192.168.0.103:61618";
    private static final String QUEUE_NAME = "queue_test";

    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
        // 设置重发策略
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        // 最多重发3次
        redeliveryPolicy.setMaximumRedeliveries(3);
        factory.setRedeliveryPolicy(redeliveryPolicy);

        Connection connection = factory.createConnection();
        connection.start();
        // 创建session,开启事务
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageConsumer consumer = session.createConsumer(queue);
        while (true){
            TextMessage message = (TextMessage) consumer.receive(3000);
            if (message != null)
                System.out.println(message.getText());
            else
                break;
        }

        consumer.close();
        // 不提交事务
        //session.commit();
        session.close();
        connection.close();
    }
}

上面的代码设置成了重发3次。先启动生产者,然后启动消费者,可以正常消费1次,重复消费3次,第5次的时候,消费不到消息,同时管理控制台多了一个死信队列。

死信队列

6、说说死信队列。
上面说到了,重发了设定次数还不正的消息就会进入到死信队列,所以死信队列就是用来存放失败消息的,开发人员可以在这个队列中查看出错的消息,然后进行人工干预。默认是所有的失败消息都放到同一个共享死信队列中,也可以设置业务相关的独立死信队列。在activemq.xml中进行如下配置即可:

<policyEntry queue="<" >
        <deadLetterStrategy>
            <!-- 给死信队列名字加上后缀,例如test队列中的消息重发失败后就会放到DLQ.test死信队列中 -->
            <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
            <!-- 过期消息不放入死信队列中,默认是true,会放入 -->
            <!-- 非持久消息放入死信队列,默认false,即不会放入 -->
            <sharedDeadLetterStrategy processExpired="false" processNonPersistent="true"/>
            <!-- 非持久消息放入死信队列,默认false,即不会放入 -->
        </deadLetterStrategy>
</policyEntry>

7、如何保证消息不被重复消费?
像上面说的消息重发的时候,我们就可以重复消费消息。当然生产中肯定会避免上面说的三种造成消息重发的原因。但是由于网络等原因还是可能会造成消息重发,也会重复消费。保证消息不重复消费有以下两种办法:

  • 将消息存入数据库,利用主键唯一来防止重复消费。

  • 利用redis,给消息分配一个id,只要消费过的消息,就以

    的形式存入redis。消费者消费之前,就先拿id去redis中取,如果有数据,那就是消费过了
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
ActiveMQ --- 高级篇
ActiveMQ系列—ActiveMQ性能优化(中2)(处理规则和优化)
关于 MQ ,你必须知道的
ActiveMQ讯息策略
干货:RabbitMQ核心概念及工作原理
ActiveMQ使用笔记
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服