经过前面的学习,我们知道,为资源配置各种规则可使用 Sentinel 提供的各种规则对应的 loadRules API,但这种以编码的方式配置规则很难实现动态修改。但基于 Sentinel 提供的各种规则对应的 loadRules API,我们可以自己实现规则的动态更新,而这一功能几乎在每个需要使用 Sentinel 的微服务项目中都需要实现一遍。Sentinel 也考虑到了这点,所以提供了动态数据源接口,并且提供了多种动态数据源的实现,尽管我们可能不会用到。
动态数据源作为扩展功能放在 sentinel-extension 模块下,前面我们学习的热点参数限流模块 sentinel-parameter-flow-control 也是在该模块下。在 1.7.1 版本,sentinel-extension 模块下的子模块除 sentinel-parameter-flow-control、sentinel-annotation-aspectj 之外,其余子模块都是实现动态数据源的模块。
显然,sentinel-datasource-extension 模块才是我们主要研究的模块,这是 Sentinel 实现动态数据源的核心。
SentinelProperty 是 Sentinel 提供的一个接口,可注册到 Sentinel 提供的各种规则的 Manager,例如 FlowRuleManager,并且可以给 SentinelProperty 添加监听器,在配置改变时,你可以调用 SentinelProperty#updateValue 方法,由它负责调用监听器去更新规则,而不需要调用 FlowRuleManager#loadRules 方法。同时,你也可以注册额外的监听器,在配置改变时做些别的事情。
SentinelProperty 并非 sentinel-datasource-extension 模块中定义的接口,而是 sentinel-core 定义的接口,其源码如下:
public interface SentinelProperty<T> {
void addListener(PropertyListener<T> listener);
void removeListener(PropertyListener<T> listener);
boolean updateValue(T newValue);
}
默认使用的实现类是 DynamicSentinelProperty,其实现源码如下(有删减):
public class DynamicSentinelProperty<T> implements SentinelProperty<T> {
// 存储注册的监听器
protected Set<PropertyListener<T>> listeners = Collections.synchronizedSet(new HashSet<PropertyListener<T>>());
@Override
public void addListener(PropertyListener<T> listener) {
listeners.add(listener);
listener.configLoad(value);
}
@Override
public void removeListener(PropertyListener<T> listener) {
listeners.remove(listener);
}
@Override
public boolean updateValue(T newValue) {
for (PropertyListener<T> listener : listeners) {
listener.configUpdate(newValue);
}
return true;
}
}
可见,DynamicSentinelProperty 使用 Set 存储已注册的监听器,updateValue 负责通知所有监听器,调用监听器的 configUpdate 方法。
在前面分析 FlowRuleManager 时,我们只关注了其 loadRules 方法,除了使用 loadRules 方法加载规则配置之外,FlowRuleManager 还提供 registerProperty API,用于注册 SentinelProperty。
使用 SentinelProperty 实现加载 FlowRule 的步骤如下:
FlowRuleManager 支持使用 SentinelProperty 加载或更新限流规则的实现源码如下:
public class FlowRuleManager {
// 缓存限流规则
private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>();
// PropertyListener 监听器
private static final FlowPropertyListener LISTENER = new FlowPropertyListener();
// SentinelProperty
private static SentinelProperty<List<FlowRule>> currentProperty
// 提供默认的 SentinelProperty
= new DynamicSentinelProperty<List<FlowRule>>();
static {
// 给默认的 SentinelProperty 注册监听器(FlowPropertyListener)
currentProperty.addListener(LISTENER);
}
// 注册 SentinelProperty
public static void register2Property(SentinelProperty<List<FlowRule>> property) {
synchronized (LISTENER) {
currentProperty.removeListener(LISTENER);
// 注册监听器
property.addListener(LISTENER);
currentProperty = property;
}
}
}
实现更新限流规则缓存的 FlowPropertyListener 是 FlowRuleManager 的一个内部类,其源码如下:
private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {
@Override
public void configUpdate(List<FlowRule> value) {
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
if (rules != null) {
// 先清空缓存再写入
flowRules.clear();
flowRules.putAll(rules);
}
}
@Override
public void configLoad(List<FlowRule> conf) {
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(conf);
if (rules != null) {
flowRules.clear();
flowRules.putAll(rules);
}
}
}
PropertyListener 接口定义的两个方法:
所以,现在我们有两种方法更新限流规则:
Sentinel 将读和写数据源抽离成两个接口,一开始只有读接口,写接口是后面才加的功能,目前来看,写接口只在热点参数限流模块中使用到。事实上,使用读接口就已经满足我们的需求。ReadableDataSource 接口的定义如下:
public interface ReadableDataSource<S, T> {
T loadConfig() throws Exception;
S readSource() throws Exception;
SentinelProperty<T> getProperty();
void close() throws Exception;
}
ReadableDataSource 是一个泛型接口,参数化类型 S 代表用于装载从数据源读取的配置的类型,参数化类型 T 代表对应 Sentinel 中的规则类型。例如,我们可以定义一个 FlowRuleProps 类,用于装载从 yml 配置文件中读取的限流规则配置,然后再将 FlowRuleProps 转为 FlowRule,所以 S 可以替换为 FlowRuleProps,T 可以替换为 List<FlowRule>
。
ReadableDataSource 接口定义的方法解释说明如下:
如果动态数据源提供 SentinelProperty,则可以调用 getProperty 方法获取动态数据源的 SentinelProperty,将 SentinelProperty 注册给规则管理器(XxxManager),动态数据源在读取到配置时就可以调用自身 SentinelProperty 的 updateValue 方法通知规则管理器(XxxManager)更新规则。
AbstractDataSource 是一个抽象类,该类实现 ReadableDataSource 接口,用于简化具体动态数据源的实现,子类只需要继承 AbstractDataSource 并实现 readSource 方法即可。AbstractDataSource 源码如下:
public abstract class AbstractDataSource<S, T> implements ReadableDataSource<S, T> {
protected final Converter<S, T> parser;
protected final SentinelProperty<T> property;
public AbstractDataSource(Converter<S, T> parser) {
if (parser == null) {
throw new IllegalArgumentException("parser can't be null");
}
this.parser = parser;
this.property = new DynamicSentinelProperty<T>();
}
@Override
public T loadConfig() throws Exception {
return loadConfig(readSource());
}
public T loadConfig(S conf) throws Exception {
T value = parser.convert(conf);
return value;
}
@Override
public SentinelProperty<T> getProperty() {
return property;
}
}
从源码可以看出:
Converter 接口的定义如下:
public interface Converter<S, T> {
T convert(S source);
}
我们项目中并未使用 Sentinel 提供的任何一种动态数据源的实现,而是选择自己实现数据源,因为我们项目是部署在 Kubernetes 集群上的,我们可以利用 ConfigMap 资源存储限流、熔断降级等规则。Spring Cloud Kubernetes 提供了 Spring Cloud 动态配置接口的实现,因此,我们不需要关心怎么去读取 ConfigMap 资源。就相当于基于 Spring Cloud 动态配置实现 Sentinel 规则动态数据源。Spring Cloud 动态配置如何使用这里不做介绍。
以实现 FlowRule 动态配置为例,其步骤如下。
第一步:定义一个用于装载动态配置的类,如 FlowRuleProps 所示。
@Component
@RefreshScope
@ConfigurationProperties(prefix = "sentinel.flow-rules")
public class FlowRuleProps{
//....省略
}
第二步:创建数据转换器,实现将 FlowRuleProps 转为 List<FlowRule>
,如 FlowRuleConverter 所示。
public class FlowRuleConverter implements Converter<FlowRuleProps, List<FlowRule>>{
@Override
public List<FlowRule> convert(FlowRuleProps source){
//....省略
}
}
第三步:创建 FlowRuleDataSource,继承 AbstractDataSource,实现 readSource 方法。readSource 方法只需要获取 FlowRuleProps 返回即可,代码如下。
@Component
public class FlowRuleDataSource extends AbstractDataSource<FlowRuleProps, List<FlowRule>>{
@Autowired
private FlowRuleProps flowRuleProps;
public FlowRuleDataSource() {
super(new FlowRuleConverter());
}
@Override
public FlowRuleProps readSource() throws Exception {
return this.flowRuleProps;
}
@Override
public void close() throws Exception {
}
}
第四步:增强 FlowRuleDataSource,让 FlowRuleDataSource 能够监听到 FlowRuleProps 配置改变。
@Component
public class FlowRuleDataSource extends AbstractDataSource<FlowRuleProps, List<FlowRule>>
implements ApplicationListener<RefreshScopeRefreshedEvent>,
InitializingBean{
// .....
@Override
public void onApplicationEvent(RefreshScopeRefreshedEvent event) {
getProperty().updateValue(loadConfig());
}
@Override
public void afterPropertiesSet() throws Exception {
onApplicationEvent(new RefreshScopeRefreshedEvent());
}
}
第五步:写一个 ApplicationRunner 类,在 Spring 容器刷新完成后, 将数据源(FlowRuleDataSource)的 SentinelProperty 注册给 FlowRuleManager,代码如下。
@Component
public class FlowRuleDataSourceConfiguration implements ApplicationRunner{
@Autowired
private FlowRuleDataSource flowRuleDataSource;
@Override
public void run(ApplicationArguments args) throws Exception {
// 将数据源的 SentinelProperty 注册给 FlowRuleManager
FlowRuleManager.register2Property(flowRuleDataSource.getProperty());
}
}
在调用 FlowRuleManager#register2Property 方法将 FlowRuleDataSource 动态数据源的 SentinelProperty 注册给 FlowRuleManager 时,FlowRuleManager 会自动给该 SentinelProperty 注册一个监听器(FlowPropertyListener)。
到此,一个基于 Spring Cloud 动态配置的限流规则动态数据源就已经完成,整个调用链路如下:
List<FlowRule>
对象;了解 Sentinel 实现动态数据源的原理后,我们可以灵活地自定义规则动态数据源,例如本篇介绍的,利用 Kubernetes 的 ConfigMap 资源存储规则配置,并通过 Spring Cloud 动态配置实现 Sentinel 的规则动态数据源。不仅如此,Sentinel 实现动态数据源的整体框架的设计也是值得我们学习的,如数据转换器、监听器。