黑白名单过滤是使用最为广泛的一种过滤规则,例如,用于实现接口安全的 IP 黑白名单规则过滤,用于防骚扰的短信、来电拦截黑白名单过滤。所以 Sentinel 中的黑白名单限流并不难理解,如果配置了黑名单,且请求来源存在黑名单中,则拦截(拒绝)请求,如果配置了白名单,且请求来源存在白名单中则放行。Sentinel 不支持一个黑白名单规则同时配置黑名单和白名单,因此不存优先级的问题。
黑白名单过滤功能更像是一种授权机制,它简单的将权限分为有权限和无权限两种情况,如果支持冲突,可使用优先级策略解决冲突问题。Sentinel 把黑白名作为授权策略,实现黑白名单限流即实现授权限流。Sentinel 在命名上也是使用 Authority,而非 BlackWhiteList。
一些关键类说明:
授权规则(AuthorityRule)是 Sentinel 中最易于理解的一种规则,AuthorityRule 的配置项如下:
public class AuthorityRule extends AbstractRule {
private int strategy = RuleConstant.AUTHORITY_WHITE;
}
当 strategy 配置为 AUTHORITY_WHITE 时,limitApp 即为白名单;当 strategy 配置为 AUTHORITY_BLACK 时,limitApp 即为黑明单。例如:
AuthorityRule rule = new AuthorityRule();
// 资源名称
rule.setResource("GET:/hello");
// 白名单策略
rule.setStrategy(RuleConstant.AUTHORITY_WHITE);
// 白名单
rule.setLimitApp("serviceA,serviceC");
AuthorityRuleManager.loadRules(Collections.singletonList(rule));
上述规则用于限制资源 “GET:/hello” 只允许服务 A 和服务 C 访问。
在使用默认的 SlotChainBuilder 情况下,AuthoritySlot 被放在 SystemSlot、FlowSlot、DegradeSlot 的前面,其优先级更高。
原因之一是授权限流不需要使用统计的指标数据,另一个原因则是提升性能,在未授权的情况下没必要判断是否需要熔断、系统负载能否接住这个请求、QPS 是否过高等,这与用户授权功能是一样的道理,未登陆无需判断是否有权限访问某个资源。
AuthoritySlot 的实现源码如下:
public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
checkBlackWhiteAuthority(resourceWrapper, context);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
// (1)
Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
if (authorityRules == null) {
return;
}
// (2)
Set<AuthorityRule> rules = authorityRules.get(resource.getName());
if (rules == null) {
return;
}
// (3)
for (AuthorityRule rule : rules) {
if (!AuthorityRuleChecker.passCheck(rule, context)) {
throw new AuthorityException(context.getOrigin(), rule);
}
}
}
}
AuthorityRuleChecker 负责实现黑白名单的过滤逻辑,其 passCheck 方法源码如下:
static boolean passCheck(AuthorityRule rule, Context context) {
// 获取来源
String requester = context.getOrigin();
// 来源为空,或者来源等于规则配置的 limitApp 则拦截请求
if (StringUtil.isEmpty(requester) || StringUtil.isEmpty(rule.getLimitApp())) {
return true;
}
// 字符串查找,这一步起到快速过滤的作用,提升性能
int pos = rule.getLimitApp().indexOf(requester);
boolean contain = pos > -1;
// 存在才精确匹配
if (contain) {
boolean exactlyMatch = false;
// 分隔数组
String[] appArray = rule.getLimitApp().split(",");
for (String app : appArray) {
if (requester.equals(app)) {
// 标志设置为 true
exactlyMatch = true;
break;
}
}
contain = exactlyMatch;
}
// 策略
int strategy = rule.getStrategy();
// 如果是黑名单,且来源存在规则配置的黑名单中
if (strategy == RuleConstant.AUTHORITY_BLACK && contain) {
return false;
}
// 如果是白名单,且来源不存在规则配置的白名单中
if (strategy == RuleConstant.AUTHORITY_WHITE && !contain) {
return false;
}
return true;
}
整个方法都比较简单,首先是从当前 Context 获取调用来源的名称,只有在调用来源不为空且规则配置了黑名单或者白名单的情况下,才会走黑白名单的过滤逻辑,这也说明,要实现黑白名单限流的前提是,每个服务消费端在发起请求时都必须要携带自身服务的名称,这取决于 Sentinel 主流框架适配器;其次,Sentinel 通过使用 indexOf 先简单匹配一次黑名单或白名单,再切割黑名单或白名单数组实现精确匹配,这有助于提升性能;如果当前请求来源存在名单中,则根据策略判断这份名称是黑名单还是白名单,再决定是否需要拒绝请求。
热点参数限流并非在 Sentinel 的 core 模块中实现的,但也是非常实用的一种限流方式。并且,Sentinel 支持 API Gateway 网关限流也是基于参数限流实现的,了解热点参数限流的实现原理,也有助于我们更好地理解网关限流。
参数限流,即根据方法调用传递的参数实现限流,又或者说是根据接口的请求参数限流;热点参数限流,即针对访问频繁的参数限流。
例如,都是调用一个下单接口,但购买的商品不同,比如主播带货的商品下单流量较大,而一般商品购买量很少,同时因为商品数量有限,不太可能每个下单请求都能购买成功,如果能实现根据客户端请求传递的商品 ID 实现限流,将流量控制在商品的库存总量左右,并且使用 QPS 限流等兜底,这种有针对性的限流将接口通过的有效流量最大化。
热点参数限流功能在 Sentinel 源码的扩展功能模块为 sentinel-extension,子模块为 sentinel-parameter-flow-control。
热点参数限流使用的指标数据不再是 core 模块中统计的指标数据,而是重新实现了一套指标数据统计功能,依旧是基于滑动窗口。
与 core 模块的 MetricBucket 实现不同,MetricBucket 只统计每个指标的数值,而 ParamMapBucket 需要统计每个指标、参数的每种取值的数值,MetricBucket 更像是 Redis 中的 String 结构,而 ParamMapBucket 更像 Redis 中的 Hash 结构。
ParamMapBucket 的源码如下:
public class ParamMapBucket {
// 数组类型为 CacheMap<Object, AtomicInteger>
private final CacheMap<Object, AtomicInteger>[] data;
public ParamMapBucket() {
this(DEFAULT_MAX_CAPACITY);
}
public ParamMapBucket(int capacity) {
RollingParamEvent[] events = RollingParamEvent.values();
// 根据需要统计的指标数据创建数组
this.data = new CacheMap[events.length];
// RollingParamEvent 可取值为 REQUEST_PASSED、REQUEST_BLOCKED
for (RollingParamEvent event : events) {
data[event.ordinal()] = new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(capacity);
}
}
}
CacheMap<Object, AtomicInteger>
,下标为 0 存储的是统计请求通过的指标数据,下标为 1 统计的是请求被拒绝的指标数据。CacheMap<Object, AtomicInteger>
:key 为参数的取值,例如商品的 ID,value 才是指标数值。HotParameterLeapArray 继承 LeapArray,即实现滑动窗口。ParamMapBucket 不存储窗口时间信息,窗口时间信息依然由 WindowWrap 存储,HotParameterLeapArray 使用 WindowWrap 包装 ParamMapBucket。
笔者也是看了 HotParameterLeapArray 之后才明白为什么 Sentienl 将滑动窗口抽象为 LeapArray,这为扩展实现收集自定义指标数据的滑动窗口提供了支持。
HotParameterLeapArray 的提供的几个 API 如下:
public class HotParameterLeapArray extends LeapArray<ParamMapBucket> {
//.....
public void addValue(RollingParamEvent event, int count, Object value) {
// ....
}
public Map<Object, Double> getTopValues(RollingParamEvent event, int number) {
// .....
}
public long getRollingSum(RollingParamEvent event, Object value) {
// .....
}
public double getRollingAvg(RollingParamEvent event, Object value) {
// ....
}
}
可见,如果是分钟级的滑动窗口,一分内参数的取值越多,其占用的内存就越多。
两个需要重点关注的类:
ParameterMetric 有三个静态字段,源码如下:
public class ParameterMetric {
private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTimeCounters = new HashMap<>();
private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTokenCounter = new HashMap<>();
private final Map<Integer, CacheMap<Object, AtomicInteger>> threadCountMap = new HashMap<>();
}
ParameterMetricStorage 使用 ConcurrentHashMap 缓存每个资源对应的 ParameterMetric,只会为配置了参数限流规则的资源创建 ParameterMetric。其部份源码如下所示:
public final class ParameterMetricStorage {
private static final Map<String, ParameterMetric> metricsMap = new ConcurrentHashMap<>();
private static final Object LOCK = new Object();
public static void initParamMetricsFor(ResourceWrapper resourceWrapper,ParamFlowRule rule) {
if (resourceWrapper == null || resourceWrapper.getName() == null) {
return;
}
String resourceName = resourceWrapper.getName();
ParameterMetric metric;
// 双重检测,线程安全,为资源创建全局唯一的 ParameterMetric
if ((metric = metricsMap.get(resourceName)) == null) {
synchronized (LOCK) {
if ((metric = metricsMap.get(resourceName)) == null) {
metric = new ParameterMetric();
metricsMap.put(resourceWrapper.getName(), metric);
}
}
}
// 初始化 ParameterMetric
metric.initialize(rule);
}
}
initParamMetricsFor 方法用于为资源创建 ParameterMetric 并初始化,该方法在资源被访问时由 ParamFlowSlot 调用,并且该方法只在为资源配置了参数限流规则的情况下被调用。
sentinel-parameter-flow-control 模块通过 Java SPI 注册自定义的 SlotChainBuilder,即注册 HotParamSlotChainBuilder,将 ParamFlowSlot 放置在 StatisticSlot 的后面,这个 ParamFlowSlot 就是实现热点参数限流功能的切入点。
public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
fireEntry(context, resourceWrapper, node, count, prioritized, args);
return;
}
checkFlow(resourceWrapper, count, args);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
既然是参数限流,那么肯定是需要能够拿到参数了,而 ProcessorSlot#entry 方法的最后一个参数就是请求传递过来的参数,通过 SphU#entry 方法一层层往下传递。例如:
@GetMapping("/hello")
public String apiHello(String name) throws BlockException {
ContextUtil.enter("my_context");
Entry entry = null;
try {
entry = SphU.entry("GET:/hello", EntryType.IN,1,name);
doBusiness();
return "Hello!";
} catch (Exception e) {
if (!(e instanceof BlockException)) {
Tracer.trace(e);
}
throw e;
} finally {
if (entry != null) {
entry.exit(1);
}
ContextUtil.exit();
}
}
当 SphU#entry 调用到 ParamFlowSlot#entry 方法时,ParamFlowSlot 调用 checkFlow 方法判断是否需要限流。checkFlow 方法的实现如下:
void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
//(1)
if (args == null) {
return;
}
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
return;
}
List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
//(2)
for (ParamFlowRule rule : rules) {
applyRealParamIdx(rule, args.length);
// Initialize the parameter metrics.
ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);
if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
String triggeredParam = "";
if (args.length > rule.getParamIdx()) {
Object value = args[rule.getParamIdx()];
triggeredParam = String.valueOf(value);
}
throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
}
}
}
在阅读 ParamFlowChecker#passCheck 方法的源码之前,我们需要先了解参数限流规则的配置,了解每个配置项的作用。
参数限流规则 ParamFlowRule 的源码如下(有删减):
public class ParamFlowRule extends AbstractRule {
private int grade = RuleConstant.FLOW_GRADE_QPS;
private double count;
private Integer paramIdx;
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
private int maxQueueingTimeMs = 0;
private long durationInSec = 1;
private int burstCount = 0;
}
public String apiHello(String name)
,该方法只有一个参数,索引为 0 对应 name 参数。假设需要针对资源“GET:/hello”的 name 参数限流,当 name 取值为“jackson”时限流 QPS 阈值为 5,则配置如下:
ParamFlowRule rule = new ParamFlowRule();
// 资源为/hello
rule.setResource("GET:/hello");
// 索引 0 对应的参数为 name
rule.setParamIdx(0);
// qps 限流
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
// 阈值为 5
rule.setCount(5);
ParamFlowRuleManager.loadRules(Collections.singletonList(rule));
以此为例,我们分析 ParamFlowChecker#passCheck 方法源码,passCheck 返回 true 表示放行,返回 false 表示拒绝。
ParamFlowChecker#passCheck 方法源码如下:
public static boolean passCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
Object... args) {
if (args == null) {
return true;
}
// 判断参数索引是否合法
int paramIdx = rule.getParamIdx();
if (args.length <= paramIdx) {
return true;
}
// 获取参数值,如果值为空则允许通过
Object value = args[paramIdx];
if (value == null) {
return true;
}
// 集群限流
if (rule.isClusterMode() && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
return passClusterCheck(resourceWrapper, rule, count, value);
}
// 单机限流
return passLocalCheck(resourceWrapper, rule, count, value);
}
我们先不讨论集群限流情况,仅看单机本地限流情况。passLocalCheck 方法的源码如下:
private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
Object value) {
try {
// 基本数据类型
if (Collection.class.isAssignableFrom(value.getClass())) {
for (Object param : ((Collection)value)) {
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
}
// 数组类
else if (value.getClass().isArray()) {
int length = Array.getLength(value);
for (int i = 0; i < length; i++) {
Object param = Array.get(value, i);
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
}
// 引用类型
else {
return passSingleValueCheck(resourceWrapper, rule, count, value);
}
} catch (Throwable e) {
}
return true;
}
由于参数可能是基本数据类型,也可能是数组类型,或者引用类型,所以 passLocalCheck 方法分三种情况处理。我们只讨论其中一种情况,其它情况的实现类似。
以资源“GET:/hello”为例,其方法 apiHello 的 name 参数为 String 类型,因此会调用 passSingleValueCheck 方法,该方法源码如下:
static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,Object value) {
//(1)
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) {
return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value);
} else {
return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value);
}
}
// (2)
else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) {
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long threadCount = getParameterMetric(resourceWrapper).getThreadCount(rule.getParamIdx(), value);
if (exclusionItems.contains(value)) {
int itemThreshold = rule.getParsedHotItems().get(value);
return ++threadCount <= itemThreshold;
}
long threshold = (long)rule.getCount();
return ++threadCount <= threshold;
}
return true;
}
你可能好奇,并行占用线程总数是在哪里自增和自减的呢?
这是由 ParamFlowStatisticEntryCallback 与 ParamFlowStatisticExitCallback 这两个 Callback 实现的,分别在 StatisticSlot 的 entry 方法和 exit 方法中被回调执行,这是我们前面分析 StatisticSlot 源码时故意遗漏的细节。
1. 快速失败
快速失败基于令牌桶算法实现。passDefaultLocalCheck 方法控制每个时间窗口只生产一次令牌,将令牌放入令牌桶,每个请求都从令牌桶中取走令牌,当令牌足够时放行,当令牌不足时直接拒绝。ParameterMetric#tokenCounters 用作令牌桶,timeCounters 存储最近一次生产令牌的时间。
passDefaultLocalCheck 方法源码如下:
static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
//(1)
ParameterMetric metric = getParameterMetric(resourceWrapper);
CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);
if (tokenCounters == null || timeCounters == null) {
return true;
}
// (2)
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long tokenCount = (long)rule.getCount();
if (exclusionItems.contains(value)) {
tokenCount = rule.getParsedHotItems().get(value);
}
if (tokenCount == 0) {
return false;
}
// (3)
long maxCount = tokenCount + rule.getBurstCount();
if (acquireCount > maxCount) {
return false;
}
while (true) {
// (4)
long currentTime = TimeUtil.currentTimeMillis();
AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
if (lastAddTokenTime == null) {
tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
return true;
}
//(5)
long passTime = currentTime - lastAddTokenTime.get();
if (passTime > rule.getDurationInSec() * 1000) {
// 确保非 NULL
AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
if (oldQps == null) {
lastAddTokenTime.set(currentTime);
return true;
} else {
//(6)
long restQps = oldQps.get();
// 计算需要新增的令牌数,根据时间间隔、限流阈值、窗口时间计算
long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
// 计算新的令牌总数,并立即使用(扣减 acquireCount 个令牌)
long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
: (restQps + toAddCount - acquireCount);
if (newQps < 0) {
return false;
}
if (oldQps.compareAndSet(restQps, newQps)) {
lastAddTokenTime.set(currentTime);
return true;
}
Thread.yield();
}
} else {
// (7)
AtomicLong oldQps = tokenCounters.get(value);
if (oldQps != null) {
long oldQpsValue = oldQps.get();
// 令牌是否足够
if (oldQpsValue - acquireCount >= 0) {
// 从令牌桶中取走令牌
if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
return true;
}
} else {
return false;
}
}
Thread.yield();
}
}
}
2. 匀速排队
与 RateLimiterController 实现原理一样,passThrottleLocalCheck 方法让请求在虚拟队列中排队,控制请求通过的时间间隔,该时间间隔通过阈值与窗口时间大小计算出来,如果当前请求计算出来的排队等待时间大于限流规则指定的 maxQueueingTimeMs,则拒绝当前请求。
passThrottleLocalCheck 方法源码如下:
static boolean passThrottleLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,Object value) {
//(1)
ParameterMetric metric = getParameterMetric(resourceWrapper);
CacheMap<Object, AtomicLong> timeRecorderMap = metric == null ? null : metric.getRuleTimeCounter(rule);
if (timeRecorderMap == null) {
return true;
}
// (2)
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long tokenCount = (long)rule.getCount();
if (exclusionItems.contains(value)) {
tokenCount = rule.getParsedHotItems().get(value);
}
if (tokenCount == 0) {
return false;
}
//(3)
long costTime = Math.round(1.0 * 1000 * acquireCount * rule.getDurationInSec() / tokenCount);
while (true) {
//(4)
long currentTime = TimeUtil.currentTimeMillis();
AtomicLong timeRecorder = timeRecorderMap.putIfAbsent(value, new AtomicLong(currentTime));
if (timeRecorder == null) {
return true;
}
long lastPassTime = timeRecorder.get();
// 计算当前请求的期望通过时间,最近一次请求的期望通过时间 + 请求通过的时间间隔
long expectedTime = lastPassTime + costTime;
//(5)
if (expectedTime <= currentTime
|| expectedTime - currentTime < rule.getMaxQueueingTimeMs()) {
AtomicLong lastPastTimeRef = timeRecorderMap.get(value);
if (lastPastTimeRef.compareAndSet(lastPassTime, currentTime)) {
long waitTime = expectedTime - currentTime;
if (waitTime > 0) {
lastPastTimeRef.set(expectedTime);
try {
TimeUnit.MILLISECONDS.sleep(waitTime);
} catch (InterruptedException e) {
RecordLog.warn("passThrottleLocalCheck: wait interrupted", e);
}
}
return true;
} else {
Thread.yield();
}
} else {
return false;
}
}
}
黑白名单限流的实现相对简单,热点参数限流的实现相对复杂。热点参数限流自己实现了一个滑动窗口用于收集指标数据,但该滑动窗口并未被使用,而是使用 ParameterMetric 与 ParameterMetricStorage,这应该是出于性能的考虑。热点参数限流对性能的影响和对内存的占用与参数的取值有多少种可能成正比,限流参数的取值可能性越多,占用的内存就越大,对性能的影响也就越大,在使用热点参数限流功能时,一定要考虑参数的取值。
例如,根据商品 ID 限流,如果有十万个商品下单,那么 CacheMap 将会存在十万个 key-value,并且不会被移除,随着进程运行的时长而增长。如果限流阈值类型选择为 THREAD 则不会存在这个问题,因为在 ParamFlowStatisticExitCallback 方法会调用 ParameterMetric#decreaseThreadCount 方法扣减参数值占用的线程数,当线程数为零时,会将当前参数值对应的 key-value 从 CacheMap 中移除。