打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
RocketMQ在平安银行的应用与实践

随着互联网金融和相关技术的不断发展,传统金融行业为满足业务快速发展需求,正在积极引入各类开源技术,以快速抢占市场。那么,以金融和科技作为双驱动的平安银行在开源技术的引入方面是如何评估,运用到哪些业务场景以及面对银行业复杂的网络环境,是如何部署的呢?本文将以Apache RocketMQ为例,和您一起了解平安银行在开源技术选型方面的思考和实践

 关注本号并回复平安,可以获取文章的pdf版

文章整理自平安银行消息负责人吴建峰在Apache RocketMQ Meetup上海站的分享,主要包括以下内容:

📌 RocketMQ 在平安银行的应用场景;

📌 复杂网络环境下的部署实践;

📌 多隔离区场景下的部署情况;

📌 IDC 场景下的部署情况;

📌 改造实践和遇到的小插曲;

RocketMQ在平安银行的应用场景

目前,平安银行通过RocketMQ 解决了数据预加、信息通知和状态变化等方面的业务需求,接下来,我们通过App 登录、资产总览和工资理财个应用场景来展开讲下。

App 登录:当用户打开平安银行App 的时候,系统会根据用户的登录ID 去加载相应的用户数据,比如银行卡、信用卡和理财产品等,以及一些系统通知。这个场景下,我们用到了RocketMQ 的异步处理能力,即预加载可能需要使用的数据,提升用户体验。

资产总览:进入平安银行 App 资产总览的页面,页面显示账户余额、各类理财产品(黄金、基金和股票等),以及贷款等方面的信息。平安银行对接了各类基金、黄金和股票等来自不同金融主体、不同系统的数据,具有种类多、异构系统多和变化快的特点。我们的目标就是让资产总览数据尽可能准确,不同种类的资产变动的时候发出通知,资产系统收到通知后,根据变化的数据来计算出用户当前的资产总览。

工资理财:

工资理财是指每月工资下发到银行卡后,系统可以实现自动买入用户设置好的理财产品,例如买一些定投类的理财产品。这里信息的传递流程是:


·      银行卡里的余额出现变动,系统把变动信息发送到消息引擎;

·      Consumer端进行消费,通知用户账户已经出现变化;

·      系统判断变动是否来源于代发工资;

·      如果是,系统会再发送一条消息;

·      理财的 Consumer 进行消费;

·      判断现在是不是应该买入工资理财类的产品;

·      如果是,自动买入用户设定好的理财产品;

·      自动买入之后,余额再次变动,系统会再一次通知,这一次通知,判断的就是一些其他的逻辑了。

那么,在这些场景中,我们对消息引擎会有哪些要求呢?

A高可用、高可靠和高性能,这是金融行业引入开源技术的基本要求;

B堆积能力,代发工资的用户很多,一个公司的员工会在某个时间点集中发放;

C顺序能力,即账户变动在先,发出通知在后;

D事务性能力,如果没有事务性,有可能会出现账户变动了,但没有触发购买理财产品的情况;

E重试和延迟消息功能,比如工资发放的时候,可能是在晚上,这时候系统会自动把购买理财的动作放在第二天白天去操作,再发出通知;

F消息回溯能力,就是出现问题的时候,我们可以把这个消息进行回溯,重新进行消息的处理,提供完整的消息轨迹;

在技术选型的过程中,RocketMQ符合我们在这些典型使用场景中对消息产品的需求,在引入的过程中,平安银行也对社区做了相应的贡献。

RocketMQ在复杂网络环境下的部署实践

2.1 多测试子环境下的服务调用场景

平安银行有多套测试子环境,用来对不同的feture进行测试,即图中的FAT、FAT001、FAT002、FAT003等。传统银行系统由大型机时代向更面向互联网用户的零售时代转型过程中,不可避免微服务化,传统较集中式系统会被划分为较多的微服务,正常的情况如下图,服务A 调用服务B,服务B 调用服务C,服务C 调用服务D。

随着业务的需求,假设新的feature,我们需要对服务A 和B 进行改动。相比在FAT环境里去部署测试,更合适的方式是另起一套FAT 环境,这里我们命名为FAT001,把服务A和B部署上去,A 调用B,B会调用原来FAT 环境里的C 和D。

此时,另一个新的需求,需要对服务A 和C 进行改动。如果直接发布到FAT 或FAT001 肯定是不对的,会影响正在进行的测试,此时,我们会再起一套测试环境,命名为FAT002,发布服务A 和C。由于FAT002 里没有服务B,所以服务A要调用服务B 就需要去FAT 环境(FAT 定义为较稳定的测试子环境)。服务B 调用服务C 的时候,就不应该调用本环境的C了,而是调动FAT002 的C 才能实现测试功能。

