打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
Java消息队列任务的平滑关闭

#长按上图识别二维码,参与OSC源创会年终盛典#


摘要


对于消息队列的监听,我们一般使用Java写一个独立的程序,在Linux服务器上运行。程序启动后,通过消息队列客户端接收消息,放入一个线程池进行异步处理,并发的快速处理。当我们修改程序后,需要重新启动任务的时候,如何保证消息的不丢失呢?


1.问题背景


对于消息队列的监听,我们一般使用Java写一个独立的程序,在Linux服务器上运行。程序启动后,通过消息队列客户端接收消息,放入一个线程池进行异步处理,并发的快速处理。


那么问题来了,当我们修改程序后,需要重新启动任务的时候,如何保证消息的不丢失呢?


正常来说,订阅者程序关闭后,消息会在发送者队列中堆积,等待订阅者下次订阅消费,所以未接收的消息是不会丢失的。唯一可能丢失的消息,就是在关闭的一瞬间,已经从队列中取出但还没有处理完毕的消息。


因此我们需要一套平滑关闭的机制,保证在重启的时候,消息可以正常处理完成。


2.问题分析


平滑关闭的思路如下:


1.在关闭程序时,首先关闭消息订阅,这个时候消息都在发送者队列中


2.关闭本地消息处理线程池(等待本地线程池中的消息处理完毕)


3.程序退出


关闭消息订阅:一般消息队列的客户端都提供关闭连接的方法,具体可以自行查看api


关闭线程池:Java的ThreadPoolExecutor线程池提供shutdown()和shutdownNow()两个方法,区别是前者会等待线程池中的消息都处理完毕,后者直接停止线程的执行并返回list集合。因为我们需要使用shutdown()方法进行关闭,并通过isTerminated(),方法判断线程池是否已经关闭。


那么问题又来了,我们如何通知到程序,需要执行关闭操作呢?


在Linux中,我们可以用kill -9 pid关闭进程,除了-9之外,我们可以通过 kill -l查看kill 命令的其它信号量,比如使用 12) SIGUSR2 信号量

我们可以在Java程序启动时,注册对应的信号量,对信号量进行监听,在收到对应的kill操作时,执行相关的业务操作。


伪代码如下



下面通过一个demo模拟相关逻辑操作


首先模拟一个生产者,每秒生产5个消息


本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
SpringBoot+ActiveMQ多消息队列监听
基于优先级队列java线程池
看我是如何快速学习android开发的(一)
Java 理论与实践: 并发在一定程度上使一切变得简单
并发编程之痛
Java Concurrency In Practice
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服