目录
Pt1 核心类解析
Pt1.1 Resource
ResourceWrapper
StringResourceWrapper
Pt1.2 Entry
Entry
CtEntry
Pt1.3 Context
Pt1.4 Node
Node
StatisticNode
DefaultNode
ClusterNode
EntranceNode
Pt1.5 Slot
NodeSelectorSlot
ClusterBuilderSlot
LogSlot
StatisticSlot
AuthoritySlot
SystemSlot
FlowSlot
DegradeSlot
Pt1.6 SlotChain
构建SlotChain
Pt2 滑动窗口
Pt2.1 MetricEvent
Pt2.2 MetricBucket
Pt2.3 WindowWrap
Pt2.4 LeapArray
Pt2.5 ArrayMetric
Pt3 限流源码逻辑
Pt3.1 示例FlowQpsDemo
Pt3.2 代码流程说明
Pt3.3 定义并加载规则
Pt3.4 限流规则校验Sph*#entry()
Pt3.5 限流处置
Pt3.6 释放资源
Sentinel的核心类主要有如下几个:
Resource:标识资源,它可能是一段代码逻辑,是限流规则作用的主要目标;
Entry:每个Request请求会生成一个Entry,Entry包含了Sentinel核心的处理信息,每个Entry会对应一个Resource;
Context:存放Entry执行的上下文信息,Context是ThreadLocal的,意味着一个Request处理完成、线程结束后,会释放Context;
SlotChain:Entry会持有一个SlotChain对象(实际类型是ProcessorSlotChain),保存了当前请求的核心处理节点信息,是最核心的数据结构;
Slot:SlotChain中存储的是Slot对象,它们按责任链模式串联起来,完成Sentinel的核心处理逻辑。Sentinel提供了几个标准的Slot,分别承担统计类、规则校验类的处理逻辑;
Node:Slot在处理过程中会关联Node对象,存放与当前资源相关的统计信息。对资源来说,各个Slot承载的是统计或规则校验的逻辑,而Node承载的是统计的结果。
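下面用一段最小示例把这几个核心类串起来(仅作示意,资源名 demoResource、上下文名 demoContext、来源 appA 均为假设值):ContextUtil.enter() 创建 Context,SphU.entry() 创建 Entry 并驱动 SlotChain,被规则拦截时抛出 BlockException。

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.slots.block.BlockException;

public class CoreClassDemo {
    public static void main(String[] args) {
        // 创建名为 demoContext 的上下文,调用来源为 appA
        ContextUtil.enter("demoContext", "appA");
        Entry entry = null;
        try {
            // 申请资源 demoResource 的 token,内部会依次执行 SlotChain 上的各个 Slot
            entry = SphU.entry("demoResource");
            // 被保护的业务逻辑
        } catch (BlockException e) {
            // 任一规则(流控/熔断/黑白名单等)触发时,抛出 BlockException 的子类
        } finally {
            if (entry != null) {
                // 释放 Entry,触发 SlotChain 的 exit 逻辑
                entry.exit();
            }
            // 退出 Context,清理 ThreadLocal
            ContextUtil.exit();
        }
    }
}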
Resource定义了Sentinel资源,类结构非常简单。
ResourceWrapper抽象父类定义了相关属性;
StringResourceWrapper实现类定义了资源的构造方法;
ResourceWrapper抽象父类定义了公共属性,并基于name属性覆写了hashCode和equals方法。
public abstract class ResourceWrapper {

    protected final String name;
    protected final EntryType entryType;
    protected final int resourceType;

    // 构造方法
    public ResourceWrapper(String name, EntryType entryType, int resourceType) {
        AssertUtil.notEmpty(name, "resource name cannot be empty");
        AssertUtil.notNull(entryType, "entryType cannot be null");
        this.name = name;
        this.entryType = entryType;
        this.resourceType = resourceType;
    }

    // ... setters and getters...

    // ResourceWrapper类覆写hashCode和equals方法,使用name属性进行比较,这点非常重要。
    // SlotChain和Rules信息使用Resource对象作为KEY存储在内存中,实际是和name属性相关。
    @Override
    public int hashCode() {
        return getName().hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof ResourceWrapper) {
            ResourceWrapper rw = (ResourceWrapper)obj;
            return rw.getName().equals(getName());
        }
        return false;
    }
}
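下面的小示例(资源名 orderQuery 为假设值)验证了这一点:两个名称相同的 StringResourceWrapper 即使 EntryType 不同,equals 和 hashCode 的结果也相同,因此在缓存 SlotChain 和规则的 Map 中会命中同一个 key。

import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper;

public class ResourceKeyDemo {
    public static void main(String[] args) {
        StringResourceWrapper in  = new StringResourceWrapper("orderQuery", EntryType.IN);
        StringResourceWrapper out = new StringResourceWrapper("orderQuery", EntryType.OUT);
        // equals/hashCode 只比较 name,两个对象会被视为同一个资源
        System.out.println(in.equals(out));                   // true
        System.out.println(in.hashCode() == out.hashCode());  // true
    }
}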
每一次资源调用都会创建一个 Entry。Entry 包含了资源名、curNode(当前统计节点)、originNode(来源统计节点)等信息。
CtEntry 为普通的 Entry,在调用 SphU.entry(xxx) 的时候创建。特性:Linked entry within current context(内部维护着 parent 和 child)
需要注意的一点:CtEntry 构造函数中会做调用链的变换,即将当前 Entry 接到传入 Context 的调用链路上(setUpEntryFor)。
资源调用结束时需要调用 entry.exit()。exit 操作会依次执行 slot chain 的 exit 逻辑、恢复调用栈、退出 context,然后清空 entry 中的 context 引用,防止重复调用。
类结构也比较简单:抽象父类Entry和实现类CtEntry。
Context 代表调用链路上下文,贯穿一次调用链路中的所有 Entry。Context 维持着入口节点(entranceNode)、本次调用链路的 curNode、调用来源(origin)等信息。Context 名称即为调用链路入口名称。
Context 维持的方式:通过 ThreadLocal 传递,只有在入口 enter 的时候生效。由于 Context 是通过 ThreadLocal 传递的,因此对于异步调用链路,线程切换的时候会丢掉 Context,因此需要手动通过 ContextUtil.runOnContext(context, f) 来变换 context。
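对于异步场景,通常的做法是用 SphU.asyncEntry() 获取 AsyncEntry,再在异步线程中通过 ContextUtil.runOnContext() 还原上下文。下面是一个最小示意(资源名 asyncRes 为假设值):

import java.util.concurrent.CompletableFuture;

import com.alibaba.csp.sentinel.AsyncEntry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.slots.block.BlockException;

public class AsyncContextDemo {
    public static void main(String[] args) throws Exception {
        try {
            // 异步资源入口,调用链不随当前线程结束而结束
            AsyncEntry entry = SphU.asyncEntry("asyncRes");
            CompletableFuture.runAsync(() ->
                // 在异步线程中切换到 entry 对应的 Context,保证统计归属正确
                ContextUtil.runOnContext(entry.getAsyncContext(), () -> {
                    try {
                        // 异步业务逻辑
                    } finally {
                        // 异步逻辑完成后再 exit
                        entry.exit();
                    }
                })
            ).get();
        } catch (BlockException e) {
            // 请求被规则拦截
        }
    }
}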
Context
// 存储当前请求的调用信息(ThreadLocal线程安全)。 // 1.每一个Entry都会有一个Context对象,可以使用ContextUtil#enter()来生成,否则则会使用默认的Context。 // 2.在同一个Context多次调用entry会生成调用链树。 // 3.同一个资源在不同的Context调用时,会在NodeSelectorSlot分开进行统计。 public class Context { // 上下文的名称 private final String name; // 当前请求树的入口节点(当前请求调用链的根节点) private DefaultNode entranceNode; // 当前请求的Entry对象 private Entry curEntry; // 区分调用者,比如服务调用方名称或者IP private String origin = ""; private final boolean async; /** * Create a new async context. * * @param entranceNode entrance node of the context * @param name context name * @return the new created context * @since 0.2.0 */ public static Context newAsyncContext(DefaultNode entranceNode, String name) { return new Context(name, entranceNode, true); } public Context(DefaultNode entranceNode, String name) { this(name, entranceNode, false); } public Context(String name, DefaultNode entranceNode, boolean async) { this.name = name; this.entranceNode = entranceNode; this.async = async; } public boolean isAsync() { return async; } public String getName() { return name; } public Node getCurNode() { return curEntry == null ? null : curEntry.getCurNode(); } public Context setCurNode(Node node) { this.curEntry.setCurNode(node); return this; } public Entry getCurEntry() { return curEntry; } public Context setCurEntry(Entry curEntry) { this.curEntry = curEntry; return this; } public String getOrigin() { return origin; } public Context setOrigin(String origin) { this.origin = origin; return this; } public double getOriginTotalQps() { return getOriginNode() == null ? 0 : getOriginNode().totalQps(); } public double getOriginBlockQps() { return getOriginNode() == null ? 0 : getOriginNode().blockQps(); } public double getOriginPassReqQps() { return getOriginNode() == null ? 0 : getOriginNode().successQps(); } public double getOriginPassQps() { return getOriginNode() == null ? 0 : getOriginNode().passQps(); } public long getOriginTotalRequest() { return getOriginNode() == null ? 0 : getOriginNode().totalRequest(); } public long getOriginBlockRequest() { return getOriginNode() == null ? 0 : getOriginNode().blockRequest(); } public double getOriginAvgRt() { return getOriginNode() == null ? 0 : getOriginNode().avgRt(); } public int getOriginCurThreadNum() { return getOriginNode() == null ? 0 : getOriginNode().curThreadNum(); } public DefaultNode getEntranceNode() { return entranceNode; } /** * Get the parent {@link Node} of the current. * * @return the parent node of the current. */ public Node getLastNode() { if (curEntry != null && curEntry.getLastNode() != null) { return curEntry.getLastNode(); } else { return entranceNode; } } public Node getOriginNode() { return curEntry == null ? null : curEntry.getOriginNode(); } @Override public String toString() { return "Context{" + "name='" + name + '\'' + ", entranceNode=" + entranceNode + ", curEntry=" + curEntry + ", origin='" + origin + '\'' + ", async=" + async + '}'; } }
类结构
Sentinel 里面有多种统计节点:
Node:所有Node的父接口,定义了获取实时统计数据的方法。
StatisticNode:最为基础的统计节点,包含秒级和分钟级两个滑动窗口结构。
DefaultNode:链路节点,用于统计调用链路上某个资源的数据,维持树状结构。
ClusterNode:簇点,用于统计每个资源全局的数据(不区分调用链路),以及存放该资源的按来源区分的调用数据(类型为 StatisticNode)。特别地,Constants.ENTRY_NODE 节点用于统计全局的入口资源数据。
EntranceNode:入口节点,特殊的链路节点,对应某个 Context 入口的所有调用数据。Constants.ROOT 节点也是入口节点。
构建的时机:
EntranceNode 在 ContextUtil.enter(xxx) 的时候就创建了,然后塞到 Context 里面。
NodeSelectorSlot:根据 context 创建 DefaultNode,然后 set curNode to context。
ClusterBuilderSlot:首先根据 resourceName 创建 ClusterNode,并且 set clusterNode to defaultNode;然后再根据 origin 创建来源节点(类型为 StatisticNode),并且 set originNode to curEntry。
几种 Node 的维度(数目):
ClusterNode 的维度是 resource
DefaultNode 的维度是 resource * context,存在每个 NodeSelectorSlot 的 map 里面
EntranceNode 的维度是 context,存在 ContextUtil 类的 contextNameNodeMap 里面
来源节点(类型为 StatisticNode)的维度是 resource * origin,存在每个 ClusterNode 的 originCountMap 里面
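结合上面的维度划分,可以用一个小示例(资源名 nodeA、上下文名 entrance1/entrance2 均为示意)验证:同一资源在两个不同 Context 下会生成两个 DefaultNode,但全局只有一个 ClusterNode,它的汇总统计可以通过 ClusterBuilderSlot.getClusterNode() 查到。

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.node.ClusterNode;
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot;

public class NodeDimensionDemo {
    public static void main(String[] args) throws Exception {
        // 同一资源 nodeA 在两个 Context 下各调用一次
        for (String ctx : new String[]{"entrance1", "entrance2"}) {
            ContextUtil.enter(ctx, "appA");
            Entry entry = SphU.entry("nodeA");
            entry.exit();
            ContextUtil.exit();
        }
        // ClusterNode 是 resource 维度的汇总,两次调用都会累计到同一个节点上
        ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode("nodeA");
        System.out.println(clusterNode.totalRequest()); // 预期输出 2
    }
}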
最为基础的统计节点,包含秒级和分钟级两个滑动窗口结构。
package com.alibaba.csp.sentinel.node; import com.alibaba.csp.sentinel.node.metric.MetricNode; import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder; import com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric; import com.alibaba.csp.sentinel.slots.statistic.metric.Metric; import com.alibaba.csp.sentinel.util.TimeUtil; import com.alibaba.csp.sentinel.util.function.Predicate; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * StatisticNode对象保存3中类型的实时统计指标数据: * 1.以秒为单位的统计指标; * 2.以分钟为单位的统计指标; * 3.当前并发线程数; * * Sentinel使用滑动窗口来记录和统计资源相关的实时指标数据,滑动窗口是基于LeapArray的数据结构。 * * <p> * case 1: When the first request comes in, Sentinel will create a new window bucket of * a specified time-span to store running statics, such as total response time(rt), * incoming request(QPS), block request(bq), etc. And the time-span is defined by sample count. * </p> * <pre> * 0 100ms * +-------+--→ Sliding Windows * ^ * | * request * </pre> * <p> * Sentinel use the statics of the valid buckets to decide whether this request can be passed. * For example, if a rule defines that only 100 requests can be passed, * it will sum all qps in valid buckets, and compare it to the threshold defined in rule. * </p> * * <p>case 2: continuous requests</p> * <pre> * 0 100ms 200ms 300ms * +-------+-------+-------+-----→ Sliding Windows * ^ * | * request * </pre> * * <p>case 3: requests keeps coming, and previous buckets become invalid</p> * <pre> * 0 100ms 200ms 800ms 900ms 1000ms 1300ms * +-------+-------+ ...... +-------+-------+ ...... +-------+-----→ Sliding Windows * ^ * | * request * </pre> * * <p>The sliding window should become:</p> * <pre> * 300ms 800ms 900ms 1000ms 1300ms * + ...... +-------+ ...... +-------+-----→ Sliding Windows * ^ * | * request * </pre> */ public class StatisticNode implements Node { // 保存秒钟时间窗口的统计数据 private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL); /** * Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds, * meaning each bucket per second, in this way we can get accurate statistics of each second. */ // 保存分钟时间窗口(最近60秒)的统计数据 private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false); /** * The counter for thread count. */ // 线程并发数 private LongAdder curThreadNum = new LongAdder(); /** * The last timestamp when metrics were fetched. */ // 最近一次数据指标更新时间 private long lastFetchTime = -1; // 获取所有有效滑动窗口的统计数据 // KEY:滑动窗口的开始时间戳 // VALUE:滑动窗口时间轴内的统计数据 @Override public Map<Long, MetricNode> metrics() { // 获取当前时间戳 long currentTime = TimeUtil.currentTimeMillis(); // 计算当前时间戳对应的秒级滑动窗口的开始时间 currentTime = currentTime - currentTime % 1000; Map<Long, MetricNode> metrics = new ConcurrentHashMap<>(); // rollingCounterInMinute#details()用于获取当前有效的滑动窗口中的统计数据 List<MetricNode> nodesOfEverySecond = rollingCounterInMinute.details(); long newLastFetchTime = lastFetchTime; // Iterate metrics of all resources, filter valid metrics (not-empty and up-to-date). 
for (MetricNode node : nodesOfEverySecond) { // 当前数据节点时效性验证:大于上次数据更新时间 // node.getTimestamp()是当前滑动窗口开始时间 if (isNodeInTime(node, currentTime) && isValidMetricNode(node)) { metrics.put(node.getTimestamp(), node); newLastFetchTime = Math.max(newLastFetchTime, node.getTimestamp()); } } lastFetchTime = newLastFetchTime; return metrics; } @Override public List<MetricNode> rawMetricsInMin(Predicate<Long> timePredicate) { return rollingCounterInMinute.detailsOnCondition(timePredicate); } // 数据时效性 private boolean isNodeInTime(MetricNode node, long currentTime) { return node.getTimestamp() > lastFetchTime && node.getTimestamp() < currentTime; } private boolean isValidMetricNode(MetricNode node) { return node.getPassQps() > 0 || node.getBlockQps() > 0 || node.getSuccessQps() > 0 || node.getExceptionQps() > 0 || node.getRt() > 0 || node.getOccupiedPassQps() > 0; } @Override public void reset() { rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL); } // ----以下API都是从滑动窗口的数据结构中获取不同类型的统计数据,不作详细解释---- @Override public long totalRequest() { return rollingCounterInMinute.pass() + rollingCounterInMinute.block(); } @Override public long blockRequest() { return rollingCounterInMinute.block(); } @Override public double blockQps() { return rollingCounterInSecond.block() / rollingCounterInSecond.getWindowIntervalInSec(); } @Override public double previousBlockQps() { return this.rollingCounterInMinute.previousWindowBlock(); } @Override public double previousPassQps() { return this.rollingCounterInMinute.previousWindowPass(); } @Override public double totalQps() { return passQps() + blockQps(); } @Override public long totalSuccess() { return rollingCounterInMinute.success(); } @Override public double exceptionQps() { return rollingCounterInSecond.exception() / rollingCounterInSecond.getWindowIntervalInSec(); } @Override public long totalException() { return rollingCounterInMinute.exception(); } @Override public double passQps() { return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec(); } @Override public long totalPass() { return rollingCounterInMinute.pass(); } @Override public double successQps() { return rollingCounterInSecond.success() / rollingCounterInSecond.getWindowIntervalInSec(); } @Override public double maxSuccessQps() { return (double) rollingCounterInSecond.maxSuccess() * rollingCounterInSecond.getSampleCount() / rollingCounterInSecond.getWindowIntervalInSec(); } @Override public double occupiedPassQps() { return rollingCounterInSecond.occupiedPass() / rollingCounterInSecond.getWindowIntervalInSec(); } @Override public double avgRt() { long successCount = rollingCounterInSecond.success(); if (successCount == 0) { return 0; } return rollingCounterInSecond.rt() * 1.0 / successCount; } @Override public double minRt() { return rollingCounterInSecond.minRt(); } @Override public int curThreadNum() { return (int)curThreadNum.sum(); } @Override public void addPassRequest(int count) { rollingCounterInSecond.addPass(count); rollingCounterInMinute.addPass(count); } @Override public void addRtAndSuccess(long rt, int successCount) { rollingCounterInSecond.addSuccess(successCount); rollingCounterInSecond.addRT(rt); rollingCounterInMinute.addSuccess(successCount); rollingCounterInMinute.addRT(rt); } @Override public void increaseBlockQps(int count) { rollingCounterInSecond.addBlock(count); rollingCounterInMinute.addBlock(count); } @Override public void increaseExceptionQps(int count) { rollingCounterInSecond.addException(count); 
rollingCounterInMinute.addException(count); } // 增加和减少并发线程数 @Override public void increaseThreadNum() { curThreadNum.increment(); } @Override public void decreaseThreadNum() { curThreadNum.decrement(); } @Override public void debug() { rollingCounterInSecond.debug(); } @Override public long tryOccupyNext(long currentTime, int acquireCount, double threshold) { double maxCount = threshold * IntervalProperty.INTERVAL / 1000; long currentBorrow = rollingCounterInSecond.waiting(); if (currentBorrow >= maxCount) { return OccupyTimeoutProperty.getOccupyTimeout(); } int windowLength = IntervalProperty.INTERVAL / SampleCountProperty.SAMPLE_COUNT; long earliestTime = currentTime - currentTime % windowLength + windowLength - IntervalProperty.INTERVAL; int idx = 0; /* * Note: here {@code currentPass} may be less than it really is NOW, because time difference * since call rollingCounterInSecond.pass(). So in high concurrency, the following code may * lead more tokens be borrowed. */ long currentPass = rollingCounterInSecond.pass(); while (earliestTime < currentTime) { long waitInMs = idx * windowLength + windowLength - currentTime % windowLength; if (waitInMs >= OccupyTimeoutProperty.getOccupyTimeout()) { break; } long windowPass = rollingCounterInSecond.getWindowPass(earliestTime); if (currentPass + currentBorrow + acquireCount - windowPass <= maxCount) { return waitInMs; } earliestTime += windowLength; currentPass -= windowPass; idx++; } return OccupyTimeoutProperty.getOccupyTimeout(); } @Override public long waiting() { return rollingCounterInSecond.waiting(); } @Override public void addWaitingRequest(long futureTime, int acquireCount) { rollingCounterInSecond.addWaiting(futureTime, acquireCount); } @Override public void addOccupiedPass(int acquireCount) { rollingCounterInMinute.addOccupiedPass(acquireCount); rollingCounterInMinute.addPass(acquireCount); } }
链路节点,用于统计调用链路上某个资源的数据,维持树状结构。
DefaultNode 的维度是 resource * context,存在每个 NodeSelectorSlot 的 map 里面。
package com.alibaba.csp.sentinel.node; import java.util.HashSet; import java.util.Set; import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.SphO; import com.alibaba.csp.sentinel.SphU; import com.alibaba.csp.sentinel.context.Context; import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; import com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot; /** * 用于统计调用链路上某个资源的数据,基于Resource * Context维度统计。 * * 多次调用#Entry()时,会生成多个Node节点。 */ public class DefaultNode extends StatisticNode { /** * The resource associated with the node. */ // 资源标识,说明DefaultNode是基于Resource的维度存储的 private ResourceWrapper id; /** * The list of all child nodes. */ // 统计数据Node,多次调用#Entry()时,会生成多个Node节点数据 private volatile Set<Node> childList = new HashSet<>(); /** * Associated cluster node. */ // ClusterNode也是基于Resource维度 private ClusterNode clusterNode; public DefaultNode(ResourceWrapper id, ClusterNode clusterNode) { this.id = id; this.clusterNode = clusterNode; } public ResourceWrapper getId() { return id; } public ClusterNode getClusterNode() { return clusterNode; } public void setClusterNode(ClusterNode clusterNode) { this.clusterNode = clusterNode; } /** * Add child node to current node. * * @param node valid child node */ // 新增Node节点,实际上是将node加入到Set结构中 public void addChild(Node node) { if (node == null) { RecordLog.warn("Trying to add null child to node <{}>, ignored", id.getName()); return; } if (!childList.contains(node)) { synchronized (this) { if (!childList.contains(node)) { Set<Node> newSet = new HashSet<>(childList.size() + 1); newSet.addAll(childList); newSet.add(node); childList = newSet; } } RecordLog.info("Add child <{}> to node <{}>", ((DefaultNode)node).id.getName(), id.getName()); } } /** * Reset the child node list. */ public void removeChildList() { this.childList = new HashSet<>(); } public Set<Node> getChildList() { return childList; } @Override public void increaseBlockQps(int count) { super.increaseBlockQps(count); this.clusterNode.increaseBlockQps(count); } @Override public void increaseExceptionQps(int count) { super.increaseExceptionQps(count); this.clusterNode.increaseExceptionQps(count); } @Override public void addRtAndSuccess(long rt, int successCount) { super.addRtAndSuccess(rt, successCount); this.clusterNode.addRtAndSuccess(rt, successCount); } @Override public void increaseThreadNum() { super.increaseThreadNum(); this.clusterNode.increaseThreadNum(); } @Override public void decreaseThreadNum() { super.decreaseThreadNum(); this.clusterNode.decreaseThreadNum(); } @Override public void addPassRequest(int count) { super.addPassRequest(count); this.clusterNode.addPassRequest(count); } public void printDefaultNode() { visitTree(0, this); } // 打印childList的节点树 private void visitTree(int level, DefaultNode node) { for (int i = 0; i < level; ++i) { System.out.print("-"); } if (!(node instanceof EntranceNode)) { System.out.println( String.format("%s(thread:%s pq:%s bq:%s tq:%s rt:%s 1mp:%s 1mb:%s 1mt:%s)", node.id.getShowName(), node.curThreadNum(), node.passQps(), node.blockQps(), node.totalQps(), node.avgRt(), node.totalRequest() - node.blockRequest(), node.blockRequest(), node.totalRequest())); } else { System.out.println( String.format("Entry-%s(t:%s pq:%s bq:%s tq:%s rt:%s 1mp:%s 1mb:%s 1mt:%s)", node.id.getShowName(), node.curThreadNum(), node.passQps(), node.blockQps(), node.totalQps(), node.avgRt(), node.totalRequest() - node.blockRequest(), node.blockRequest(), node.totalRequest())); } for (Node n : node.getChildList()) { DefaultNode dn = (DefaultNode)n; 
visitTree(level + 1, dn); } } }
簇点,用于统计每个资源全局的数据(不区分调用链路),以及存放该资源的按来源区分的调用数据(类型为 StatisticNode)。特别地,Constants.ENTRY_NODE 节点用于统计全局的入口资源数据。
ClusterNode 的维度是 resource。
package com.alibaba.csp.sentinel.node; import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.ReentrantLock; import com.alibaba.csp.sentinel.ResourceTypeConstants; import com.alibaba.csp.sentinel.context.ContextUtil; import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.csp.sentinel.util.AssertUtil; /** * 簇点,用于统计每个资源全局的数据(不区分调用链路),以及存放该资源的按来源区分的调用数据(类型为 StatisticNode)。 * ClusterNode不区分Context,他是基于Resource维度。 */ public class ClusterNode extends StatisticNode { // 簇点名称 private final String name; private final int resourceType; public ClusterNode(String name) { this(name, ResourceTypeConstants.COMMON); } public ClusterNode(String name, int resourceType) { AssertUtil.notEmpty(name, "name cannot be empty"); this.name = name; this.resourceType = resourceType; } /** * <p>The origin map holds the pair: (origin, originNode) for one specific resource.</p> * <p> * The longer the application runs, the more stable this mapping will become. * So we didn't use concurrent map here, but a lock, as this lock only happens * at the very beginning while concurrent map will hold the lock all the time. * </p> */ private Map<String, StatisticNode> originCountMap = new HashMap<>(); private final ReentrantLock lock = new ReentrantLock(); /** * Get resource name of the resource node. * * @return resource name * @since 1.7.0 */ public String getName() { return name; } /** * Get classification (type) of the resource. * * @return resource type * @since 1.7.0 */ public int getResourceType() { return resourceType; } /** * <p>Get {@link Node} of the specific origin. Usually the origin is the Service Consumer's app name.</p> * <p>If the origin node for given origin is absent, then a new {@link StatisticNode} * for the origin will be created and returned.</p> * * @param origin The caller's name, which is designated in the {@code parameter} parameter * {@link ContextUtil#enter(String name, String origin)}. * @return the {@link Node} of the specific origin */ public Node getOrCreateOriginNode(String origin) { StatisticNode statisticNode = originCountMap.get(origin); if (statisticNode == null) { lock.lock(); try { statisticNode = originCountMap.get(origin); if (statisticNode == null) { // The node is absent, create a new node for the origin. statisticNode = new StatisticNode(); HashMap<String, StatisticNode> newMap = new HashMap<>(originCountMap.size() + 1); newMap.putAll(originCountMap); newMap.put(origin, statisticNode); originCountMap = newMap; } } finally { lock.unlock(); } } return statisticNode; } public Map<String, StatisticNode> getOriginCountMap() { return originCountMap; } }
入口节点,特殊的链路节点,对应某个 Context 入口的所有调用数据。Constants.ROOT 节点也是入口节点。
EntranceNode 的维度是 context,存在 ContextUtil 类的 contextNameNodeMap 里面
package com.alibaba.csp.sentinel.node; import com.alibaba.csp.sentinel.context.Context; import com.alibaba.csp.sentinel.context.ContextUtil; import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; import com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot; /** * 入口节点,特殊的链路节点,对应某个 Context 入口的所有调用数据。 */ public class EntranceNode extends DefaultNode { public EntranceNode(ResourceWrapper id, ClusterNode clusterNode) { super(id, clusterNode); } @Override public double avgRt() { double total = 0; double totalQps = 0; for (Node node : getChildList()) { total += node.avgRt() * node.passQps(); totalQps += node.passQps(); } return total / (totalQps == 0 ? 1 : totalQps); } @Override public double blockQps() { double blockQps = 0; for (Node node : getChildList()) { blockQps += node.blockQps(); } return blockQps; } @Override public long blockRequest() { long r = 0; for (Node node : getChildList()) { r += node.blockRequest(); } return r; } @Override public int curThreadNum() { int r = 0; for (Node node : getChildList()) { r += node.curThreadNum(); } return r; } @Override public double totalQps() { double r = 0; for (Node node : getChildList()) { r += node.totalQps(); } return r; } @Override public double successQps() { double r = 0; for (Node node : getChildList()) { r += node.successQps(); } return r; } @Override public double passQps() { double r = 0; for (Node node : getChildList()) { r += node.passQps(); } return r; } @Override public long totalRequest() { long r = 0; for (Node node : getChildList()) { r += node.totalRequest(); } return r; } @Override public long totalPass() { long r = 0; for (Node node : getChildList()) { r += node.totalPass(); } return r; } }
Slot用于处理统计逻辑和规则判断逻辑,它可以组成单向链表结构的SlotChain,采用SPI方式扩展。
这里画了一张SlotChain调用链上entry的处理逻辑图:
我们逐一分析Slot的实现原理。
这个 slot 主要负责收集资源的路径,并将这些资源的调用路径以树状结构存储起来,用于根据调用路径进行流量控制。
ContextUtil.enter("entrance1", "appA"); Entry nodeA = SphU.entry("nodeA"); if (nodeA != null) { nodeA.exit(); } ContextUtil.exit();上述代码通过 ContextUtil.enter() 创建了一个名为 entrance1 的上下文,同时指定调用发起者为 appA;接着通过SphU.entry()请求一个 token,如果该方法顺利执行没有抛 BlockException,表明 token 请求成功。
以上代码将在内存中生成以下结构:
        machine-root
            /
           /
     EntranceNode1
          /
         /
  DefaultNode(nodeA)

注意:每个 DefaultNode 由资源 ID 和输入名称来标识。换句话说,一个资源 ID 可以有多个不同入口的 DefaultNode。
ContextUtil.enter("entrance1", "appA"); Entry nodeA = SphU.entry("nodeA"); if (nodeA != null) { nodeA.exit(); } ContextUtil.exit(); ContextUtil.enter("entrance2", "appA"); nodeA = SphU.entry("nodeA"); if (nodeA != null) { nodeA.exit(); } ContextUtil.exit();以上代码将在内存中生成以下结构:
                machine-root
                /          \
               /            \
       EntranceNode1    EntranceNode2
             /                \
            /                  \
  DefaultNode(nodeA)    DefaultNode(nodeA)

上面的结构可以通过调用 curl http://localhost:8719/tree?type=root 来显示:
EntranceNode: machine-root(t:0 pq:1 bq:0 tq:1 rt:0 prq:1 1mp:0 1mb:0 1mt:0)
-EntranceNode1: Entrance1(t:0 pq:1 bq:0 tq:1 rt:0 prq:1 1mp:0 1mb:0 1mt:0)
--nodeA(t:0 pq:1 bq:0 tq:1 rt:0 prq:1 1mp:0 1mb:0 1mt:0)
-EntranceNode2: Entrance1(t:0 pq:1 bq:0 tq:1 rt:0 prq:1 1mp:0 1mb:0 1mt:0)
--nodeA(t:0 pq:1 bq:0 tq:1 rt:0 prq:1 1mp:0 1mb:0 1mt:0)

t:threadNum  pq:passQps  bq:blockedQps  tq:totalQps  rt:averageRt  prq:passRequestQps
1mp:1m-passed  1mb:1m-blocked  1mt:1m-total
源码讲解:
package com.alibaba.csp.sentinel.slots.nodeselector; import com.alibaba.csp.sentinel.context.Context; import com.alibaba.csp.sentinel.context.ContextUtil; import com.alibaba.csp.sentinel.node.ClusterNode; import com.alibaba.csp.sentinel.node.DefaultNode; import com.alibaba.csp.sentinel.node.EntranceNode; import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot; import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; import com.alibaba.csp.sentinel.spi.SpiOrder; import java.util.HashMap; import java.util.Map; /** * 这个 slot 主要负责收集资源的路径,并将这些资源的调用路径以树状结构存储起来,用于根据调用路径进行流量控制。 * * <p>工作流程:</p> * <pre> * ContextUtil.enter("entrance1", "appA"); * Entry nodeA = SphU.entry("nodeA"); * if (nodeA != null) { * nodeA.exit(); * } * ContextUtil.exit(); * </pre> * * Above code will generate the following invocation structure in memory: * * <pre> * * machine-root * / * / * EntranceNode1 * / * / * DefaultNode(nodeA)- - - - - -> ClusterNode(nodeA); * </pre> * * DefaultNode and ClusterNode都会保存StatisticNode的统计数据。 * * ClusterNode是根据ResourceId来进行区分,DefaultNode是根据Resource和Context进行区分。 * 也就是说,Resource会根据Context不同产生多个DefaultNode,但是只会有一个ClusterNode。 * * * <p> * the following code shows one resource id in two different context: * </p> * * <pre> * ContextUtil.enter("entrance1", "appA"); * Entry nodeA = SphU.entry("nodeA"); * if (nodeA != null) { * nodeA.exit(); * } * ContextUtil.exit(); * * ContextUtil.enter("entrance2", "appA"); * nodeA = SphU.entry("nodeA"); * if (nodeA != null) { * nodeA.exit(); * } * ContextUtil.exit(); * </pre> * * Above code will generate the following invocation structure in memory: * * <pre> * * machine-root * / \ * / \ * EntranceNode1 EntranceNode2 * / \ * / \ * DefaultNode(nodeA) DefaultNode(nodeA) * | | * +- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA); * </pre> * * * <p> * 使用如下链接可以查看结构 * curl http://localhost:8719/tree?type=root * </p> */ @SpiOrder(-10000) public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> { /** * {@link DefaultNode}s of the same resource in different context. */ // 保存同一个资源下不同Context的DefaultNode节点 // 资源的SlotChain是共享的,所以NodeSelectorSlot中保存了Resource在不同Context下数据,即DefaultNode节点 private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10); /** * */ @Override public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { /* * It's interesting that we use context name rather resource name as the map key. * * Remember that same resource({@link ResourceWrapper#equals(Object)}) will share * the same {@link ProcessorSlotChain} globally, no matter in which context. So if * code goes into {@link #entry(Context, ResourceWrapper, DefaultNode, int, Object...)}, * the resource name must be same but context name may not. * * If we use {@link com.alibaba.csp.sentinel.SphU#entry(String resource)} to * enter same resource in different context, using context name as map key can * distinguish the same resource. In this case, multiple {@link DefaultNode}s will be created * of the same resource name, for every distinct context (different context name) each. * * Consider another question. One resource may have multiple {@link DefaultNode}, * so what is the fastest way to get total statistics of the same resource? * The answer is all {@link DefaultNode}s with same resource name share one * {@link ClusterNode}. See {@link ClusterBuilderSlot} for detail. 
*/ // 尝试获取Context对应的DefaultNode DefaultNode node = map.get(context.getName()); if (node == null) { synchronized (this) { node = map.get(context.getName()); // 第一次请求新建一个DefaultNode,并放入map中 if (node == null) { node = new DefaultNode(resourceWrapper, null); HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size()); cacheMap.putAll(map); cacheMap.put(context.getName(), node); map = cacheMap; // Context维护了当前调用链的节点树 ((DefaultNode) context.getLastNode()).addChild(node); } } } context.setCurNode(node); // 调用链上其它Slot的#entry处理 fireEntry(context, resourceWrapper, node, count, prioritized, args); } // 触发Slot Exit逻辑,处理完成后执行链上下一个Slot的Exit逻辑 @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { fireExit(context, resourceWrapper, count, args); } }
此插槽用于构建资源的 ClusterNode 以及调用来源节点。ClusterNode 保持某个资源运行统计信息(响应时间、QPS、block 数目、线程数、异常数等)以及调用来源统计信息列表。调用来源的名称由 ContextUtil.enter(contextName,origin) 中的 origin 标记。可通过如下命令查看某个资源不同调用者的访问情况:curl http://localhost:8719/origin?id=caller:
id: nodeA

idx  origin   threadNum  passedQps  blockedQps  totalQps  aRt  1m-passed  1m-blocked  1m-total
1    caller1  0          0          0           0         0    0          0           0
2    caller2  0          0          0           0         0    0          0           0
源码讲解:
package com.alibaba.csp.sentinel.slots.clusterbuilder; import java.util.HashMap; import java.util.Map; import com.alibaba.csp.sentinel.EntryType; import com.alibaba.csp.sentinel.context.Context; import com.alibaba.csp.sentinel.context.ContextUtil; import com.alibaba.csp.sentinel.node.ClusterNode; import com.alibaba.csp.sentinel.node.DefaultNode; import com.alibaba.csp.sentinel.node.IntervalProperty; import com.alibaba.csp.sentinel.node.Node; import com.alibaba.csp.sentinel.node.SampleCountProperty; import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot; import com.alibaba.csp.sentinel.slotchain.ProcessorSlotChain; import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper; import com.alibaba.csp.sentinel.spi.SpiOrder; /** * 此插槽用于构建资源的 ClusterNode 以及调用来源节点。ClusterNode 保持某个资源运行统计信息(响应时间、QPS、block 数目、线程数、异常数等) * 以及调用来源统计信息列表。调用来源的名称由 ContextUtil.enter(contextName,origin) 中的 origin 标记。 * * Resource只有一个ClusterNode,但是可以有多个DefaultNode。 */ @SpiOrder(-9000) public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> { /** * <p> * Remember that same resource({@link ResourceWrapper#equals(Object)}) will share * the same {@link ProcessorSlotChain} globally, no matter in which context. So if * code goes into {@link #entry(Context, ResourceWrapper, DefaultNode, int, boolean, Object...)}, * the resource name must be same but context name may not. * </p> * <p> * To get total statistics of the same resource in different context, same resource * shares the same {@link ClusterNode} globally. All {@link ClusterNode}s are cached * in this map. * </p> * <p> * The longer the application runs, the more stable this mapping will * become. so we don't concurrent map but a lock. as this lock only happens * at the very beginning while concurrent map will hold the lock all the time. * </p> */ // Resource共享ProcessorSlotChain,不区分Context。 // 因为Resource可以有多个Context,所以ClusterNode是用来统计Resource在所有Context的汇总统计数据。 // clusterNodeMap就是用来缓存所有这些ClusterNode数据。 private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>(); private static final Object lock = new Object(); private volatile ClusterNode clusterNode = null; // entry()是Slot核心逻辑的入口 @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { if (clusterNode == null) { synchronized (lock) { if (clusterNode == null) { // Create the cluster node. // 根据Resource创建新的ClusterNode放到Map中 clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType()); HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16)); newMap.putAll(clusterNodeMap); newMap.put(node.getId(), clusterNode); clusterNodeMap = newMap; } } } node.setClusterNode(clusterNode); /* * if context origin is set, we should get or create a new {@link Node} of * the specific origin. */ if (!"".equals(context.getOrigin())) { Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin()); context.getCurEntry().setOriginNode(originNode); } // 执行下一Slot处理 fireEntry(context, resourceWrapper, node, count, prioritized, args); } // exit()是Slot处理结束的出口 @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { fireExit(context, resourceWrapper, count, args); } /** * Get {@link ClusterNode} of the resource of the specific type. * * @param id resource name. 
* @param type invoke type. * @return the {@link ClusterNode} */ public static ClusterNode getClusterNode(String id, EntryType type) { return clusterNodeMap.get(new StringResourceWrapper(id, type)); } /** * Get {@link ClusterNode} of the resource name. * * @param id resource name. * @return the {@link ClusterNode}. */ public static ClusterNode getClusterNode(String id) { if (id == null) { return null; } ClusterNode clusterNode = null; for (EntryType nodeType : EntryType.values()) { clusterNode = clusterNodeMap.get(new StringResourceWrapper(id, nodeType)); if (clusterNode != null) { break; } } return clusterNode; } /** * Get {@link ClusterNode}s map, this map holds all {@link ClusterNode}s, it's key is resource name, * value is the related {@link ClusterNode}. <br/> * DO NOT MODIFY the map returned. * * @return all {@link ClusterNode}s */ public static Map<ResourceWrapper, ClusterNode> getClusterNodeMap() { return clusterNodeMap; } /** * Reset all {@link ClusterNode}s. Reset is needed when {@link IntervalProperty#INTERVAL} or * {@link SampleCountProperty#SAMPLE_COUNT} is changed. */ public static void resetClusterNodes() { for (ClusterNode node : clusterNodeMap.values()) { node.reset(); } } }
LogSlot负责记录处理中产生的Exceptions,从而提供具体的日志以进行故障排除。
源码讲解:
package com.alibaba.csp.sentinel.slots.logger; import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.context.Context; import com.alibaba.csp.sentinel.node.DefaultNode; import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot; import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.csp.sentinel.spi.SpiOrder; /** * LogSlot负责记录处理中产生的Exceptions,从而提供具体的日志已进行故障排除。 * * LogSlot后节点都是Sentinel规则处理节点,会产生BlockExceptions,LogSlot负责对异常进行处理。 */ @SpiOrder(-8000) public class LogSlot extends AbstractLinkedProcessorSlot<DefaultNode> { // entry()是Slot核心逻辑的入口 @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args) throws Throwable { try { // 直接调用后续Slot节点,并捕获过程中产生的异常进行处理。 fireEntry(context, resourceWrapper, obj, count, prioritized, args); } catch (BlockException e) { // 对于Sentinel规则触发的BlockException,按照标准格式记录日志,并向上继续抛出异常。 EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(), context.getOrigin(), count); throw e; } catch (Throwable e) { // 非Sentinel规则异常,记录报警信息。 RecordLog.warn("Unexpected entry exception", e); } } // exit()是Slot处理结束的出口 @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { try { // 不需要额外的资源释放逻辑,直接执行下一节点逻辑 fireExit(context, resourceWrapper, count, args); } catch (Throwable e) { RecordLog.warn("Unexpected entry exit exception", e); } } }
StatisticSlot 是 Sentinel 的核心功能插槽之一,用于统计实时的调用数据。
clusterNode:资源唯一标识的 ClusterNode 的实时统计
origin:根据来自不同调用者的统计信息
DefaultNode:根据入口上下文区分的资源 ID 的 runtime 统计
入口流量的统计
StatisticSlot 是 Sentinel 最为重要的类之一,用于根据规则判断结果进行相应的统计操作。
entry 的时候:依次执行后面的判断 slot。每个 slot 触发流控的话会抛出异常(BlockException 的子类)。若有 BlockException 抛出,则记录 block 数据;若无异常抛出则算作可通过(pass),记录 pass 数据。
exit 的时候:若无 error(无论是业务异常还是流控异常),记录 complete(success)以及 RT,线程数-1。
记录数据的维度:线程数+1、记录当前 DefaultNode 数据、记录对应的 originNode 数据(若存在 origin)、累计 IN 统计数据(若流量类型为 IN)。
Sentinel 底层采用高性能的滑动窗口数据结构 LeapArray 来统计实时的秒级指标数据,可以很好地支撑写多于读的高并发场景。
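LeapArray 的实现会在后面的滑动窗口部分展开,这里先用一个极简的计数器示意其思路(并非 Sentinel 的真实实现,省略了并发控制与 occupy 等细节):把 1 秒拆成若干个桶,按时间戳定位桶,桶过期则先重置再累加,统计时只汇总未过期的桶。

import java.util.concurrent.atomic.AtomicLongArray;

// 极简滑动窗口计数器示意(非 LeapArray 源码)
public class SimpleSlidingWindow {
    private final int sampleCount;        // 桶的个数
    private final int windowLengthInMs;   // 每个桶覆盖的时长
    private final AtomicLongArray counts; // 每个桶的计数
    private final long[] windowStarts;    // 每个桶的起始时间戳

    public SimpleSlidingWindow(int sampleCount, int intervalInMs) {
        this.sampleCount = sampleCount;
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.counts = new AtomicLongArray(sampleCount);
        this.windowStarts = new long[sampleCount];
    }

    // 记录一次通过:定位当前时间所在的桶,桶过期则重置后再累加
    public synchronized void addPass(int n) {
        long now = System.currentTimeMillis();
        int idx = (int) ((now / windowLengthInMs) % sampleCount);
        long windowStart = now - now % windowLengthInMs;
        if (windowStarts[idx] != windowStart) {
            windowStarts[idx] = windowStart;
            counts.set(idx, 0);
        }
        counts.addAndGet(idx, n);
    }

    // 汇总所有未过期桶的计数,即最近一个统计周期内的通过数
    public synchronized long pass() {
        long now = System.currentTimeMillis();
        long total = 0;
        for (int i = 0; i < sampleCount; i++) {
            if (now - windowStarts[i] < (long) windowLengthInMs * sampleCount) {
                total += counts.get(i);
            }
        }
        return total;
    }
}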
源码讲解:
package com.alibaba.csp.sentinel.slots.statistic; import java.util.Collection; import com.alibaba.csp.sentinel.node.Node; import com.alibaba.csp.sentinel.slotchain.ProcessorSlotEntryCallback; import com.alibaba.csp.sentinel.slotchain.ProcessorSlotExitCallback; import com.alibaba.csp.sentinel.slots.block.flow.PriorityWaitException; import com.alibaba.csp.sentinel.spi.SpiOrder; import com.alibaba.csp.sentinel.util.TimeUtil; import com.alibaba.csp.sentinel.Constants; import com.alibaba.csp.sentinel.EntryType; import com.alibaba.csp.sentinel.context.Context; import com.alibaba.csp.sentinel.node.ClusterNode; import com.alibaba.csp.sentinel.node.DefaultNode; import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot; import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; import com.alibaba.csp.sentinel.slots.block.BlockException; /** * StatisticSlot 是 Sentinel 最为重要的类之一,用于根据规则判断结果进行相应的统计操作。 * * StatisticSlot执行时需要统计以下信息: * > ClusterNode中和Resource的统计指标 * > OriginNode中和Caller的统计指标 * > DefaultNode中和Resource * Context维度的统计指标 * > 所有entrance维度的的统计数据 * * StatisticSlot记录以下统计数据: * 并发线程数; * Pass限流数; * BlockQps数; * RT; * 请求完成时间; * ExceptionQPS数; * 成功处理请求数; * * StatisticSlot统计维度: * 记录当前 DefaultNode 数据 * 记录对应的 originNode 数据(若存在 origin) * 累计 IN 统计数据(若流量类型为 IN)。 */ @SpiOrder(-7000) public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> { // entry()是Slot核心逻辑的入口 // entry 的时候:依次执行后面的判断 slot。每个 slot 触发流控的话会抛出异常(BlockException 的子类)。 // 若有 BlockException 抛出,则记录 block 数据; // 若无异常抛出则算作可通过(pass),记录 pass 数据。 @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { try { // 1.触发规则验证Slot:StatisticSlot之后的Slot都是规则验证节点,首先触发规则验证,在根据验证结果进行统计。 fireEntry(context, resourceWrapper, node, count, prioritized, args); // 2.无异常抛出时,说明没有触发任何规则,算作通过Sentinel验证(pass),需要统计并记录以下调用信息 // 2.1 并发线程数 + 1,pass请求数 + 1 node.increaseThreadNum(); node.addPassRequest(count); // 2.2 Origin维度下,并发线程数 + 1,pass请求数 + 1 if (context.getCurEntry().getOriginNode() != null) { // Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); context.getCurEntry().getOriginNode().addPassRequest(count); } // 2.3 Inbound entry,并发线程数 + 1,pass请求数 + 1,这里是全局统计数据(所有Resource) if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); Constants.ENTRY_NODE.addPassRequest(count); } // Handle pass event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (PriorityWaitException ex) { // Ocuppy时触发PriorityWaitException // 增加并发线程数的统计值 node.increaseThreadNum(); if (context.getCurEntry().getOriginNode() != null) { // Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); } // Handle pass event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (BlockException e) { // Blocked, set block exception to current entry. context.getCurEntry().setBlockError(e); // Add block count. 
// 增加被限流QPS数 node.increaseBlockQps(count); // 增加Origin维度下被限流QPS数 if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().increaseBlockQps(count); } //增加全局流量下的被限流QPS数 if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseBlockQps(count); } // Handle block event with registered entry callback handlers. // 执行限流事件上注册的回调处理 for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onBlocked(e, context, resourceWrapper, node, count, args); } throw e; } catch (Throwable e) { // Unexpected internal error, set error to current entry. // 其它异常发生时记录ERROR context.getCurEntry().setError(e); throw e; } } // exit()是Slot处理结束的出口 // exit 的时候:若无流控异常,记录 complete(success)以及 RT,线程数-1。 @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { Node node = context.getCurNode(); // 没有发生流控异常时统计:结束时间、RT if (context.getCurEntry().getBlockError() == null) { // Calculate response time (use completeStatTime as the time of completion). // 记录请求处理完成时间completeStatTime,并计算RT long completeStatTime = TimeUtil.currentTimeMillis(); context.getCurEntry().setCompleteTimestamp(completeStatTime); long rt = completeStatTime - context.getCurEntry().getCreateTimestamp(); Throwable error = context.getCurEntry().getError(); // Record response time and success count. // 处理DefaultNode、ClusterNode、OriginNode的统计逻辑 recordCompleteFor(node, count, rt, error); recordCompleteFor(context.getCurEntry().getOriginNode(), count, rt, error); if (resourceWrapper.getEntryType() == EntryType.IN) { recordCompleteFor(Constants.ENTRY_NODE, count, rt, error); } } // Handle exit event with registered exit callback handlers. Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks(); for (ProcessorSlotExitCallback handler : exitCallbacks) { handler.onExit(context, resourceWrapper, count, args); } // 触发后续exit处理,实际上StatisticSlot后续都是判断节点,没有太多的exit逻辑需要处理 fireExit(context, resourceWrapper, count); } // 记录成功请求数和RT值,减少并发线程数,并记录异常QPS // 对于BlockException:属于流控异常,被记录为block数据 // 对于ERROR:非流控异常,被统计为Success(未被限流),但同时也属于ExceptionQps private void recordCompleteFor(Node node, int batchCount, long rt, Throwable error) { if (node == null) { return; } // 记录success数和rt值 node.addRtAndSuccess(rt, batchCount); // 并发线程数 - 1 node.decreaseThreadNum(); // 发生非流控异常时,记录异常QPS数 if (error != null && !(error instanceof BlockException)) { node.increaseExceptionQps(batchCount); } } }
AuthoritySlot 负责黑白名单(来源访问控制)规则的校验。
源码讲解:
package com.alibaba.csp.sentinel.slots.block.authority; import java.util.Map; import java.util.Set; import com.alibaba.csp.sentinel.context.Context; import com.alibaba.csp.sentinel.node.DefaultNode; import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot; import com.alibaba.csp.sentinel.slotchain.ProcessorSlot; import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; import com.alibaba.csp.sentinel.spi.SpiOrder; /** * AuthoritySlot负责校验AuthorityRule. AuthorityRule定义了请求的黑白名单 * * 很多时候,我们需要根据调用来源来判断该次请求是否允许放行,这时候可以使用 Sentinel 的来源访问控制(黑白名单控制)的功能。 * 来源访问控制根据资源的请求来源(origin)限制资源是否通过,若配置白名单则只有请求来源位于白名单内时才可通过; * 若配置黑名单则请求来源位于黑名单时不通过,其余的请求通过。 * * 来源访问控制规则(AuthorityRule)非常简单,主要有以下配置项: * - resource:资源名,即限流规则的作用对象。 * - limitApp:对应的黑名单/白名单,不同 origin 用 , 分隔,如 appA,appB。 * - strategy:限制模式,AUTHORITY_WHITE 为白名单模式,AUTHORITY_BLACK 为黑名单模式,默认为白名单模式。 */ @SpiOrder(-6000) public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> { // entry()是Slot核心逻辑的入口 @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { // 校验黑白名单规则,触发抛出AuthorityException,结束SlotChain处理;否则执行下一节点逻辑 checkBlackWhiteAuthority(resourceWrapper, context); fireEntry(context, resourceWrapper, node, count, prioritized, args); } // exit()是Slot处理结束的出口 @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { // 不需要额外的资源释放逻辑,直接执行下一节点逻辑 fireExit(context, resourceWrapper, count, args); } void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException { // 根据Resource获取内存中加载的黑白名单规则AuthorityRule Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules(); if (authorityRules == null) { return; } Set<AuthorityRule> rules = authorityRules.get(resource.getName()); if (rules == null) { return; } // 逐条校验AuthorityRule,触发则抛出AuthorityException for (AuthorityRule rule : rules) { if (!AuthorityRuleChecker.passCheck(rule, context)) { throw new AuthorityException(context.getOrigin(), rule); } } } }
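结合上面注释中的 resource、limitApp、strategy 三个配置项,下面给出一个加载黑白名单规则的最小示例(资源名、调用方名称均为假设值):

import java.util.Collections;

import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.authority.AuthorityRule;
import com.alibaba.csp.sentinel.slots.block.authority.AuthorityRuleManager;

public class AuthorityRuleDemo {
    public static void main(String[] args) {
        AuthorityRule rule = new AuthorityRule();
        rule.setResource("orderQuery");                  // 规则作用的资源名
        rule.setLimitApp("appA,appB");                   // 白名单中的调用来源,逗号分隔
        rule.setStrategy(RuleConstant.AUTHORITY_WHITE);  // 白名单模式(黑名单用 AUTHORITY_BLACK)
        AuthorityRuleManager.loadRules(Collections.singletonList(rule));
        // 之后 origin 不在 appA/appB 中的请求,会在 AuthoritySlot 中抛出 AuthorityException
    }
}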
这个 slot 会根据对于当前系统的整体情况,对入口资源的调用进行动态调配。其原理是让入口的流量和当前系统的预计容量达到一个动态平衡。
注意系统规则只对入口流量起作用(调用类型为 EntryType.IN),对出口流量无效。可通过 SphU.entry(res, entryType) 指定调用类型,如果不指定,默认是EntryType.OUT。
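下面的示例(阈值均为示意值)演示系统规则的加载,以及通过 SphU.entry(res, EntryType.IN) 把调用声明为入口流量,否则 SystemSlot 不会对其生效:

import java.util.Collections;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.system.SystemRule;
import com.alibaba.csp.sentinel.slots.system.SystemRuleManager;

public class SystemRuleDemo {
    public static void main(String[] args) {
        SystemRule rule = new SystemRule();
        rule.setQps(1000);   // 入口 QPS 阈值(示意值)
        rule.setAvgRt(50);   // 入口平均 RT 阈值,单位毫秒(示意值)
        SystemRuleManager.loadRules(Collections.singletonList(rule));

        Entry entry = null;
        try {
            // 必须声明为 EntryType.IN,系统规则才会校验该次调用
            entry = SphU.entry("httpInbound", EntryType.IN);
            // 业务逻辑
        } catch (BlockException e) {
            // 触发系统保护规则
        } finally {
            if (entry != null) {
                entry.exit();
            }
        }
    }
}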
源码讲解:
package com.alibaba.csp.sentinel.slots.system; import com.alibaba.csp.sentinel.context.Context; import com.alibaba.csp.sentinel.node.DefaultNode; import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot; import com.alibaba.csp.sentinel.slotchain.ProcessorSlot; import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; import com.alibaba.csp.sentinel.spi.SpiOrder; /** * SystemSlot校验SystemRule。 * 这个 slot 会根据对于当前系统的整体情况,对入口资源的调用进行动态调配。其原理是让入口的流量和当前系统的预计容量达到一个动态平衡。 * * 注意系统规则只对入口流量起作用(调用类型为 EntryType.IN),对出口流量无效。 * 可通过 SphU.entry(res, entryType) 指定调用类型,如果不指定,默认是EntryType.OUT。 */ @SpiOrder(-5000) public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> { // entry()是Slot核心逻辑的入口 @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { // 执行SystemRule校验,失败抛出SystemBlockException(继承自BlockException) SystemRuleManager.checkSystem(resourceWrapper); // 未触发SystemRule时,执行下一节点逻辑 fireEntry(context, resourceWrapper, node, count, prioritized, args); } // exit()是Slot处理结束的出口 @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { // 不需要额外的资源释放逻辑,直接执行下一节点逻辑 fireExit(context, resourceWrapper, count, args); } }
这个 slot 主要根据预设的资源的统计信息,按照固定的次序,依次生效。如果一个资源对应两条或者多条流控规则,则会根据如下次序依次检验,直到全部通过或者有一个规则生效为止:
指定应用生效的规则,即针对调用方限流的;
调用方为 other 的规则;
调用方为 default 的规则。
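在进入源码之前,先看一个定义并加载限流规则的最小示例(资源名与阈值均为示意值),FlowSlot 校验的就是这样加载到内存中的规则:

import java.util.Collections;

import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;

public class FlowRuleDemo {
    public static void main(String[] args) {
        FlowRule rule = new FlowRule();
        rule.setResource("orderQuery");              // 限流作用的资源名
        rule.setGrade(RuleConstant.FLOW_GRADE_QPS);  // 基于 QPS 限流(也可选 FLOW_GRADE_THREAD)
        rule.setCount(20);                           // QPS 阈值(示意值)
        rule.setLimitApp("default");                 // 针对所有调用方生效
        FlowRuleManager.loadRules(Collections.singletonList(rule));
        // 之后对 orderQuery 的调用超过阈值时,FlowSlot 会抛出 FlowException
    }
}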
源码讲解:
package com.alibaba.csp.sentinel.slots.block.flow; import com.alibaba.csp.sentinel.context.Context; import com.alibaba.csp.sentinel.node.DefaultNode; import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot; import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.csp.sentinel.spi.SpiOrder; import com.alibaba.csp.sentinel.util.AssertUtil; import com.alibaba.csp.sentinel.util.function.Function; import java.util.Collection; import java.util.List; import java.util.Map; /** * <p> * Combined the runtime statistics collected from the previous * slots (NodeSelectorSlot, ClusterNodeBuilderSlot, and StatisticSlot), FlowSlot * will use pre-set rules to decide whether the incoming requests should be * blocked. * </p> * * <p> * {@code SphU.entry(resourceName)} will throw {@code FlowException} if any rule is * triggered. Users can customize their own logic by catching {@code FlowException}. * </p> * * <p> * One resource can have multiple flow rules. FlowSlot traverses these rules * until one of them is triggered or all rules have been traversed. * </p> * * <p> * Each {@link FlowRule} is mainly composed of these factors: grade, strategy, path. We * can combine these factors to achieve different effects. * </p> * * <p> * The grade is defined by the {@code grade} field in {@link FlowRule}. Here, 0 for thread * isolation and 1 for request count shaping (QPS). Both thread count and request * count are collected in real runtime, and we can view these statistics by * following command: * </p> * * <pre> * curl http://localhost:8719/tree * * idx id thread pass blocked success total aRt 1m-pass 1m-block 1m-all exception * 2 abc647 0 460 46 46 1 27 630 276 897 0 * </pre> * * <ul> * <li>{@code thread} for the count of threads that is currently processing the resource</li> * <li>{@code pass} for the count of incoming request within one second</li> * <li>{@code blocked} for the count of requests blocked within one second</li> * <li>{@code success} for the count of the requests successfully handled by Sentinel within one second</li> * <li>{@code RT} for the average response time of the requests within a second</li> * <li>{@code total} for the sum of incoming requests and blocked requests within one second</li> * <li>{@code 1m-pass} is for the count of incoming requests within one minute</li> * <li>{@code 1m-block} is for the count of a request blocked within one minute</li> * <li>{@code 1m-all} is the total of incoming and blocked requests within one minute</li> * <li>{@code exception} is for the count of business (customized) exceptions in one second</li> * </ul> * * This stage is usually used to protect resources from occupying. If a resource * takes long time to finish, threads will begin to occupy. The longer the * response takes, the more threads occupy. * * Besides counter, thread pool or semaphore can also be used to achieve this. * * - Thread pool: Allocate a thread pool to handle these resource. When there is * no more idle thread in the pool, the request is rejected without affecting * other resources. * * - Semaphore: Use semaphore to control the concurrent count of the threads in * this resource. * * The benefit of using thread pool is that, it can walk away gracefully when * time out. But it also bring us the cost of context switch and additional * threads. 
If the incoming requests is already served in a separated thread, * for instance, a Servlet HTTP request, it will almost double the threads count if * using thread pool. * * <h3>Traffic Shaping</h3> * <p> * When QPS exceeds the threshold, Sentinel will take actions to control the incoming request, * and is configured by {@code controlBehavior} field in flow rules. * </p> * <ol> * <li>Immediately reject ({@code RuleConstant.CONTROL_BEHAVIOR_DEFAULT})</li> * <p> * This is the default behavior. The exceeded request is rejected immediately * and the FlowException is thrown * </p> * * <li>Warmup ({@code RuleConstant.CONTROL_BEHAVIOR_WARM_UP})</li> * <p> * If the load of system has been low for a while, and a large amount of * requests comes, the system might not be able to handle all these requests at * once. However if we steady increase the incoming request, the system can warm * up and finally be able to handle all the requests. * This warmup period can be configured by setting the field {@code warmUpPeriodSec} in flow rules. * </p> * * <li>Uniform Rate Limiting ({@code RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER})</li> * <p> * This strategy strictly controls the interval between requests. * In other words, it allows requests to pass at a stable, uniform rate. * </p> * <img src="https://raw.githubusercontent.com/wiki/alibaba/Sentinel/image/uniform-speed-queue.png" style="max-width: * 60%;"/> * <p> * This strategy is an implement of <a href="https://en.wikipedia.org/wiki/Leaky_bucket">leaky bucket</a>. * It is used to handle the request at a stable rate and is often used in burst traffic (e.g. message handling). * When a large number of requests beyond the system’s capacity arrive * at the same time, the system using this strategy will handle requests and its * fixed rate until all the requests have been processed or time out. * </p> * </ol> * * @author jialiang.linjl * @author Eric Zhao */ @SpiOrder(-2000) public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> { private final FlowRuleChecker checker; public FlowSlot() { this(new FlowRuleChecker()); } /** * Package-private for test. * * @param checker flow rule checker * @since 1.6.1 */ FlowSlot(FlowRuleChecker checker) { AssertUtil.notNull(checker, "flow checker should not be null"); this.checker = checker; } // entry()是Slot核心逻辑的入口 @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { // 执行规则校验 checkFlow(resourceWrapper, context, node, count, prioritized); // 未触发限流规则,执行下一节点处理 fireEntry(context, resourceWrapper, node, count, prioritized, args); } // 校验限流规则,失败则抛出FlowException(继承自BlockException),结束SlotChain处理 void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { checker.checkFlow(ruleProvider, resource, context, node, count, prioritized); } // exit()是Slot处理结束的出口 @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { // FlowSlot结束时没有触发数据统计逻辑,直接执行下一个Slot逻辑 fireExit(context, resourceWrapper, count, args); } // 根据Resource获取内存中加载的限流规则 private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() { @Override public Collection<FlowRule> apply(String resource) { // Flow rule map should not be null. Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap(); return flowRules.get(resource); } }; }
这个 slot 主要针对资源的平均响应时间(RT)以及异常比率,来决定资源是否在接下来的时间被自动熔断掉。
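同样先给出一个定义并加载熔断规则的最小示例(阈值、熔断时长均为示意值),DegradeSlot 校验的就是这些规则生成的 CircuitBreaker:

import java.util.Collections;

import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;

public class DegradeRuleDemo {
    public static void main(String[] args) {
        DegradeRule rule = new DegradeRule();
        rule.setResource("orderQuery");                // 熔断作用的资源名(示意)
        rule.setGrade(RuleConstant.DEGRADE_GRADE_RT);  // 基于平均响应时间(RT)的策略
        rule.setCount(100);                            // RT 阈值,单位毫秒(示意值)
        rule.setTimeWindow(10);                        // 熔断时长,单位秒(示意值)
        DegradeRuleManager.loadRules(Collections.singletonList(rule));
        // 规则触发后的熔断时间窗内,DegradeSlot 会直接抛出 DegradeException
    }
}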
源码讲解:
package com.alibaba.csp.sentinel.slots.block.degrade; import java.util.List; import com.alibaba.csp.sentinel.Entry; import com.alibaba.csp.sentinel.context.Context; import com.alibaba.csp.sentinel.node.DefaultNode; import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot; import com.alibaba.csp.sentinel.slotchain.ProcessorSlot; import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker; import com.alibaba.csp.sentinel.spi.SpiOrder; /** * DegradeSlot 主要处理熔断逻辑。 * * 这个 slot 主要针对资源的平均响应时间(RT)以及异常比率,来决定资源是否在接下来的时间被自动熔断掉。 */ @SpiOrder(-1000) public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> { // entry()是Slot核心逻辑的入口 @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { performChecking(context, resourceWrapper); // 未触发熔断时,执行下一节点处理。触发熔断时抛出异常,结束SlotChain处理。 fireEntry(context, resourceWrapper, node, count, prioritized, args); } void performChecking(Context context, ResourceWrapper r) throws BlockException { // 根据Resource获取内存中加载的熔断规则 List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName()); if (circuitBreakers == null || circuitBreakers.isEmpty()) { return; } // 逐条检验熔断规则,失败将会抛出DegradeException异常 for (CircuitBreaker cb : circuitBreakers) { if (!cb.tryPass(context)) { throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule()); } } } // exit()是Slot处理结束的出口 @Override public void exit(Context context, ResourceWrapper r, int count, Object... args) { Entry curEntry = context.getCurEntry(); // 如果当前请求没有触发限流规则,也没有定义熔断规则,请求完成时会统计相关调用:请求处理结束时间、RT、是否慢调用 // 成功完成处理,没有触发限流和熔断规则时, if (curEntry.getBlockError() != null) { fireExit(context, r, count, args); return; } List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName()); if (circuitBreakers == null || circuitBreakers.isEmpty()) { fireExit(context, r, count, args); return; } // 如果没有触发限流规则,请求完成时需要统计调用数据:处理结束时间、RT、是否慢调用 if (curEntry.getBlockError() == null) { // passed request for (CircuitBreaker circuitBreaker : circuitBreakers) { circuitBreaker.onRequestComplete(context); } } // 触发下一个Slot#exit()执行 fireExit(context, r, count, args); } }
SlotChain是由多个ProcessorSlot对象组成的单向链表结构。
ProcessorSlot定义了处理节点的逻辑和指向的next节点;
ProcessorSlotChain定义了对单向链表的处理方法,包括addFirst和addLast;
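为了直观感受这种链式结构,下面是一个最简单的自定义 Slot 示意(非 Sentinel 自带,类名为假设):在 entry/exit 中处理完自身逻辑后,通过 fireEntry/fireExit 把调用传递给链上的下一个节点。

import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.node.DefaultNode;
import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;

// 一个只打印日志的演示 Slot,实际使用时可加 @SpiOrder 控制它在链中的位置
public class DemoLogSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node,
                      int count, boolean prioritized, Object... args) throws Throwable {
        System.out.println("before slot chain: " + resourceWrapper.getName());
        // 继续执行链上的下一个 Slot
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        System.out.println("after slot chain: " + resourceWrapper.getName());
        fireExit(context, resourceWrapper, count, args);
    }
}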
关于SlotChain的构建,我们从SlotChainProvider#newSlotChain方法入手,源码如下
package com.alibaba.csp.sentinel.slotchain;

import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder;
import com.alibaba.csp.sentinel.util.SpiLoader;

public final class SlotChainProvider {

    // volatile保证slotChainBuilder的内存可见性
    private static volatile SlotChainBuilder slotChainBuilder = null;

    /**
     * The load and pick process is not thread-safe, but it's okay since the method should be only invoked
     * via {@code lookProcessChain} in {@link com.alibaba.csp.sentinel.CtSph} under lock.
     *
     * @return new created slot chain
     */
    // 新建SlotChain,方法本身不是线程安全的,但是在lookProcessChain()中操作是线程安全的。
    public static ProcessorSlotChain newSlotChain() {
        if (slotChainBuilder != null) {
            return slotChainBuilder.build();
        }

        // 如果slotChainBuilder未初始化,使用SPI的方式加载
        // 加载内容在 META-INF/services/com.alibaba.csp.sentinel.slotchain.SlotChainBuilder
        slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);

        if (slotChainBuilder == null) {
            // Should not go through here.
            RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
            slotChainBuilder = new DefaultSlotChainBuilder();
        } else {
            RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
                slotChainBuilder.getClass().getCanonicalName());
        }
        return slotChainBuilder.build();
    }

    private SlotChainProvider() {}
}
最关键的是SlotChainBuilder#build()里的构建逻辑。
public class DefaultSlotChainBuilder implements SlotChainBuilder {

    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();

        // 使用SPI方式获取默认的Slot列表(已按@SpiOrder排序)
        List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
        for (ProcessorSlot slot : sortedSlotList) {
            // 如果是自定义Slot,需要继承AbstractLinkedProcessorSlot才可以加入链中
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName()
                    + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }

            // 按顺序追加到链表尾部
            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }

        // 返回构建好的SlotChain对象
        return chain;
    }
}
Sentinel是用SPI的方式来获取并构建SlotChain,我们继续往下,看看SpiLoader#loadPrototypeInstanceListSorted这里有什么特别的地方。
/** * Load the sorted and prototype SPI instance list for provided SPI interface. * * Note: each call return different instances, i.e. prototype instance, not singleton instance. * * @param clazz class of the SPI * @param <T> SPI type * @return sorted and different SPI instance list * @since 1.7.2 */ // 使用SPI来加载SlotChain public static <T> List<T> loadPrototypeInstanceListSorted(Class<T> clazz) { try { // 使用SPI来加载服务,这里参数是ProcessorSlot.class,对应的service名称就是clazz对应的全路径。 // SPI要加载的内容是定义在\src\main\resources\META-INF\services目录下。 // 可以看到该文件下定义了以下Slot服务: // com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot(@SpiOrder(-10000)) // com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot(@SpiOrder(-9000)) // com.alibaba.csp.sentinel.slots.logger.LogSlot(@SpiOrder(-8000)) // com.alibaba.csp.sentinel.slots.statistic.StatisticSlot(@SpiOrder(-7000)) // com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot(@SpiOrder(-6000)) // com.alibaba.csp.sentinel.slots.system.SystemSlot(@SpiOrder(-5000)) // com.alibaba.csp.sentinel.slots.block.flow.FlowSlot(@SpiOrder(-2000)) // com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot(@SpiOrder(-1000)) ServiceLoader<T> serviceLoader = ServiceLoaderUtil.getServiceLoader(clazz); // orderWrappers是根据Slot对象定义的Order值来升序排列, 所以SlotChain的调用链顺序是: // NodeSelectorSlot > ClusterBuilderSlot > LogSlot > StatisticSlot > AuthoritySlot > // SystemSlot > FlowSlot > DegradeSlot List<SpiOrderWrapper<T>> orderWrappers = new ArrayList<>(); for (T spi : serviceLoader) { // 获取Slot注解定义的SPIOrder值 int order = SpiOrderResolver.resolveOrder(spi); // Since SPI is lazy initialized in ServiceLoader, we use online sort algorithm here. SpiOrderResolver.insertSorted(orderWrappers, spi, order); RecordLog.debug("[SpiLoader] Found {} SPI: {} with order {}", clazz.getSimpleName(), spi.getClass().getCanonicalName(), order); } // 返回List<ProcessorSlot>对象 List<T> list = new ArrayList<>(orderWrappers.size()); for (int i = 0; i < orderWrappers.size(); i++) { list.add(orderWrappers.get(i).spi); } return list; } catch (Throwable t) { RecordLog.error("[SpiLoader] ERROR: loadPrototypeInstanceListSorted failed", t); t.printStackTrace(); return new ArrayList<>(); } }
可以看到,Sentinel使用SPI来加载SlotChain中的各个Slot,并根据@SpiOrder注解的值来规定Slot在Chain中的排列顺序。上面代码注释中列出的各个Slot及其@SpiOrder取值,是我从各个Slot源码中整理出来的,正是它们决定了Slot的加载顺序。
我们再来看看SPI的定义,SPI是放在\src\main\resources\META-INF\services目录下,SpiLoader#loadPrototypeInstanceListSorted加载的是ProcessorSlot.class对象,全路径就是com.alibaba.csp.sentinel.slotchain.ProcessorSlot,所以在\services目录下找到全路径定义的文件,里面是SPI定义。
# Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
SpiLoader根据SPI文件中定义的类路径,按照@SpiOrder规定的顺序加载Slot并加入到SlotChain中。
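按照DefaultSlotChainBuilder的要求,自定义Slot需要继承AbstractLinkedProcessorSlot,并通过同样的SPI机制注册。下面是一个自定义Slot的简单示意(包名、类名、@SpiOrder取值均为假设,仅演示接入方式,不同版本的注解可能有所变化):

package com.example.sentinel; // 假设的包名

import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.node.DefaultNode;
import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.spi.SpiOrder;

// order设置为正数,排在所有默认Slot之后执行(默认Slot的order都是负数)
@SpiOrder(10000)
public class DemoAuditSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node,
                      int count, boolean prioritized, Object... args) throws Throwable {
        // 自定义逻辑:这里只是简单打印资源名
        System.out.println("audit entry: " + resourceWrapper.getName());
        // 必须调用fireEntry,否则链路会在此中断
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        System.out.println("audit exit: " + resourceWrapper.getName());
        fireExit(context, resourceWrapper, count, args);
    }
}

然后在自己工程的 META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot 文件中追加一行 com.example.sentinel.DemoAuditSlot,即可被SpiLoader加载进SlotChain。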
Sentinel使用滑动窗口来保存实时统计数据,主要有以下几个核心类:
ArrayMetric:Sentinel用于保存滑动窗口的对象类,存储了滑动窗口数据LeapArray<MetricBucket>,和对滑动窗口的操作行为;
LeapArray:滑动窗口的核心数据结构,包含了滑动窗口的定义和实例数据AtomicReferenceArray<WindowWrap<T>>;
WindowWrap:滑动窗口对象,定义了滑动窗口起始时间和存放的统计数据;
MetricBucket:滑动窗口存放数据的实例数据结构,包含了各种指标的数据;
MetricEvent:Sentinel中实时统计的几个数据指标;
Sentinel的滑动窗口可以这样理解:假定滑动窗口的总长度为1000ms,Sentinel相当于在绝对的时间轴上规划了一条按固定长度切分的滑动窗口轴,每个时间点的统计数据都会落到它所对应的样本窗口中。
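按照LeapArray的思路,给定一个时间戳,就可以直接算出它落在哪个样本窗口,以及该窗口的起始时间。下面是一个简化的计算示意(窗口参数、时间戳均为假设值,并非Sentinel源码):

public class WindowIndexDemo {
    public static void main(String[] args) {
        // 假设:统计周期1000ms,划分为2个样本窗口,每个窗口长500ms
        int sampleCount = 2;
        int windowLengthInMs = 500;

        long timeMillis = 1694;                                         // 假设的当前时间戳
        long timeId = timeMillis / windowLengthInMs;                    // 1694 / 500 = 3
        int idx = (int) (timeId % sampleCount);                         // 3 % 2 = 1,落在数组下标1的窗口
        long windowStart = timeMillis - timeMillis % windowLengthInMs;  // 1500,窗口起始时间

        System.out.println("idx=" + idx + ", windowStart=" + windowStart);
        // 取窗口时:若数组中该下标的窗口起始时间等于windowStart则直接复用;
        // 若小于windowStart则说明窗口已过期,重置后复用,这正是"滑动"的含义
    }
}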
非常简单,我们直接看源码的实现,里面有详细的讲解。我们按照从前往后的顺序来说明类定义。
从FlowQpsDemo示例可以看出,Sentinel的处理可以分为以下几个步骤:
定义并加载限流规则到内存中;
定义资源并进行规则校验(Slot节点);
限流处置;
释放资源;
首先是定义并加载FlowRule的逻辑:
private static void initFlowQpsRule() { List<FlowRule> rules = new ArrayList<FlowRule>(); FlowRule rule1 = new FlowRule(); rule1.setResource(KEY); // set limit qps to 20 rule1.setCount(20); rule1.setGrade(RuleConstant.FLOW_GRADE_QPS); rule1.setLimitApp("default"); rules.add(rule1); FlowRuleManager.loadRules(rules); }
然后我们看 FlowRuleManager#loadRules是怎么处理的。
/* * Copyright 1999-2018 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.csp.sentinel.slots.block.flow; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory; import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.util.AssertUtil; import com.alibaba.csp.sentinel.util.StringUtil; import com.alibaba.csp.sentinel.node.metric.MetricTimerListener; import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; import com.alibaba.csp.sentinel.property.PropertyListener; import com.alibaba.csp.sentinel.property.SentinelProperty; /** * <p> * One resources can have multiple rules. And these rules take effects in the following order: * <ol> * <li>requests from specified caller</li> * <li>no specified caller</li> * </ol> * </p> * * @author jialiang.linjl * @author Eric Zhao */ public class FlowRuleManager { private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>(); private static final FlowPropertyListener LISTENER = new FlowPropertyListener(); private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>(); @SuppressWarnings("PMD.ThreadPoolCreationRule") private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1, new NamedThreadFactory("sentinel-metrics-record-task", true)); static { // 添加Lisnen监听逻辑 currentProperty.addListener(LISTENER); SCHEDULER.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS); } /** * Listen to the {@link SentinelProperty} for {@link FlowRule}s. The property is the source of {@link FlowRule}s. * Flow rules can also be set by {@link #loadRules(List)} directly. * * @param property the property to listen. */ public static void register2Property(SentinelProperty<List<FlowRule>> property) { AssertUtil.notNull(property, "property cannot be null"); synchronized (LISTENER) { RecordLog.info("[FlowRuleManager] Registering new property to flow rule manager"); currentProperty.removeListener(LISTENER); property.addListener(LISTENER); currentProperty = property; } } /** * Get a copy of the rules. * * @return a new copy of the rules. */ public static List<FlowRule> getRules() { List<FlowRule> rules = new ArrayList<FlowRule>(); for (Map.Entry<String, List<FlowRule>> entry : flowRules.entrySet()) { rules.addAll(entry.getValue()); } return rules; } /** * Load {@link FlowRule}s, former rules will be replaced. * * @param rules new rules to load. 
*/ // 可以看出FlowRuleManager.loadRules的逻辑比较简单,核心在内部类FlowPropertyListener的逻辑中。 public static void loadRules(List<FlowRule> rules) { // currentProperty.updateValue会对当前已存在的rules和定义的rules进行比较,如果不同,则触发FlowPropertyListener的逻辑 currentProperty.updateValue(rules); } static Map<String, List<FlowRule>> getFlowRuleMap() { return flowRules; } public static boolean hasConfig(String resource) { return flowRules.containsKey(resource); } public static boolean isOtherOrigin(String origin, String resourceName) { if (StringUtil.isEmpty(origin)) { return false; } List<FlowRule> rules = flowRules.get(resourceName); if (rules != null) { for (FlowRule rule : rules) { if (origin.equals(rule.getLimitApp())) { return false; } } } return true; } // 内部类实现监听逻辑 private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> { // 根据定义重新加载FlowRules @Override public void configUpdate(List<FlowRule> value) { Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value); if (rules != null) { flowRules.clear(); flowRules.putAll(rules); } RecordLog.info("[FlowRuleManager] Flow rules received: {}", flowRules); } @Override public void configLoad(List<FlowRule> conf) { Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(conf); if (rules != null) { flowRules.clear(); flowRules.putAll(rules); } RecordLog.info("[FlowRuleManager] Flow rules loaded: {}", flowRules); } } }
我们看看Listener怎么实现:
package com.alibaba.csp.sentinel.property; import java.util.Collections; import java.util.HashSet; import java.util.Set; import com.alibaba.csp.sentinel.log.RecordLog; public class DynamicSentinelProperty<T> implements SentinelProperty<T> { protected Set<PropertyListener<T>> listeners = Collections.synchronizedSet(new HashSet<PropertyListener<T>>()); private T value = null; public DynamicSentinelProperty() { } public DynamicSentinelProperty(T value) { super(); this.value = value; } @Override public void addListener(PropertyListener<T> listener) { listeners.add(listener); listener.configLoad(value); } @Override public void removeListener(PropertyListener<T> listener) { listeners.remove(listener); } // 比较rules是否发生变化,如果发生变化,触发FlowPropertyListener的逻辑 @Override public boolean updateValue(T newValue) { if (isEqual(value, newValue)) { return false; } RecordLog.info("[DynamicSentinelProperty] Config will be updated to: {}", newValue); value = newValue; for (PropertyListener<T> listener : listeners) { listener.configUpdate(newValue); } return true; } private boolean isEqual(T oldValue, T newValue) { if (oldValue == null && newValue == null) { return true; } if (oldValue == null) { return false; } return oldValue.equals(newValue); } public void close() { listeners.clear(); } } package com.alibaba.csp.sentinel.property; /** * 这个类负责监听SentinelProperty#updateValue(Object)调用事件 */ public interface PropertyListener<T> { /** * SentinelProperty#updateValue(Object)被调用且返回true时被触发 */ void configUpdate(T value); /** * 值第一次被加载时触发 */ void configLoad(T value); }
这就是FlowRules的加载过程,其实比较简单。
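结合上面的Listener机制,规则也可以不通过loadRules直接写入,而是注册一个动态数据源,由updateValue触发规则刷新。下面是一个最简单的示意(直接用DynamicSentinelProperty模拟动态推送,资源名和阈值为假设值):

import java.util.Collections;
import java.util.List;

import com.alibaba.csp.sentinel.property.DynamicSentinelProperty;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;

public class DynamicFlowRuleDemo {
    public static void main(String[] args) {
        // 1. 创建一个动态属性,并注册给FlowRuleManager(内部会把FlowPropertyListener挂到这个property上)
        DynamicSentinelProperty<List<FlowRule>> property = new DynamicSentinelProperty<>();
        FlowRuleManager.register2Property(property);

        // 2. 之后每次updateValue,只要规则内容发生变化,就会触发configUpdate重建flowRules
        FlowRule rule = new FlowRule();
        rule.setResource("someResource"); // 假设的资源名
        rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
        rule.setCount(20);
        property.updateValue(Collections.singletonList(rule));
    }
}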
第1步:以SphU.entry()为入口,创建对应的Resource对象。
public class FlowQpsDemo {
    ......
    private static void simulateTraffic() {
        while (!stop) {
            Entry entry = null;
            try {
                // 1.1 资源定义并执行Slot规则
                entry = SphU.entry(KEY);
                // TODO
            } catch (BlockException e1) {
                // TODO
            } catch (Exception e2) {
                // TODO
            } finally {
                // TODO
            }
        }
    }
}

// 在[1.1]中触发SphU#entry执行,以下逻辑被执行
// 以SphU.entry()为入口,首先根据入参生成对应的Resource对象
public class CtSph implements Sph {
    ......
    @Override
    public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
        // 1.2 创建Resource对象
        StringResourceWrapper resource = new StringResourceWrapper(name, type);
        // 1.3 对资源进行统计和规则判定
        return entry(resource, count, args);
    }
    ......
}

// 在[1.2]中new一个Resource对象
// 其核心逻辑是在父类中初始化Resource对象
public abstract class ResourceWrapper {
    // 资源名称
    protected final String name;
    // Entry类型
    protected final EntryType entryType;
    // 资源类型
    protected final int resourceType;

    public ResourceWrapper(String name, EntryType entryType, int resourceType) {
        AssertUtil.notEmpty(name, "resource name cannot be empty");
        AssertUtil.notNull(entryType, "entryType cannot be null");
        this.name = name;
        this.entryType = entryType;
        this.resourceType = resourceType;
    }
    ......
}

第2步:对参数和全局配置项做检测
步骤[2.1]内容如下。
public class CtSph implements Sph {
    ......
    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        // 2.1 对参数和全局配置项做检测,如果不符合要求就直接返回一个CtEntry对象,不会再进行后面的限流检测,否则进入下面的检测流程。
        // 获取一个ThreadLocal的上下文Context
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            // 如果上下文状态异常,直接返回CtEntry,第二个参数SlotChain为null,不会进行后续的统计和限流检测
            return new CtEntry(resourceWrapper, null, context);
        }

        if (context == null) {
            // Using default context.
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }

        // Global switch is close, no rule checking will do.
        if (!Constants.ON) {
            // 全局开关关闭时,直接返回CtEntry,第二个参数SlotChain为null,不会进行后续的统计和限流检测
            return new CtEntry(resourceWrapper, null, context);
        }

        // 2.2 获取资源的SlotChain对象
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

        /*
         * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
         * so no rule checking will be done.
         */
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }

        // 2.3 生成Entry对象
        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            // 2.4 执行SlotChain的entry方法
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }
    ......
}
第3步:获取Resource对应的SlotChain
步骤[2.2]是根据包装过的Resource对象获取对应的SlotChain,我们来看看CtSph#lookProcessChain()做了些什么。
    /**
     * Get {@link ProcessorSlotChain} of the resource. new {@link ProcessorSlotChain} will
     * be created if the resource doesn't relate one.
     *
     * <p>Same resource({@link ResourceWrapper#equals(Object)}) will share the same
     * {@link ProcessorSlotChain} globally, no matter in which {@link Context}.<p/>
     *
     * <p>
     * Note that total {@link ProcessorSlot} count must not exceed {@link Constants#MAX_SLOT_CHAIN_SIZE},
     * otherwise null will return.
     * </p>
     *
     * @param resourceWrapper target resource
     * @return {@link ProcessorSlotChain} of the resource
     */
    // 1.获取资源对应的SlotChain,如果资源还没有关联则创建新的SlotChain。
    // 2.同一个资源在不同的调用中(Context不同)使用相同的SlotChain对象。
    // 3.chainMap中的资源数量不能超过限定值,否则返回null。
    ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        // HashMap#get()是根据hash(key)来定位数据,而ResourceWrapper#hashCode被重写为ResourceWrapper.name#hashCode
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        if (chain == null) {
            // 资源对应的SlotChain还没有被初始化,这里通过加锁的方式保证初始化的线程安全
            synchronized (LOCK) {
                // double check防止重复初始化
                chain = chainMap.get(resourceWrapper);
                if (chain == null) {
                    // Entry size limit:资源(SlotChain)数量超过上限时不再做规则检测
                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                        return null;
                    }

                    // 新建SlotChain
                    chain = SlotChainProvider.newSlotChain();
                    // 这里采用copy-on-write的方式:在锁内复制出新Map、写入后整体替换chainMap引用,
                    // 这样无锁的读操作始终在一个不会被修改的Map上进行,保证读取的线程安全
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                }
            }
        }
        return chain;
    }
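lookProcessChain里"替换整个Map"的写法是典型的copy-on-write:写操作在锁内复制一份新Map再整体替换引用,读操作(chainMap.get)始终在一个不会被修改的Map上进行,从而无需加锁。下面用一个通用的小例子示意这种写法(与Sentinel无关,仅说明思路):

import java.util.HashMap;
import java.util.Map;

public class CopyOnWriteMapDemo {
    private final Object lock = new Object();
    // volatile保证替换后的新Map引用对读线程立即可见
    private volatile Map<String, String> cache = new HashMap<>();

    // 读:无锁,直接读当前引用指向的Map
    public String get(String key) {
        return cache.get(key);
    }

    // 写:加锁,复制、修改、整体替换
    public void put(String key, String value) {
        synchronized (lock) {
            Map<String, String> newMap = new HashMap<>(cache);
            newMap.put(key, value);
            cache = newMap;
        }
    }
}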
所以,最关键的还是SlotChainProvider#newSlotChain()方法,这部分的逻辑,请参考SlotChain部分源码解析。
第4步:创建Entry对象
步骤[2.3]根据Resource、Context和SlotChain参数创建Entry对象。
第5步:执行SlotChain的entry方法
根据SlotChain中Slot的加载顺序,执行各Slot#entry方法逻辑:
1.如果SlotChain的entry方法抛出了BlockException,则将该异常继续向上抛出
2.如果SlotChain的entry方法正常执行了,则最后会将该entry对象返回
关于各个Slot的逻辑,可以参照Slot部分源码解析,这里就不重复拷贝了。
第6步:如果上层方法捕获了BlockException,则说明请求被限流了,否则请求能正常执行
public class CtSph implements Sph {
    // 执行#entry()方法
    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        ......
        try {
            // 第5步:顺序执行Slot#entry()方法,具体可以看每个Slot实现类的#entry()逻辑
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }
    ......
}
到这里,Sentinel限流核心逻辑的执行顺序就梳理完了。
如果触发了Slot的限流规则,会抛出BlockException的子类,在业务代码中可以对异常进行捕获和处理。同时SlotChain会根据Exception的类型,对数据进行统计,得到限流、降级和成功的数据。
主要包含两部分处理:
自定义部分:业务端捕获BlockException异常后,即可判断请求被限流,可以进行对应的业务逻辑处理,比如等待、丢弃、快速失败,或者进行自定义数据统计等等(处理示例见下文);
Sentinel限流部分:针对抛出的BlockException,SlotChain会对异常进行相应处理。比如LogSlot会统计并输出异常日志,StatisticSlot会统计限流数据和正常请求数据。这部分是SlotChain自带的处理,输出的也是常用的统计维度。当然,我们也可以自定义Slot来进行数据统计。
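针对上面的"自定义部分",业务代码可以按异常的具体类型做差异化处理(FlowException、DegradeException等都是BlockException的子类)。下面是一个简单示意(资源名、业务方法和降级返回值都是假设的):

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeException;
import com.alibaba.csp.sentinel.slots.block.flow.FlowException;

public class BlockHandleDemo {
    public static String query() {
        Entry entry = null;
        try {
            entry = SphU.entry("someResource"); // 假设的资源名
            return doQuery();                   // 假设的业务方法
        } catch (FlowException e) {
            // 被限流:快速失败,返回兜底结果
            return "blocked-by-flow";
        } catch (DegradeException e) {
            // 被熔断:走降级逻辑
            return "fallback";
        } catch (BlockException e) {
            // 其他规则(授权、系统保护等)拦截
            return "blocked";
        } finally {
            if (entry != null) {
                entry.exit();
            }
        }
    }

    private static String doQuery() {
        return "ok";
    }
}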
在限流处理结束后,需要在finally里显式调用Entry.exit()释放资源。
try {
    entry = SphU.entry(KEY);
    // 业务处理
} catch (BlockException e1) {
    // 限流处理
} catch (Exception e2) {
    // biz exception
} finally {
    if (entry != null) {
        // 这里显式释放资源
        entry.exit();
    }
}
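另外,较新版本的Sentinel中Entry实现了AutoCloseable接口,也可以用try-with-resources的写法,由编译器保证exit()被调用(是否可用取决于所使用的版本,这里仅作示意):

try (Entry entry = SphU.entry(KEY)) {
    // 业务处理
} catch (BlockException e) {
    // 限流处理
}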
Slot#exit的执行顺序和Slot#entry一样,exit主要是负责一些资源释放和后期数据统计逻辑的处理。
这部分在Slot源码部分也做过整理,这里不再进行说明。