2.2 多测试子环境的消息场景

以上是服务调用的请求路径,比较好理解,但到了消息队列上,问题会变得复杂些,假如一个feture 只是更改了消费者,那如何把这条消息传递到改的消费者应用上去进行测试,而不被其它环境的消费者消费掉,这是我们需要去考虑的问题。 

来看下具体的情况,集群部署了Broke A 和Broke B,TopicA分别部署在这两个Broker上。此时,Producer Group A向Topic A 中写数据,Consumer Group A去消费,这种情况下是不会有任何问题。但如果新增一套FAT001 的环境,要求FAT001 发布的消息,只能由FAT001 来消费,FAT 不能消费,这种情况下我们该怎么解决?

在消息上面加一些路由、或是加一些TagFilter、消息的Property这些都不能解决我们的问题。每个子环境部署一套RocketMQ一方面成本太高,另一方面如果这个feture测试完成了,需要把相关的应用再切回FAT 进行处理,实现太过复杂。

大家可以思考下,多个feture 同时进行测试,DB 会部署一套还是多套?首先一个feture 不会更改所在的应用,一般来说DB 是部署一套的,在数据库里面添加字段,来识别数据是来源于哪一个子环境,如果多套部署,不更改的应用取不到新部署的DB 数据,无法进行全链路测试。所以同样的,我们也没有在每个子环境都部署一套 RocketMQ,而是部署统一部署,通过微服务RPC路由把请求路由到正确的生产者集,改造消息路由算法把消息路由到正确的消费者进行处理。

下面介绍几种生产和消费场景:

1 生产者变更

在上图中的场景下,默认的场景FAT发送,FAT 消费,没有问题的,假设FAT001 的生产者发布了,需要FAT001 发送到MQ集群,FAT是可以直接消费。

2 生产者、消费者同时变更

在这个场景下,如果消费者在 FAT001也部署了应用,需要FAT消费者不能消费由FAT001产生的消息,而是由FAT001的消费者去消费。我们的处理方式是在逻辑上把Topic A下面的Queue进行分组,相当于加了逻辑分组,如果消费者在 FAT001 有部署,我们会把 Queue的分组扩大,在其默认设置的情况下再翻一倍,新增的 Queue 就是给到 FAT001进行消费的。

3 只有消费者变更

再来看一种场景,假设有个feature只需要更改消费者,部署在FAT001。也是可以通过逻辑分组的方式,实现生产者根据请求来源来发送消息到队列FAT001 逻辑分组内的Queue,从而只让部署在FAT001 的消费者消费。

通过以上个场景,我们了解到添加逻辑分组的必要性,实现过程并不复杂。主要做了以下调整:

  •  这个逻辑分组什么时候建立?

新建Topic 的时候,全部建好?还是 Consumer 上线/下线的时候动态的去进行调整?我们选择了动态创建的方式,这个过程中,我们添加了Meta Server 进行元数据管理,进行动态创建:

  •  添加 Meta Service,管理的元数据包括 ProducerConsumerTopicQueueSubenv等信息:

  • 调整 Producer,取Request Head 里面请求来源FATFAT001FAT002...),如果Topic 对应的存在分组,选择分组的 Queue,否则发到默认分组呢的Queue

  •  调整 Consumer,上线时判断应用部署的分组(FAT、FAT001、FAT002...),如果Topic不存在对应的分组,则新建;存在,则rebalalce (新Consumer节点上线),下线时,判断该分组是否还存在 其它Consumer实例,若不存在,删除分组,否则 rebalalce(Consumer某一节点下线);

多隔离区场景下的部署实践

由于对安全上的重视,金融行业的网络环境相比其他行业会更复杂。整个隔离区主要分为以下几个部分:

  • DMZ区:

外网可以直接访问,用于放置网关;

  • Web区:

面向的是用户手机,或者网页上可以看到的功能应用;

  • 核心区:

包含核心的调用逻辑功能,和交易、订单相关的核心应用,例如DB 和存储;

  • 外联区:

用于访问外网,有时候也会部署一些Poxy 代理,因为内网不能直接访问外网,需要通过代理去访问外网;

  • 专用区域:

对接基金、三方存管等外部系统。在金融行业,如果某个系统是闭环的,那必须要去做隔离;

  • 管理区:

