仓库源文站点原文


layout: post title: "Seata 动态配置订阅与降级实现原理" categories: Seata tags: dynamic

author: 张乘辉

那么 Seata 支持的多个配置中心是如何适配不同的动态配置订阅以及如何实现降级的呢?下面从源码的层面详细给大家讲解一番。

动态配置订阅

Seata 配置中心有一个监听器基准接口,它主要有一个抽象方法和 default 方法,如下:

io.seata.config.ConfigurationChangeListener

该监听器基准接口主要有两个实现类型:

  1. 实现注册配置订阅事件监听器:用于实现各种功能的动态配置订阅,比如 GlobalTransactionalInterceptor 实现了 ConfigurationChangeListener,根据动态配置订阅实现的动态降级功能;
  2. 实现配置中心动态订阅功能与适配:对于目前还没有动态订阅功能的 file 类型默认配置中心,可以实现该基准接口来实现动态配置订阅功能;对于阻塞订阅需要另起一个线程去执行,这时候可以实现该基准接口进行适配,还可以复用该基准接口的线程池;以及还有异步订阅,有订阅单个 key,有订阅多个 key 等等,我们都可以实现该基准接口以适配各个配置中心。

Nacos 动态订阅实现

Nacos 有自己内部实现的监听器,因此直接直接继承它内部抽象监听器 AbstractSharedListener,实现如下:

如上,

值得一提的是,nacos 并没有使用 ConfigurationChangeListener 实现自己的监听配置,一方面是因为 Nacos 本身已有监听订阅功能,不需要自己再去实现;另一方面因为 nacos 属于非阻塞式订阅,不需要复用 ConfigurationChangeListener 的线程池,即无需进行适配。

添加订阅:

Nacos 配置中心为某个 dataId 添加订阅的逻辑很简单,用 dataId 和 listener 创建一个 NacosListener 调用 configService#addListener 方法,把 NacosListener 作为 dataId 的监听器,dataId 就实现了动态配置订阅功能。

file 动态订阅实现

以它的实现类 FileListener 举例子,它的实现逻辑如下:

如上,

FileListener#onChangeEvent 方法的实现让 file 具备了动态配置订阅的功能,它的逻辑如下:

无限循环获取订阅的配置属性当前的值,从缓存中获取旧的值,判断是否有变更,如果有变更就执行外部传入 listener 的逻辑。

ConfigurationChangeEvent 用于保存配置变更的事件类,它的成员属性如下:

这里的 getConfig 方法是如何感知 file 配置的变更呢?我们点进去,发现它最终的逻辑如下:

发现它是创建一个 future 类,然后包装成一个 Runnable 放入线程池中异步执行,最后调用 get 方法阻塞获取值,那么我们继续往下看:

allowDynamicRefresh:动态刷新配置开关;

targetFileLastModified:file 最后更改的时间缓存。

以上逻辑:

获取 file 最后更新的时间值 tempLastModified,然后对比对比缓存值 targetFileLastModified,如果 tempLastModified > targetFileLastModified,说明期间配置有更改过,这时就重新加载 file 实例,替换掉旧的 fileConfig,使得后面的操作能够获取到最新的配置值。

添加一个配置属性监听器的逻辑如下:

configListenersMap 为 FileConfiguration 的配置监听器缓存,它的数据结构如下:

ConcurrentMap<String/*dataId*/, Set<ConfigurationChangeListener>> configListenersMap

从数据结构上可看出,每个配置属性可关联多个事件监听器。

最终执行 onProcessEvent 方法,这个是监听器基准接口里面的 default 方法,它会调用 onChangeEvent 方法,即最终会调用 FileListener 中的实现。

动态降级

有了以上的动态配置订阅功能,我们只需要实现 ConfigurationChangeListener 监听器,就可以做各种各种的功能,目前 Seata 只有动态降级有用到动态配置订阅的功能。

在「Seata AT 模式启动源码分析」这篇文章中讲到,Spring 集成 Seata 的项目中,在 AT 模式启动时,会用 用GlobalTransactionalInterceptor 代替了被 GlobalTransactional 和 GlobalLock 注解的方法,GlobalTransactionalInterceptor 实现了 MethodInterceptor,最终会执行 invoker 方法,那么想要实现动态降级,就可以在这里做手脚。

private volatile boolean disable;

在构造函数中进行初始化赋值:

ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION(service.disableGlobalTransaction)这个参数目前有两个功能:

  1. 在启动时决定是否开启全局事务;
  2. 在开启全局事务后,决定是否降级。

这里的逻辑简单,就是判断监听事件是否属于 ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION 配置属性,如果是,直接更新 disable 值。

如上,disable = true 时,不执行全局事务与全局锁。

io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary

在 Spring AOP 进行 wrap 逻辑过程中,当前配置中心将订阅降级事件监听器。