是指对整个区域的应用要进行集中管理,且数据流动是单向的,只能取数据,不能通过管理区把某区域的数据推送到另一区域。

此外,从安全的角度出发,所有的区域在生产环境下都通过防火墙进行隔离,这就给我们部署RocketMQ 带来了很大的实施难度。如果只部署一套,面对这么多的防火墙,生产者消费者到集群的的流量跨墙,会给网络带来极大的不稳定,遇到瓶颈,防火墙几乎无法在线平滑扩容;如果每个子环境都部署一套,又带来运维复杂度,而且还是存在数据同步和跨墙消费的问题。

最终,我们采用的是折中的办法,即统一部署加分隔离区部署,这样做的益处是:

  • 防火墙是开大策略,保证安全的前提下,符合监管要求;

  • 针对跨隔离区消费的问题,我们采用复制的方式,模拟消费者重新写入目标集群;

多IDC场景下的部署实践

同城多IDC,可以认为近似局域网,比较好处理,但异地多IDC多活场景,目前我们还没有特别好的解方案,多活不可避免面临数据分片、数据合并和数据冲突的解决等问题。

如果 Topic 下数据有多活需求,我们暂时通过复制方式来处理。但这类手工模拟消费者消费数据写入新集群的复制方式,会存在一些问题,即复制到另一个集群之后offset 会改变,处理逻辑就会有偏差。我们通过pull 的方式自定义的去根据offset 去进行消费。当故障转移到新的集群需要去消费的时候,需要获取到新集群里面正确的offset 值。此时,这个值和之前集群里的已经不一样了,需要根据时间得到新集群里正确的offset 值,再去进行消费。在没有更好的解决方案前,治理就显得很关键了。

不过,我们注意到, RocketMQ 最新发布的版本里,提供了DLedger的特性,DLedger 定位的是一个工业级的Java Library,可以友好地嵌入各类Java 系统中,满足其高可用、高可靠、强一致的需求。我们会尽快对这一特性进行集成和测试。

改造实践和遇到的小插曲

我们在对 RocketMQ的使用过程中,添加了以下功能或特性:

A. 为生产者提供消息的堆积能力。

B. 将所有配置管理部署到配置中心,并做云端化处理,以满足动态修改的需求。

C.4.3 版本还未提供事务处理能力前,我们在本地数据库里去建一张消息表,数据库更改数据状态的时候,会同步将数据写入消息表。若发送失败,则进行补偿。并在客户端里进行封装。

D.实现统一的消息者幂等处理。

E.添加身份认证和消息认证(注:RocketMQ 4.3 版本中已经实现身份认证功能)

当然,也遇到了一些小插曲,基本都是使用上的小问题,可能大家也会碰到:

A. 一个应用使用多个RocketMQ集群时,未加载到正确的配置。Client 端,如果没有对instancename 进行配置,一个应用连多个集群会失败。

B. 在大数据的场景下,内存溢出。订阅的Topic 越多,每个Queue 在本地缓存的message 也会越多,默认每个queue 1000条,可能会把内存打爆,可根据实际情况调整。

C. 删除数据时IO抖动,默认每天凌晨4点删除数据,量上来后出现IO 抖动,配置了消息删除策略,默认逗号分隔开,多配几个时间删除就可以了。

D. Broker上日志报延迟消息找不到数据文件。在主备切换的演练过程中,出现了延迟消息在Broker 上处理异常的情况。当主切换到备端时,延迟消息在Broker 上保存的文件被自动删除,再切回主,由于延时消息的元数据感觉在,会查询文件进行处理,此时已找不到文件。

E. NAS 的时候,IP 获取了 NAS的私网地址,并被提交给了服务端

以上就是我们在部署过程中遇到的一些小插曲,基本都是可以很快定位原因,解决的。

总结

总的来看,RocketMQ对平安银行的消息系统建设的帮助是非常大的,尤其是满足了数据持久化、顺序消费和回溯的需求,此外,在消息的查询方面,也比我们之前使用的消息引擎好很多。最后再分享一点自己对中间件的一点感悟:中间件使用重在治理,规范不先行,开发两行泪。

本文作者:吴建峰,GitHub ID @devilfeng,来自平安银行平台架构部基础框架团队。

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Java进阶必备:优雅的告诉面试官消息中间件该如何实现高可用架构?
Kafka实战-Kafka Cluster
RocketMQ生产部署架构如何设计
EMQX vs Mosquitto | 2023 MQTT Broker 对比
5G场景拓展下的服务器增量需求
分布式缓存架构基础
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服