1.8 Source Code Analysis

    Tech 2025-07-19

    Contents

    Pt1 Core Class Analysis

    Pt1.1 Resource

    ResourceWrapper

    StringResourceWrapper

    Pt1.2 Entry

    Entry

    CtEntry

    Pt1.3 Context

    Pt1.4 Node

    Node

    StatisticNode

    DefaultNode

    ClusterNode

    EntranceNode

    Pt1.5 Slot

    NodeSelectorSlot

    ClusterBuilderSlot

    LogSlot

    StatisticSlot

    AuthoritySlot

    SystemSlot

    FlowSlot

    DegradeSlot

    Pt1.6 SlotChain

    Building the SlotChain

    Pt2 Sliding Window

    Pt2.1 MetricEvent

    Pt2.2 MetricBucket

    Pt2.3 WindowWrap

    Pt2.4 LeapArray

    Pt2.5 ArrayMetric

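    Pt3 Flow-Control Source Logic

    Pt3.1 Example: FlowQpsDemo

    Pt3.2 Code Flow Walkthrough

    Pt3.3 Defining and Loading Rules

    Pt3.4 Flow Rule Checking: Sph*#entry()

    Pt3.5 Handling Blocked Requests

    Pt3.6 Releasing Resources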


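    Pt1 Core Class Analysis

    Sentinel's core classes are the following:

    Resource: identifies a resource. It may be a piece of code logic, and it is the primary target of flow-control rules;

    Entry: every request creates an Entry. The Entry carries Sentinel's core processing information and corresponds to one Resource;

    Context: holds the context in which Entries execute. Context is stored in a ThreadLocal, so it is bound to the thread handling the request; it is removed by ContextUtil.exit() once the request has finished (for the default context, the outermost Entry's exit() does this automatically);

    SlotChain: each Entry references a SlotChain, concretely a ProcessorSlotChain, which holds the processing slots applied to the request. SlotChains are kept per resource, and this is the most central data structure;

    Slot: the SlotChain stores Slot objects, which form a chain of responsibility that implements Sentinel's core processing logic. Sentinel ships several standard Slots covering statistics and rule checking;

    Node: holds the statistics related to a resource. Nodes are created and selected by slots such as NodeSelectorSlot and ClusterBuilderSlot. Put simply, the Slots carry the statistics or rule-checking logic, while the Nodes hold the results, i.e. the statistics.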
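To make the chain-of-responsibility idea concrete, here is a minimal sketch. Everything in it (MiniSlotChain, CountingSlot, LimitSlot, MiniBlockException) is hypothetical and far simpler than Sentinel's ProcessorSlot/ProcessorSlotChain: a statistic-like slot lets the later rule slots run first and then records the outcome, while a rule-like slot throws to reject the request. LimitSlot counts requests without any time window, purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a slot chain; names and signatures are NOT Sentinel's API.
class MiniSlotChain {

    static class MiniBlockException extends RuntimeException {
        MiniBlockException(String msg) { super(msg); }
    }

    // A slot does its own work and decides whether/when to continue with the rest of the chain.
    interface Slot {
        void entry(String resource, Runnable next);
    }

    private final List<Slot> slots = new ArrayList<>();

    MiniSlotChain addLast(Slot slot) {
        slots.add(slot);
        return this;
    }

    void entry(String resource) {
        fire(resource, 0);
    }

    // Invoke slot i, handing it a callback that continues with slot i + 1.
    private void fire(String resource, int i) {
        if (i == slots.size()) {
            return;
        }
        slots.get(i).entry(resource, () -> fire(resource, i + 1));
    }
}

// Statistic-like slot: runs the later (rule) slots first, then records pass or block.
class CountingSlot implements MiniSlotChain.Slot {
    int pass;
    int block;

    @Override
    public void entry(String resource, Runnable next) {
        try {
            next.run();
            pass++;
        } catch (MiniSlotChain.MiniBlockException e) {
            block++;
            throw e; // rethrow so the caller learns the request was blocked
        }
    }
}

// Rule-like slot: rejects every request after the first `limit` ones (no time window here).
class LimitSlot implements MiniSlotChain.Slot {
    private final int limit;
    private int seen;

    LimitSlot(int limit) { this.limit = limit; }

    @Override
    public void entry(String resource, Runnable next) {
        if (++seen > limit) {
            throw new MiniSlotChain.MiniBlockException("blocked: " + resource);
        }
        next.run();
    }
}
```

With a limit of 2, three calls yield 2 passes and 1 block. The counting slot sees the block because the exception propagates back through it, which is why a statistic slot placed before the rule slots can record both outcomes.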

     

    Pt1.1 Resource

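    Resource defines a Sentinel resource; the class structure is very simple.

    The abstract parent class ResourceWrapper defines the common properties;

    The implementation class StringResourceWrapper provides the constructors for resources;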

     

     

    ResourceWrapper

    The abstract parent class ResourceWrapper defines the common properties and overrides hashCode and equals.

    public abstract class ResourceWrapper {

        protected final String name;
        protected final EntryType entryType;
        protected final int resourceType;

        // Constructor
        public ResourceWrapper(String name, EntryType entryType, int resourceType) {
            AssertUtil.notEmpty(name, "resource name cannot be empty");
            AssertUtil.notNull(entryType, "entryType cannot be null");
            this.name = name;
            this.entryType = entryType;
            this.resourceType = resourceType;
        }

        // ... getters (the fields are final, so there are no setters) ...

        // ResourceWrapper overrides hashCode and equals to compare by name only, which is very important:
        // SlotChains and rules are kept in memory keyed by the Resource object, so the key effectively depends on the name.
        @Override
        public int hashCode() {
            return getName().hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof ResourceWrapper) {
                ResourceWrapper rw = (ResourceWrapper)obj;
                return rw.getName().equals(getName());
            }
            return false;
        }
    }
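A small sketch of why name-only equality matters. NamedResource is a hypothetical stand-in for ResourceWrapper, and its Type enum stands in for EntryType: two wrappers with the same name but different types collapse to a single map key, so anything stored per resource (a slot chain, say) is shared by name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for ResourceWrapper: hashCode/equals use only the name.
class NamedResource {
    enum Type { IN, OUT }

    final String name;
    final Type type;

    NamedResource(String name, Type type) {
        this.name = Objects.requireNonNull(name);
        this.type = Objects.requireNonNull(type);
    }

    @Override
    public int hashCode() {
        return name.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        // The type is deliberately ignored, mirroring ResourceWrapper#equals.
        return obj instanceof NamedResource && ((NamedResource) obj).name.equals(name);
    }
}
```

Putting `("getUser", IN)` and then `("getUser", OUT)` into a `HashMap` leaves one entry whose value is the second one written.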

    StringResourceWrapper

    // StringResourceWrapper mainly provides two constructors for resources.
    public class StringResourceWrapper extends ResourceWrapper {

        public StringResourceWrapper(String name, EntryType e) {
            super(name, e, ResourceTypeConstants.COMMON);
        }

        public StringResourceWrapper(String name, EntryType e, int resType) {
            super(name, e, resType);
        }

        @Override
        public String getShowName() {
            return name;
        }

        @Override
        public String toString() {
            return "StringResourceWrapper{" +
                "name='" + name + '\'' +
                ", entryType=" + entryType +
                ", resourceType=" + resourceType +
                '}';
        }
    }

     

    Pt1.2 Entry

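    Every resource invocation creates an Entry. An Entry holds the resource (ResourceWrapper), curNode (the current statistic node), originNode (the origin statistic node), and so on.

    CtEntry is the ordinary Entry, created when SphU.entry(xxx) is called. Key property: it is a linked entry within the current context (it maintains parent and child references).

    One thing to note: the CtEntry constructor rewires the invocation chain, i.e. it appends the current Entry to the call chain of the given Context (setUpEntryFor).

    When the resource invocation ends, entry.exit() must be called. exit runs the slot chain's exit, restores the call stack (the Context's current entry goes back to the parent), exits the context if this is the outermost entry of the default context, and finally clears the context reference held by the entry to prevent a duplicate exit.

     

    The class structure is also simple: the abstract parent class Entry and the implementation class CtEntry.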
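The linking and unlinking described above can be sketched with two hypothetical classes, MiniContext and MiniEntry. This is a simplification: there is no slot chain and there are no exit handlers, and an out-of-order exit just throws IllegalStateException, whereas the real exitForContext() first exits the entries still on the stack and then throws ErrorEntryFreeException.

```java
// Hypothetical, simplified model of CtEntry's parent/child linking inside a Context.
class MiniContext {
    MiniEntry curEntry;
}

class MiniEntry {
    final String resource;
    MiniEntry parent;
    MiniEntry child;
    private MiniContext context;

    MiniEntry(String resource, MiniContext context) {
        this.resource = resource;
        this.context = context;
        // Like setUpEntryFor(): the current entry becomes our parent, and we become current.
        this.parent = context.curEntry;
        if (parent != null) {
            parent.child = this;
        }
        context.curEntry = this;
    }

    // Like exitForContext(): only the innermost entry may exit; then the parent is restored.
    MiniEntry exit() {
        if (context == null) {
            return parent; // already exited: the context reference was cleared
        }
        if (context.curEntry != this) {
            throw new IllegalStateException("exit order mismatch: current is "
                + (context.curEntry == null ? null : context.curEntry.resource) + ", expected " + resource);
        }
        context.curEntry = parent;
        if (parent != null) {
            parent.child = null;
        }
        context = null; // prevents a duplicate exit from touching the context again
        return parent;
    }
}
```

Entering A then B makes B current with A as its parent; exiting A first is rejected, while exiting B and then A unwinds the stack back to empty.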

     

    Entry

    package com.alibaba.csp.sentinel;

    import com.alibaba.csp.sentinel.slots.block.BlockException;
    import com.alibaba.csp.sentinel.util.TimeUtil;
    import com.alibaba.csp.sentinel.util.function.BiConsumer;
    import com.alibaba.csp.sentinel.context.ContextUtil;
    import com.alibaba.csp.sentinel.node.Node;
    import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
    import com.alibaba.csp.sentinel.context.Context;

    // Entry implements AutoCloseable for resource management:
    //   with try-with-resources, close() is called automatically when the try block
    //   finishes, whether it completes normally or throws an exception.
    // In other words, the Entry is exited automatically once the try block is done.
    public abstract class Entry implements AutoCloseable {

        private static final Object[] OBJECTS0 = new Object[0];

        // Creation and completion time of the Entry, used for RT (response time) statistics
        private final long createTimestamp;
        private long completeTimestamp;

        // Current statistic node: records statistics for the current resource name
        private Node curNode;

        // Origin statistic node: records statistics for a specific origin (caller)
        private Node originNode;

        private Throwable error;
        private BlockException blockError;

        // The resource object
        protected final ResourceWrapper resourceWrapper;

        public Entry(ResourceWrapper resourceWrapper) {
            this.resourceWrapper = resourceWrapper;
            this.createTimestamp = TimeUtil.currentTimeMillis();
        }

        public ResourceWrapper getResourceWrapper() {
            return resourceWrapper;
        }

        /**
         * Complete the current resource entry and restore the entry stack in context.
         *
         * @throws ErrorEntryFreeException if entry in current context does not match current entry
         */
        public void exit() throws ErrorEntryFreeException {
            exit(1, OBJECTS0);
        }

        public void exit(int count) throws ErrorEntryFreeException {
            exit(count, OBJECTS0);
        }

        /**
         * Releases the resource through the AutoCloseable interface.
         */
        @Override
        public void close() {
            exit();
        }

        /**
         * Exit this entry. This method should invoke if and only if once at the end of the resource protection.
         *
         * @param count tokens to release.
         * @param args extra parameters
         * @throws ErrorEntryFreeException, if {@link Context#getCurEntry()} is not this entry.
         */
        public abstract void exit(int count, Object... args) throws ErrorEntryFreeException;

        /**
         * Exit this entry.
         *
         * @param count tokens to release.
         * @param args extra parameters
         * @return next available entry after exit, that is the parent entry.
         * @throws ErrorEntryFreeException, if {@link Context#getCurEntry()} is not this entry.
         */
        protected abstract Entry trueExit(int count, Object... args) throws ErrorEntryFreeException;

        /**
         * Get related {@link Node} of the parent {@link Entry}.
         *
         * @return
         */
        public abstract Node getLastNode();

        public long getCreateTimestamp() {
            return createTimestamp;
        }

        public long getCompleteTimestamp() {
            return completeTimestamp;
        }

        public Entry setCompleteTimestamp(long completeTimestamp) {
            this.completeTimestamp = completeTimestamp;
            return this;
        }

        public Node getCurNode() {
            return curNode;
        }

        public void setCurNode(Node node) {
            this.curNode = node;
        }

        public BlockException getBlockError() {
            return blockError;
        }

        public Entry setBlockError(BlockException blockError) {
            this.blockError = blockError;
            return this;
        }

        public Throwable getError() {
            return error;
        }

        public void setError(Throwable error) {
            this.error = error;
        }

        /**
         * Get origin {@link Node} of the this {@link Entry}.
         *
         * @return origin {@link Node} of the this {@link Entry}, may be null if no origin specified by
         * {@link ContextUtil#enter(String name, String origin)}.
         */
        public Node getOriginNode() {
            return originNode;
        }

        public void setOriginNode(Node originNode) {
            this.originNode = originNode;
        }

        /**
         * Like {@code CompletableFuture} since JDK 8, it guarantees specified handler
         * is invoked when this entry terminated (exited), no matter it's blocked or permitted.
         * Use it when you did some STATEFUL operations on entries.
         *
         * @param handler handler function on the invocation terminates
         * @since 1.8.0
         */
        public abstract void whenTerminate(BiConsumer<Context, Entry> handler);
    }
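The AutoCloseable contract described in the comment above can be checked with a hypothetical TimedEntry class (not Sentinel's Entry): close() simply delegates to exit(), which stamps a completion time and logs the RT. One Java detail worth knowing: in a try-with-resources statement the resource is closed before any catch block runs, so by the time the catch executes the entry has already exited.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: close() delegates to exit(), so try-with-resources always exits the entry.
class TimedEntry implements AutoCloseable {
    static final List<String> LOG = new ArrayList<>();

    private final String resource;
    private final long createTimestamp = System.currentTimeMillis();
    private long completeTimestamp;

    TimedEntry(String resource) {
        this.resource = resource;
    }

    void exit() {
        completeTimestamp = System.currentTimeMillis();
        LOG.add("exit " + resource + " rt=" + (completeTimestamp - createTimestamp) + "ms");
    }

    @Override
    public void close() {
        exit();
    }
}
```

Running one try block that completes normally and one that throws produces the log order: body, exit, exit, catch.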

    CtEntry

    package com.alibaba.csp.sentinel;

    import java.util.LinkedList;

    import com.alibaba.csp.sentinel.context.Context;
    import com.alibaba.csp.sentinel.context.ContextUtil;
    import com.alibaba.csp.sentinel.context.NullContext;
    import com.alibaba.csp.sentinel.log.RecordLog;
    import com.alibaba.csp.sentinel.node.Node;
    import com.alibaba.csp.sentinel.slotchain.ProcessorSlot;
    import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
    import com.alibaba.csp.sentinel.util.function.BiConsumer;

    /**
     * Linked entry within current context.
     *
     * @author jialiang.linjl
     * @author Eric Zhao
     */
    class CtEntry extends Entry {

        protected Entry parent = null;
        protected Entry child = null;

        // The SlotChain
        protected ProcessorSlot<Object> chain;
        // Context information
        protected Context context;
        protected LinkedList<BiConsumer<Context, Entry>> exitHandlers;

        CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) {
            super(resourceWrapper);
            this.chain = chain;
            this.context = context;

            setUpEntryFor(context);
        }

        // Attach the current Entry to the invocation chain held by the Context
        private void setUpEntryFor(Context context) {
            // The entry should not be associated to NullContext.
            if (context instanceof NullContext) {
                return;
            }
            this.parent = context.getCurEntry();
            if (parent != null) {
                ((CtEntry) parent).child = this;
            }
            context.setCurEntry(this);
        }

        // Called when the resource invocation ends
        @Override
        public void exit(int count, Object... args) throws ErrorEntryFreeException {
            trueExit(count, args);
        }

        /**
         * Note: the exit handlers will be called AFTER onExit of slot chain.
         */
        private void callExitHandlersAndCleanUp(Context ctx) {
            if (exitHandlers != null && !exitHandlers.isEmpty()) {
                for (BiConsumer<Context, Entry> handler : this.exitHandlers) {
                    try {
                        handler.accept(ctx, this);
                    } catch (Exception e) {
                        RecordLog.warn("Error occurred when invoking entry exit handler, current entry: "
                            + resourceWrapper.getName(), e);
                    }
                }
                exitHandlers = null;
            }
        }

        protected void exitForContext(Context context, int count, Object... args) throws ErrorEntryFreeException {
            if (context != null) {
                // Null context should exit without clean-up.
                if (context instanceof NullContext) {
                    return;
                }

                if (context.getCurEntry() != this) {
                    String curEntryNameInContext = context.getCurEntry() == null ? null
                        : context.getCurEntry().getResourceWrapper().getName();
                    // Clean previous call stack.
                    CtEntry e = (CtEntry) context.getCurEntry();
                    while (e != null) {
                        e.exit(count, args);
                        e = (CtEntry) e.parent;
                    }
                    String errorMessage = String.format("The order of entry exit can't be paired with the order of entry"
                            + ", current entry in context: <%s>, but expected: <%s>", curEntryNameInContext,
                        resourceWrapper.getName());
                    throw new ErrorEntryFreeException(errorMessage);
                } else {
                    // Let every Slot release its resources
                    if (chain != null) {
                        chain.exit(context, resourceWrapper, count, args);
                    }
                    // Go through the existing terminate handlers (associated to this invocation).
                    callExitHandlersAndCleanUp(context);

                    // Restore the call stack.
                    context.setCurEntry(parent);
                    if (parent != null) {
                        ((CtEntry) parent).child = null;
                    }
                    if (parent == null) {
                        // Release the context (only the default context is exited automatically here)
                        if (ContextUtil.isDefaultContext(context)) {
                            ContextUtil.exit();
                        }
                    }
                    // Clean the reference of context in current entry to avoid duplicate exit.
                    clearEntryContext();
                }
            }
        }

        protected void clearEntryContext() {
            this.context = null;
        }

        @Override
        public void whenTerminate(BiConsumer<Context, Entry> handler) {
            if (this.exitHandlers == null) {
                this.exitHandlers = new LinkedList<>();
            }
            this.exitHandlers.add(handler);
        }

        @Override
        protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException {
            exitForContext(context, count, args);

            return parent;
        }

        @Override
        public Node getLastNode() {
            return parent == null ? null : parent.getCurNode();
        }
    }

     

    Pt1.3 Context

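    Context represents the invocation-chain context and spans all Entries of one invocation chain. It holds the entrance node (entranceNode), the curNode of this chain (through the current Entry), the caller origin, and other information. The Context name is the name of the chain's entrance.

    How Context is maintained: it is propagated via ThreadLocal and is established only when the entrance is entered. Because it travels in a ThreadLocal, the Context is lost when an asynchronous call switches threads, so you have to switch it manually with ContextUtil.runOnContext(context, f).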
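The thread-switch problem can be reproduced with a plain ThreadLocal. ContextHolder and ContextHolderDemo are hypothetical and hold only a context name (a String). ContextHolder.runOnContext imitates the idea behind Sentinel's ContextUtil.runOnContext(context, f): install a captured value on the worker thread, run the task, then restore the worker's previous value. It is a sketch of the concept, not Sentinel's implementation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical holder showing how a ThreadLocal-based context behaves across threads.
class ContextHolder {
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    static void enter(String name) { CONTEXT.set(name); }

    static String current() { return CONTEXT.get(); }

    static void exit() { CONTEXT.remove(); }

    // Install the captured context on the current (worker) thread, run the task,
    // then restore whatever the thread had before.
    static <T> T runOnContext(String captured, Supplier<T> task) {
        String backup = CONTEXT.get();
        CONTEXT.set(captured);
        try {
            return task.get();
        } finally {
            if (backup == null) {
                CONTEXT.remove();
            } else {
                CONTEXT.set(backup);
            }
        }
    }
}

class ContextHolderDemo {
    // Returns {value seen on the worker without the wrapper, with it, and afterwards}.
    static String[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            ContextHolder.enter("web-entrance");
            String captured = ContextHolder.current();
            Callable<String> plainRead = ContextHolder::current;
            Callable<String> wrappedRead = () -> ContextHolder.runOnContext(captured, ContextHolder::current);
            String lost = pool.submit(plainRead).get();
            String kept = pool.submit(wrappedRead).get();
            String after = pool.submit(plainRead).get();
            return new String[] {lost, kept, after};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            ContextHolder.exit();
            pool.shutdown();
        }
    }
}
```

run() returns null for the plain read (the worker thread never saw the context), "web-entrance" inside the wrapper, and null again afterwards because the wrapper restored the worker's original state.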

    Context

    // Stores the invocation information of the current request (held per thread in a ThreadLocal).
    // 1. Every Entry has a Context, which can be created with ContextUtil#enter(); otherwise the default Context is used.
    // 2. Calling entry multiple times within the same Context builds an invocation tree.
    // 3. When the same resource is invoked under different Contexts, NodeSelectorSlot keeps separate statistics for each.
    public class Context {

        // Name of the context
        private final String name;

        // Entrance node of the current request tree (root node of the invocation chain)
        private DefaultNode entranceNode;

        // Current Entry of the request
        private Entry curEntry;

        // Distinguishes callers, e.g. the calling service's name or IP
        private String origin = "";

        private final boolean async;

        /**
         * Create a new async context.
         *
         * @param entranceNode entrance node of the context
         * @param name context name
         * @return the new created context
         * @since 0.2.0
         */
        public static Context newAsyncContext(DefaultNode entranceNode, String name) {
            return new Context(name, entranceNode, true);
        }

        public Context(DefaultNode entranceNode, String name) {
            this(name, entranceNode, false);
        }

        public Context(String name, DefaultNode entranceNode, boolean async) {
            this.name = name;
            this.entranceNode = entranceNode;
            this.async = async;
        }

        public boolean isAsync() {
            return async;
        }

        public String getName() {
            return name;
        }

        public Node getCurNode() {
            return curEntry == null ? null : curEntry.getCurNode();
        }

        public Context setCurNode(Node node) {
            this.curEntry.setCurNode(node);
            return this;
        }

        public Entry getCurEntry() {
            return curEntry;
        }

        public Context setCurEntry(Entry curEntry) {
            this.curEntry = curEntry;
            return this;
        }

        public String getOrigin() {
            return origin;
        }

        public Context setOrigin(String origin) {
            this.origin = origin;
            return this;
        }

        public double getOriginTotalQps() {
            return getOriginNode() == null ? 0 : getOriginNode().totalQps();
        }

        public double getOriginBlockQps() {
            return getOriginNode() == null ? 0 : getOriginNode().blockQps();
        }

        public double getOriginPassReqQps() {
            return getOriginNode() == null ? 0 : getOriginNode().successQps();
        }

        public double getOriginPassQps() {
            return getOriginNode() == null ? 0 : getOriginNode().passQps();
        }

        public long getOriginTotalRequest() {
            return getOriginNode() == null ? 0 : getOriginNode().totalRequest();
        }

        public long getOriginBlockRequest() {
            return getOriginNode() == null ? 0 : getOriginNode().blockRequest();
        }

        public double getOriginAvgRt() {
            return getOriginNode() == null ? 0 : getOriginNode().avgRt();
        }

        public int getOriginCurThreadNum() {
            return getOriginNode() == null ? 0 : getOriginNode().curThreadNum();
        }

        public DefaultNode getEntranceNode() {
            return entranceNode;
        }

        /**
         * Get the parent {@link Node} of the current.
         *
         * @return the parent node of the current.
         */
        public Node getLastNode() {
            if (curEntry != null && curEntry.getLastNode() != null) {
                return curEntry.getLastNode();
            } else {
                return entranceNode;
            }
        }

        public Node getOriginNode() {
            return curEntry == null ? null : curEntry.getOriginNode();
        }

        @Override
        public String toString() {
            return "Context{" +
                "name='" + name + '\'' +
                ", entranceNode=" + entranceNode +
                ", curEntry=" + curEntry +
                ", origin='" + origin + '\'' +
                ", async=" + async +
                '}';
        }
    }

     

    Pt1.4 Node

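    Class hierarchy

    The statistic node types in Sentinel:

    Node: the parent interface of all nodes; it defines the methods for reading real-time statistics.

    StatisticNode: the most basic statistic node, containing two sliding-window structures, one at second granularity and one at minute granularity.

    DefaultNode: the chain node, which records the statistics of a resource on a particular invocation chain and maintains a tree structure.

    ClusterNode: the cluster node, which records the global statistics of each resource (regardless of invocation chain) and also stores that resource's per-origin invocation statistics (of type StatisticNode). In particular, the Constants.ENTRY_NODE node records global statistics for inbound resources.

    EntranceNode: the entrance node, a special chain node that holds all invocation statistics for a given Context entrance. Constants.ROOT is also an entrance node.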

     

    When the nodes are built:

    The EntranceNode is created when ContextUtil.enter(xxx) is called, and is then put into the Context.

    NodeSelectorSlot: creates the DefaultNode according to the context, then sets it as curNode on the context.

    ClusterBuilderSlot: first creates the ClusterNode by resource name and sets it on the DefaultNode; then creates the origin node (of type StatisticNode) by origin and sets it as originNode on curEntry.

     

    Dimensions of the node types (i.e. how many of each exist):

    ClusterNode: one per resource

    DefaultNode: one per resource × context, stored in the map of each NodeSelectorSlot

    EntranceNode: one per context, stored in ContextUtil's contextNameNodeMap

    Origin node (of type StatisticNode): one per resource × origin, stored in each ClusterNode's originCountMap
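The dimensions above can be illustrated with a hypothetical registry that only tracks which nodes would exist, using nested maps keyed as listed (plain Object placeholders stand in for real nodes). As a simplifying assumption, an origin node is created only when the origin string is non-empty.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry mirroring the node dimensions; it stores placeholders, not real nodes.
class NodeDimensions {
    // EntranceNode: one per context name.
    final Map<String, Object> entranceNodes = new HashMap<>();
    // ClusterNode: one per resource.
    final Map<String, Object> clusterNodes = new HashMap<>();
    // DefaultNode: one per (resource, context); outer key = resource, inner key = context.
    final Map<String, Map<String, Object>> defaultNodes = new HashMap<>();
    // Origin StatisticNode: one per (resource, origin); outer key = resource, inner key = origin.
    final Map<String, Map<String, Object>> originNodes = new HashMap<>();

    void onEntry(String context, String origin, String resource) {
        entranceNodes.computeIfAbsent(context, k -> new Object());
        clusterNodes.computeIfAbsent(resource, k -> new Object());
        defaultNodes.computeIfAbsent(resource, k -> new HashMap<>())
            .computeIfAbsent(context, k -> new Object());
        if (!origin.isEmpty()) {
            originNodes.computeIfAbsent(resource, k -> new HashMap<>())
                .computeIfAbsent(origin, k -> new Object());
        }
    }

    // Total number of inner entries across all resources.
    static int countNested(Map<String, Map<String, Object>> m) {
        return m.values().stream().mapToInt(Map::size).sum();
    }
}
```

For the calls (ctxA, appX, getUser), (ctxB, appY, getUser), (ctxA, appX, getOrder) and a repeat of the first, there are 2 entrance nodes, 2 cluster nodes, 3 default nodes and 3 origin nodes.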

     

     

    Node

    package com.alibaba.csp.sentinel.node;

    import java.util.List;
    import java.util.Map;

    import com.alibaba.csp.sentinel.Entry;
    import com.alibaba.csp.sentinel.node.metric.MetricNode;
    import com.alibaba.csp.sentinel.slots.statistic.metric.DebugSupport;
    import com.alibaba.csp.sentinel.util.function.Predicate;

    // Parent interface of all nodes; defines the methods for reading a resource's real-time statistics from Node subclasses
    public interface Node extends OccupySupport, DebugSupport {

        // Total requests in the last minute (pass + block)
        long totalRequest();

        // Passed requests in the last minute
        long totalPass();

        // Completed (successful) requests in the last minute
        long totalSuccess();

        // Blocked requests in the last minute
        long blockRequest();

        // Exceptions in the last minute
        long totalException();

        // Passed requests per second (pass QPS)
        double passQps();

        // Blocked requests per second (block QPS)
        double blockQps();

        // Total QPS (passed plus blocked requests)
        double totalQps();

        // QPS of completed requests (those that called Entry#exit())
        double successQps();

        // Maximum success QPS, estimated from the busiest bucket of the second-level window
        double maxSuccessQps();

        // Exception QPS
        double exceptionQps();

        // Average response time
        double avgRt();

        // Minimum response time
        double minRt();

        // Current number of concurrent threads
        int curThreadNum();

        // Block QPS of the previous window (the previous second)
        double previousBlockQps();

        // Pass QPS of the previous window (the previous second)
        double previousPassQps();

        /**
         * Fetch all valid metric nodes of resources.
         *
         * @return valid metric nodes of resources
         */
        Map<Long, MetricNode> metrics();

        /**
         * Fetch all raw metric items that satisfies the time predicate.
         *
         * @param timePredicate time predicate
         * @return raw metric items that satisfies the time predicate
         * @since 1.7.0
         */
        List<MetricNode> rawMetricsInMin(Predicate<Long> timePredicate);

        // Add passed requests
        void addPassRequest(int count);

        // Add RT and successful (completed) requests
        void addRtAndSuccess(long rt, int success);

        // Increase the blocked request count
        void increaseBlockQps(int count);

        // Increase the business exception count
        void increaseExceptionQps(int count);

        // Increase the current concurrent thread count
        void increaseThreadNum();

        // Decrease the current concurrent thread count
        void decreaseThreadNum();

        // Reset the internal statistics
        void reset();
    }

     

    StatisticNode

    The most basic statistic node, containing two sliding-window structures: one at second granularity and one at minute granularity.

    package com.alibaba.csp.sentinel.node;

    import com.alibaba.csp.sentinel.node.metric.MetricNode;
    import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder;
    import com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric;
    import com.alibaba.csp.sentinel.slots.statistic.metric.Metric;
    import com.alibaba.csp.sentinel.util.TimeUtil;
    import com.alibaba.csp.sentinel.util.function.Predicate;

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    /**
     * StatisticNode keeps three kinds of real-time statistics:
     * 1. metrics at second granularity;
     * 2. metrics at minute granularity;
     * 3. the current number of concurrent threads.
     *
     * Sentinel uses sliding windows to record and count resource metrics in real time;
     * the sliding window is built on the LeapArray data structure.
     *
     * <p>
     * case 1: When the first request comes in, Sentinel will create a new window bucket of
     * a specified time-span to store running statistics, such as total response time(rt),
     * incoming request(QPS), block request(bq), etc. And the time-span is defined by sample count.
     * </p>
     * <pre>
     * 0      100ms
     * +-------+--→ Sliding Windows
     *     ^
     *     |
     *   request
     * </pre>
     * <p>
     * Sentinel uses the statistics of the valid buckets to decide whether this request can be passed.
     * For example, if a rule defines that only 100 requests can be passed,
     * it will sum all qps in valid buckets, and compare it to the threshold defined in rule.
     * </p>
     *
     * <p>case 2: continuous requests</p>
     * <pre>
     * 0    100ms    200ms    300ms
     * +-------+-------+-------+-----→ Sliding Windows
     *                     ^
     *                     |
     *                  request
     * </pre>
     *
     * <p>case 3: requests keep coming, and previous buckets become invalid</p>
     * <pre>
     * 0    100ms    200ms    800ms    900ms   1000ms    1300ms
     * +-------+-------+ ...... +-------+-------+ ...... +-------+-----→ Sliding Windows
     *                                                       ^
     *                                                       |
     *                                                    request
     * </pre>
     *
     * <p>The sliding window should become:</p>
     * <pre>
     * 300ms     800ms   900ms   1000ms   1300ms
     * + ...... +-------+ ...... +-------+-----→ Sliding Windows
     *                                     ^
     *                                     |
     *                                  request
     * </pre>
     */
    public class StatisticNode implements Node {

        // Statistics of the second-level sliding window
        private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
            IntervalProperty.INTERVAL);

        /**
         * Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
         * meaning each bucket per second, in this way we can get accurate statistics of each second.
         */
        // Statistics of the minute-level window (the last 60 seconds)
        private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

        /**
         * The counter for thread count.
         */
        // Concurrent thread count
        private LongAdder curThreadNum = new LongAdder();

        /**
         * The last timestamp when metrics were fetched.
         */
        // Time of the most recent metrics fetch
        private long lastFetchTime = -1;

        // Fetch the statistics of all valid windows
        // KEY: start timestamp of the window
        // VALUE: statistics within that window
        @Override
        public Map<Long, MetricNode> metrics() {
            // Current timestamp
            long currentTime = TimeUtil.currentTimeMillis();
            // Start of the second that contains the current timestamp
            currentTime = currentTime - currentTime % 1000;
            Map<Long, MetricNode> metrics = new ConcurrentHashMap<>();
            // rollingCounterInMinute#details() returns the statistics of the currently valid windows
            List<MetricNode> nodesOfEverySecond = rollingCounterInMinute.details();
            long newLastFetchTime = lastFetchTime;
            // Iterate metrics of all resources, filter valid metrics (not-empty and up-to-date).
            for (MetricNode node : nodesOfEverySecond) {
                // Freshness check: newer than the last fetch and older than the current second
                // node.getTimestamp() is the start time of that window
                if (isNodeInTime(node, currentTime) && isValidMetricNode(node)) {
                    metrics.put(node.getTimestamp(), node);
                    newLastFetchTime = Math.max(newLastFetchTime, node.getTimestamp());
                }
            }
            lastFetchTime = newLastFetchTime;

            return metrics;
        }

        @Override
        public List<MetricNode> rawMetricsInMin(Predicate<Long> timePredicate) {
            return rollingCounterInMinute.detailsOnCondition(timePredicate);
        }

        // Freshness check
        private boolean isNodeInTime(MetricNode node, long currentTime) {
            return node.getTimestamp() > lastFetchTime && node.getTimestamp() < currentTime;
        }

        private boolean isValidMetricNode(MetricNode node) {
            return node.getPassQps() > 0 || node.getBlockQps() > 0 || node.getSuccessQps() > 0
                || node.getExceptionQps() > 0 || node.getRt() > 0 || node.getOccupiedPassQps() > 0;
        }

        @Override
        public void reset() {
            rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
        }

        // ---- The APIs below read different statistics from the sliding-window structures; not explained in detail ----
        @Override
        public long totalRequest() {
            return rollingCounterInMinute.pass() + rollingCounterInMinute.block();
        }

        @Override
        public long blockRequest() {
            return rollingCounterInMinute.block();
        }

        @Override
        public double blockQps() {
            return rollingCounterInSecond.block() / rollingCounterInSecond.getWindowIntervalInSec();
        }

        @Override
        public double previousBlockQps() {
            return this.rollingCounterInMinute.previousWindowBlock();
        }

        @Override
        public double previousPassQps() {
            return this.rollingCounterInMinute.previousWindowPass();
        }

        @Override
        public double totalQps() {
            return passQps() + blockQps();
        }

        @Override
        public long totalSuccess() {
            return rollingCounterInMinute.success();
        }

        @Override
        public double exceptionQps() {
            return rollingCounterInSecond.exception() / rollingCounterInSecond.getWindowIntervalInSec();
        }

        @Override
        public long totalException() {
            return rollingCounterInMinute.exception();
        }

        @Override
        public double passQps() {
            return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
        }

        @Override
        public long totalPass() {
            return rollingCounterInMinute.pass();
        }

        @Override
        public double successQps() {
            return rollingCounterInSecond.success() / rollingCounterInSecond.getWindowIntervalInSec();
        }

        @Override
        public double maxSuccessQps() {
            return (double) rollingCounterInSecond.maxSuccess() * rollingCounterInSecond.getSampleCount()
                    / rollingCounterInSecond.getWindowIntervalInSec();
        }

        @Override
        public double occupiedPassQps() {
            return rollingCounterInSecond.occupiedPass() / rollingCounterInSecond.getWindowIntervalInSec();
        }

        @Override
        public double avgRt() {
            long successCount = rollingCounterInSecond.success();
            if (successCount == 0) {
                return 0;
            }

            return rollingCounterInSecond.rt() * 1.0 / successCount;
        }

        @Override
        public double minRt() {
            return rollingCounterInSecond.minRt();
        }

        @Override
        public int curThreadNum() {
            return (int)curThreadNum.sum();
        }

        @Override
        public void addPassRequest(int count) {
            rollingCounterInSecond.addPass(count);
            rollingCounterInMinute.addPass(count);
        }

        @Override
        public void addRtAndSuccess(long rt, int successCount) {
            rollingCounterInSecond.addSuccess(successCount);
            rollingCounterInSecond.addRT(rt);

            rollingCounterInMinute.addSuccess(successCount);
            rollingCounterInMinute.addRT(rt);
        }

        @Override
        public void increaseBlockQps(int count) {
            rollingCounterInSecond.addBlock(count);
            rollingCounterInMinute.addBlock(count);
        }

        @Override
        public void increaseExceptionQps(int count) {
            rollingCounterInSecond.addException(count);
            rollingCounterInMinute.addException(count);
        }

        // Increase and decrease the concurrent thread count
        @Override
        public void increaseThreadNum() {
            curThreadNum.increment();
        }

        @Override
        public void decreaseThreadNum() {
            curThreadNum.decrement();
        }

        @Override
        public void debug() {
            rollingCounterInSecond.debug();
        }

        @Override
        public long tryOccupyNext(long currentTime, int acquireCount, double threshold) {
            double maxCount = threshold * IntervalProperty.INTERVAL / 1000;
            long currentBorrow = rollingCounterInSecond.waiting();
            if (currentBorrow >= maxCount) {
                return OccupyTimeoutProperty.getOccupyTimeout();
            }

            int windowLength = IntervalProperty.INTERVAL / SampleCountProperty.SAMPLE_COUNT;
            long earliestTime = currentTime - currentTime % windowLength + windowLength - IntervalProperty.INTERVAL;

            int idx = 0;
            /*
             * Note: here {@code currentPass} may be less than it really is NOW, because time difference
             * since call rollingCounterInSecond.pass(). So in high concurrency, the following code may
             * lead more tokens be borrowed.
             */
            long currentPass = rollingCounterInSecond.pass();
            while (earliestTime < currentTime) {
                long waitInMs = idx * windowLength + windowLength - currentTime % windowLength;
                if (waitInMs >= OccupyTimeoutProperty.getOccupyTimeout()) {
                    break;
                }
                long windowPass = rollingCounterInSecond.getWindowPass(earliestTime);
                if (currentPass + currentBorrow + acquireCount - windowPass <= maxCount) {
                    return waitInMs;
                }
                earliestTime += windowLength;
                currentPass -= windowPass;
                idx++;
            }

            return OccupyTimeoutProperty.getOccupyTimeout();
        }

        @Override
        public long waiting() {
            return rollingCounterInSecond.waiting();
        }

        @Override
        public void addWaitingRequest(long futureTime, int acquireCount) {
            rollingCounterInSecond.addWaiting(futureTime, acquireCount);
        }

        @Override
        public void addOccupiedPass(int acquireCount) {
            rollingCounterInMinute.addOccupiedPass(acquireCount);
            rollingCounterInMinute.addPass(acquireCount);
        }
    }
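As a rough sketch of what rollingCounterInSecond does, here is a single-threaded sliding-window pass counter modelled loosely on the LeapArray idea in the Javadoc above: sampleCount buckets of intervalInMs / sampleCount each, a bucket recycled when its start time is stale, and a sum over buckets that are at most intervalInMs old. SlidingWindowCounter is hypothetical: it takes explicit timestamps instead of reading the clock, assumes timestamps never go backwards, and omits the lock-free updates and other event types (block, success, exception, RT) of the real ArrayMetric. passQps() divides by the interval in seconds, the same way StatisticNode#passQps() divides pass() by getWindowIntervalInSec().

```java
import java.util.Arrays;

// Hypothetical, single-threaded sliding-window counter in the spirit of LeapArray (passes only).
class SlidingWindowCounter {
    private final int sampleCount;
    private final int intervalInMs;
    private final int windowLengthInMs;
    private final long[] windowStart;
    private final long[] passCount;

    SlidingWindowCounter(int sampleCount, int intervalInMs) {
        if (sampleCount <= 0 || intervalInMs <= 0 || intervalInMs % sampleCount != 0) {
            throw new IllegalArgumentException("intervalInMs must be a positive multiple of sampleCount");
        }
        this.sampleCount = sampleCount;
        this.intervalInMs = intervalInMs;
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.windowStart = new long[sampleCount];
        this.passCount = new long[sampleCount];
        Arrays.fill(windowStart, -1); // -1 marks a bucket that was never used
    }

    // Locate the bucket covering timeMillis, recycling it if it still holds an older window.
    private int currentBucket(long timeMillis) {
        int idx = (int) ((timeMillis / windowLengthInMs) % sampleCount);
        long start = timeMillis - timeMillis % windowLengthInMs;
        if (windowStart[idx] != start) {
            windowStart[idx] = start;
            passCount[idx] = 0;
        }
        return idx;
    }

    void addPass(long timeMillis, int count) {
        passCount[currentBucket(timeMillis)] += count;
    }

    // Refresh the current bucket, then sum buckets that are not older than intervalInMs.
    long pass(long timeMillis) {
        currentBucket(timeMillis);
        long sum = 0;
        for (int i = 0; i < sampleCount; i++) {
            if (windowStart[i] >= 0 && timeMillis - windowStart[i] <= intervalInMs) {
                sum += passCount[i];
            }
        }
        return sum;
    }

    double passQps(long timeMillis) {
        return pass(timeMillis) / (intervalInMs / 1000.0);
    }
}
```

With 2 buckets over 1000 ms (chosen for illustration): passes recorded at 100 ms and 600 ms both count at 900 ms; by 1700 ms the bucket starting at 500 ms has been recycled, leaving only the passes recorded at 1200 ms.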

     

    DefaultNode

    The chain node: records the statistics of a resource on an invocation chain and maintains a tree structure.

     

    DefaultNode's dimension is resource × context; instances are stored in the map of each NodeSelectorSlot.

    package com.alibaba.csp.sentinel.node;

    import java.util.HashSet;
    import java.util.Set;

    import com.alibaba.csp.sentinel.log.RecordLog;
    import com.alibaba.csp.sentinel.SphO;
    import com.alibaba.csp.sentinel.SphU;
    import com.alibaba.csp.sentinel.context.Context;
    import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
    import com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot;

    /**
     * Records the statistics of a resource on an invocation chain, keyed by Resource × Context.
     *
     * Nested #entry() calls within the same context add child nodes, forming a tree.
     */
    public class DefaultNode extends StatisticNode {

        /**
         * The resource associated with the node.
         */
        // Resource identifier: each DefaultNode belongs to one resource
        private ResourceWrapper id;

        /**
         * The list of all child nodes.
         */
        // Child nodes: nested #entry() calls add child nodes here
        private volatile Set<Node> childList = new HashSet<>();

        /**
         * Associated cluster node.
         */
        // The ClusterNode is also per resource (shared across contexts)
        private ClusterNode clusterNode;

        public DefaultNode(ResourceWrapper id, ClusterNode clusterNode) {
            this.id = id;
            this.clusterNode = clusterNode;
        }

        public ResourceWrapper getId() {
            return id;
        }

        public ClusterNode getClusterNode() {
            return clusterNode;
        }

        public void setClusterNode(ClusterNode clusterNode) {
            this.clusterNode = clusterNode;
        }

        /**
         * Add child node to current node.
         *
         * @param node valid child node
         */
        // Add a child node: copy-on-write into a new Set under a lock
        public void addChild(Node node) {
            if (node == null) {
                RecordLog.warn("Trying to add null child to node <{}>, ignored", id.getName());
                return;
            }
            if (!childList.contains(node)) {
                synchronized (this) {
                    if (!childList.contains(node)) {
                        Set<Node> newSet = new HashSet<>(childList.size() + 1);
                        newSet.addAll(childList);
                        newSet.add(node);
                        childList = newSet;
                    }
                }
                RecordLog.info("Add child <{}> to node <{}>", ((DefaultNode)node).id.getName(), id.getName());
            }
        }

        /**
         * Reset the child node list.
         */
        public void removeChildList() {
            this.childList = new HashSet<>();
        }

        public Set<Node> getChildList() {
            return childList;
        }

        @Override
        public void increaseBlockQps(int count) {
            super.increaseBlockQps(count);
            this.clusterNode.increaseBlockQps(count);
        }

        @Override
        public void increaseExceptionQps(int count) {
            super.increaseExceptionQps(count);
            this.clusterNode.increaseExceptionQps(count);
        }

        @Override
        public void addRtAndSuccess(long rt, int successCount) {
            super.addRtAndSuccess(rt, successCount);
            this.clusterNode.addRtAndSuccess(rt, successCount);
        }

        @Override
        public void increaseThreadNum() {
            super.increaseThreadNum();
            this.clusterNode.increaseThreadNum();
        }

        @Override
        public void decreaseThreadNum() {
            super.decreaseThreadNum();
            this.clusterNode.decreaseThreadNum();
        }

        @Override
        public void addPassRequest(int count) {
            super.addPassRequest(count);
            this.clusterNode.addPassRequest(count);
        }

        public void printDefaultNode() {
            visitTree(0, this);
        }

        // Print the node tree formed by childList
        private void visitTree(int level, DefaultNode node) {
            for (int i = 0; i < level; ++i) {
                System.out.print("-");
            }
            if (!(node instanceof EntranceNode)) {
                System.out.println(
                    String.format("%s(thread:%s pq:%s bq:%s tq:%s rt:%s 1mp:%s 1mb:%s 1mt:%s)", node.id.getShowName(),
                        node.curThreadNum(), node.passQps(), node.blockQps(), node.totalQps(), node.avgRt(),
                        node.totalRequest() - node.blockRequest(), node.blockRequest(), node.totalRequest()));
            } else {
                System.out.println(
                    String.format("Entry-%s(t:%s pq:%s bq:%s tq:%s rt:%s 1mp:%s 1mb:%s 1mt:%s)", node.id.getShowName(),
                        node.curThreadNum(), node.passQps(), node.blockQps(), node.totalQps(), node.avgRt(),
                        node.totalRequest() - node.blockRequest(), node.blockRequest(), node.totalRequest()));
            }
            for (Node n : node.getChildList()) {
                DefaultNode dn = (DefaultNode)n;
                visitTree(level + 1, dn);
            }
        }
    }
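The override pattern in DefaultNode (each statistics method also updates its ClusterNode) is what lets one resource be counted both per chain and globally. A hypothetical sketch with plain LongAdder counters instead of sliding windows: CountingNode stands in for StatisticNode/ClusterNode, and ChainNode stands in for DefaultNode.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical counter standing in for StatisticNode / ClusterNode (no sliding windows).
class CountingNode {
    final LongAdder pass = new LongAdder();

    void addPassRequest(int count) {
        pass.add(count);
    }
}

// Like DefaultNode: record locally (per resource x context), then forward to the shared cluster node.
class ChainNode extends CountingNode {
    private final CountingNode clusterNode;

    ChainNode(CountingNode clusterNode) {
        this.clusterNode = clusterNode;
    }

    @Override
    void addPassRequest(int count) {
        super.addPassRequest(count);
        clusterNode.addPassRequest(count);
    }
}
```

Two ChainNodes for the same resource under different contexts keep separate counts (3 and 2 in the test), while the ClusterNode they share sees the total (5).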

     

    ClusterNode

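    Cluster node: holds the global statistics of each resource (regardless of invocation chain), as well as the resource's per-origin call statistics (of type StatisticNode). In particular, the Constants.ENTRY_NODE node holds the global statistics of all inbound resources.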

    ClusterNode is keyed by resource.

     

    package com.alibaba.csp.sentinel.node;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantLock;

    import com.alibaba.csp.sentinel.ResourceTypeConstants;
    import com.alibaba.csp.sentinel.context.ContextUtil;
    import com.alibaba.csp.sentinel.slots.block.BlockException;
    import com.alibaba.csp.sentinel.util.AssertUtil;

    /**
     * Cluster node: holds the global statistics of each resource (regardless of invocation chain)
     * and the resource's per-origin call statistics (of type StatisticNode).
     * A ClusterNode does not distinguish between Contexts; it is keyed by Resource.
     */
    public class ClusterNode extends StatisticNode {

        // Name of the cluster node (the resource name)
        private final String name;

        private final int resourceType;

        public ClusterNode(String name) {
            this(name, ResourceTypeConstants.COMMON);
        }

        public ClusterNode(String name, int resourceType) {
            AssertUtil.notEmpty(name, "name cannot be empty");
            this.name = name;
            this.resourceType = resourceType;
        }

        /**
         * <p>The origin map holds the pair: (origin, originNode) for one specific resource.</p>
         * <p>
         * The longer the application runs, the more stable this mapping will become.
         * So we didn't use concurrent map here, but a lock, as this lock only happens
         * at the very beginning while concurrent map will hold the lock all the time.
         * </p>
         */
        private Map<String, StatisticNode> originCountMap = new HashMap<>();

        private final ReentrantLock lock = new ReentrantLock();

        /**
         * Get resource name of the resource node.
         *
         * @return resource name
         * @since 1.7.0
         */
        public String getName() {
            return name;
        }

        /**
         * Get classification (type) of the resource.
         *
         * @return resource type
         * @since 1.7.0
         */
        public int getResourceType() {
            return resourceType;
        }

        /**
         * <p>Get {@link Node} of the specific origin. Usually the origin is the Service Consumer's app name.</p>
         * <p>If the origin node for given origin is absent, then a new {@link StatisticNode}
         * for the origin will be created and returned.</p>
         *
         * @param origin The caller's name, which is designated in the {@code parameter} parameter
         *               {@link ContextUtil#enter(String name, String origin)}.
         * @return the {@link Node} of the specific origin
         */
        public Node getOrCreateOriginNode(String origin) {
            StatisticNode statisticNode = originCountMap.get(origin);
            if (statisticNode == null) {
                lock.lock();
                try {
                    statisticNode = originCountMap.get(origin);
                    if (statisticNode == null) {
                        // The node is absent, create a new node for the origin.
                        statisticNode = new StatisticNode();
                        HashMap<String, StatisticNode> newMap = new HashMap<>(originCountMap.size() + 1);
                        newMap.putAll(originCountMap);
                        newMap.put(origin, statisticNode);
                        originCountMap = newMap;
                    }
                } finally {
                    lock.unlock();
                }
            }
            return statisticNode;
        }

        public Map<String, StatisticNode> getOriginCountMap() {
            return originCountMap;
        }
    }
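    The relationship between the two node types comes from the overridden update methods in DefaultNode: every update first goes to the per-context DefaultNode and then to the shared per-resource ClusterNode. A minimal sketch of that forwarding, assuming only a pass counter; MiniStatNode, MiniClusterNode and MiniDefaultNode are hypothetical stand-ins, not Sentinel classes:

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical simplified counterpart of StatisticNode: only a pass counter.
class MiniStatNode {
    private final LongAdder pass = new LongAdder();

    void addPassRequest(int count) {
        pass.add(count);
    }

    long totalPass() {
        return pass.sum();
    }
}

// One instance per resource, shared by every context.
class MiniClusterNode extends MiniStatNode {
    final String resource;

    MiniClusterNode(String resource) {
        this.resource = resource;
    }
}

// One instance per (resource, context); each update is also applied to the shared cluster node.
class MiniDefaultNode extends MiniStatNode {
    final String context;
    final MiniClusterNode clusterNode;

    MiniDefaultNode(String context, MiniClusterNode clusterNode) {
        this.context = context;
        this.clusterNode = clusterNode;
    }

    @Override
    void addPassRequest(int count) {
        super.addPassRequest(count);       // per-context statistics
        clusterNode.addPassRequest(count); // resource-wide statistics
    }
}
```

    With two contexts entering the same resource, each MiniDefaultNode sees only its own traffic, while the MiniClusterNode sees the sum.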

     

    EntranceNode

    Entrance node: a special chain node that holds all call statistics under one Context entrance. The Constants.ROOT node is also an entrance node.

     

    EntranceNode is keyed by context; instances are stored in the contextNameNodeMap of the ContextUtil class.

    package com.alibaba.csp.sentinel.node;

    import com.alibaba.csp.sentinel.context.Context;
    import com.alibaba.csp.sentinel.context.ContextUtil;
    import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
    import com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot;

    /**
     * Entrance node: a special chain node that holds all call statistics under one Context entrance.
     * It keeps no counters of its own; every metric is aggregated from its child nodes.
     */
    public class EntranceNode extends DefaultNode {

        public EntranceNode(ResourceWrapper id, ClusterNode clusterNode) {
            super(id, clusterNode);
        }

        // Average RT weighted by each child's pass QPS
        @Override
        public double avgRt() {
            double total = 0;
            double totalQps = 0;
            for (Node node : getChildList()) {
                total += node.avgRt() * node.passQps();
                totalQps += node.passQps();
            }
            return total / (totalQps == 0 ? 1 : totalQps);
        }

        @Override
        public double blockQps() {
            double blockQps = 0;
            for (Node node : getChildList()) {
                blockQps += node.blockQps();
            }
            return blockQps;
        }

        @Override
        public long blockRequest() {
            long r = 0;
            for (Node node : getChildList()) {
                r += node.blockRequest();
            }
            return r;
        }

        @Override
        public int curThreadNum() {
            int r = 0;
            for (Node node : getChildList()) {
                r += node.curThreadNum();
            }
            return r;
        }

        @Override
        public double totalQps() {
            double r = 0;
            for (Node node : getChildList()) {
                r += node.totalQps();
            }
            return r;
        }

        @Override
        public double successQps() {
            double r = 0;
            for (Node node : getChildList()) {
                r += node.successQps();
            }
            return r;
        }

        @Override
        public double passQps() {
            double r = 0;
            for (Node node : getChildList()) {
                r += node.passQps();
            }
            return r;
        }

        @Override
        public long totalRequest() {
            long r = 0;
            for (Node node : getChildList()) {
                r += node.totalRequest();
            }
            return r;
        }

        @Override
        public long totalPass() {
            long r = 0;
            for (Node node : getChildList()) {
                r += node.totalPass();
            }
            return r;
        }
    }
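    The only non-trivial aggregation in EntranceNode is avgRt(): a plain average of the children's RTs would over-weight rarely called resources, so each child's RT is weighted by its pass QPS, with a guard against dividing by zero. A small standalone sketch of the same arithmetic; ChildMetrics and MiniEntranceNode are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a child node's per-second metrics.
class ChildMetrics {
    final double avgRt;
    final double passQps;

    ChildMetrics(double avgRt, double passQps) {
        this.avgRt = avgRt;
        this.passQps = passQps;
    }
}

// Mirrors EntranceNode's style: no counters of its own, everything derived from children.
class MiniEntranceNode {
    final List<ChildMetrics> children = new ArrayList<>();

    double passQps() {
        double r = 0;
        for (ChildMetrics c : children) {
            r += c.passQps;
        }
        return r;
    }

    // Average RT weighted by pass QPS; with no traffic the divisor falls back to 1.
    double avgRt() {
        double total = 0;
        double totalQps = 0;
        for (ChildMetrics c : children) {
            total += c.avgRt * c.passQps;
            totalQps += c.passQps;
        }
        return total / (totalQps == 0 ? 1 : totalQps);
    }
}
```

    For example, children with (RT 10 ms, 2 QPS) and (RT 40 ms, 1 QPS) give (10·2 + 40·1) / 3 = 20 ms, not the unweighted 25 ms.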

     

    Pt1.5 Slot

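    Slots implement the statistics and rule-checking logic. They are linked into a SlotChain, a singly linked list, and are loaded and extended through SPI.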
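    The chain-of-responsibility idea behind the SlotChain can be sketched in a few lines: each slot does its own work and then calls fireEntry() to hand control to the next slot, and a rule-checking slot ends the chain by throwing. This is a simplified sketch, not Sentinel's AbstractLinkedProcessorSlot; MiniSlot, RecordingSlot, BlockingSlot and MiniChain are hypothetical names, and a plain Exception stands in for BlockException:

```java
import java.util.List;

// Hypothetical, simplified linked slot: do your own work, then call fireEntry().
abstract class MiniSlot {
    private MiniSlot next;

    void setNext(MiniSlot next) {
        this.next = next;
    }

    abstract void entry(String resource, List<String> trace) throws Exception;

    // Hands control to the next slot, if there is one.
    protected void fireEntry(String resource, List<String> trace) throws Exception {
        if (next != null) {
            next.entry(resource, trace);
        }
    }
}

// A statistics-like slot: records its name and always continues.
class RecordingSlot extends MiniSlot {
    private final String name;

    RecordingSlot(String name) {
        this.name = name;
    }

    @Override
    void entry(String resource, List<String> trace) throws Exception {
        trace.add(name);
        fireEntry(resource, trace);
    }
}

// A rule-checking slot: throwing stops the rest of the chain.
class BlockingSlot extends MiniSlot {
    private final String blockedResource;

    BlockingSlot(String blockedResource) {
        this.blockedResource = blockedResource;
    }

    @Override
    void entry(String resource, List<String> trace) throws Exception {
        if (resource.equals(blockedResource)) {
            throw new Exception("blocked: " + resource);
        }
        trace.add("check");
        fireEntry(resource, trace);
    }
}

class MiniChain {
    // Links the slots in the given order and returns the head of the chain.
    static MiniSlot build(MiniSlot... slots) {
        for (int i = 0; i + 1 < slots.length; i++) {
            slots[i].setNext(slots[i + 1]);
        }
        return slots[0];
    }
}
```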

     

    The following diagram shows how entry is processed along the SlotChain:

     

    Let's go through how each Slot is implemented.

     

    NodeSelectorSlot

    This slot collects the invocation paths of resources and stores them as a tree, which is then used for flow control based on the invocation path.

    ContextUtil.enter("entrance1", "appA");
    Entry nodeA = SphU.entry("nodeA");
    if (nodeA != null) {
        nodeA.exit();
    }
    ContextUtil.exit();

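    The code above uses ContextUtil.enter() to create a context named entrance1, with appA as the caller (origin). It then calls SphU.entry() to request a token; if the call returns without throwing a BlockException, the token was acquired.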

    This code produces the following structure in memory:

                 machine-root
                     /
                    /
              EntranceNode1
                  /
                 /
       DefaultNode(nodeA)

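    Note: each DefaultNode is identified by both its resource ID and its context (entrance) name. In other words, one resource ID can have several DefaultNodes, one for each distinct entrance.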

    ContextUtil.enter("entrance1", "appA");
    Entry nodeA = SphU.entry("nodeA");
    if (nodeA != null) {
        nodeA.exit();
    }
    ContextUtil.exit();

    ContextUtil.enter("entrance2", "appA");
    nodeA = SphU.entry("nodeA");
    if (nodeA != null) {
        nodeA.exit();
    }
    ContextUtil.exit();

    This code produces the following structure in memory:

                      machine-root
                      /         \
                     /           \
             EntranceNode1   EntranceNode2
                   /               \
                  /                 \
          DefaultNode(nodeA)   DefaultNode(nodeA)

    This structure can be displayed by calling curl http://localhost:8719/tree?type=root:

    EntranceNode: machine-root(t:0 pq:1 bq:0 tq:1 rt:0 prq:1 1mp:0 1mb:0 1mt:0)
    -EntranceNode1: Entrance1(t:0 pq:1 bq:0 tq:1 rt:0 prq:1 1mp:0 1mb:0 1mt:0)
    --nodeA(t:0 pq:1 bq:0 tq:1 rt:0 prq:1 1mp:0 1mb:0 1mt:0)
    -EntranceNode2: Entrance1(t:0 pq:1 bq:0 tq:1 rt:0 prq:1 1mp:0 1mb:0 1mt:0)
    --nodeA(t:0 pq:1 bq:0 tq:1 rt:0 prq:1 1mp:0 1mb:0 1mt:0)

    t:threadNum  pq:passQps  bq:blockedQps  tq:totalQps  rt:averageRt  prq:passRequestQps  1mp:1m-passed  1mb:1m-blocked  1mt:1m-total

     

    Source code walkthrough:

    package com.alibaba.csp.sentinel.slots.nodeselector;

    import com.alibaba.csp.sentinel.context.Context;
    import com.alibaba.csp.sentinel.context.ContextUtil;
    import com.alibaba.csp.sentinel.node.ClusterNode;
    import com.alibaba.csp.sentinel.node.DefaultNode;
    import com.alibaba.csp.sentinel.node.EntranceNode;
    import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot;
    import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
    import com.alibaba.csp.sentinel.spi.SpiOrder;

    import java.util.HashMap;
    import java.util.Map;

    /**
     * This slot collects the invocation paths of resources and stores them as a tree,
     * which is then used for flow control based on the invocation path.
     *
     * <p>How it works:</p>
     * <pre>
     * ContextUtil.enter("entrance1", "appA");
     * Entry nodeA = SphU.entry("nodeA");
     * if (nodeA != null) {
     *     nodeA.exit();
     * }
     * ContextUtil.exit();
     * </pre>
     *
     * Above code will generate the following invocation structure in memory:
     *
     * <pre>
     *
     *              machine-root
     *                  /
     *                 /
     *           EntranceNode1
     *               /
     *              /
     *    DefaultNode(nodeA)- - - - - -> ClusterNode(nodeA);
     * </pre>
     *
     * Both DefaultNode and ClusterNode hold StatisticNode statistics.
     *
     * A ClusterNode is identified by the resource ID, while a DefaultNode is identified by the
     * resource and the context. In other words, a resource gets one DefaultNode per context,
     * but only a single ClusterNode.
     *
     * <p>
     * the following code shows one resource id in two different context:
     * </p>
     *
     * <pre>
     * ContextUtil.enter("entrance1", "appA");
     * Entry nodeA = SphU.entry("nodeA");
     * if (nodeA != null) {
     *     nodeA.exit();
     * }
     * ContextUtil.exit();
     *
     * ContextUtil.enter("entrance2", "appA");
     * nodeA = SphU.entry("nodeA");
     * if (nodeA != null) {
     *     nodeA.exit();
     * }
     * ContextUtil.exit();
     * </pre>
     *
     * Above code will generate the following invocation structure in memory:
     *
     * <pre>
     *
     *                  machine-root
     *                  /         \
     *                 /           \
     *         EntranceNode1   EntranceNode2
     *               /               \
     *              /                 \
     *      DefaultNode(nodeA)   DefaultNode(nodeA)
     *             |                    |
     *             +- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA);
     * </pre>
     *
     * <p>
     * The structure can be viewed with:
     * curl http://localhost:8719/tree?type=root
     * </p>
     */
    @SpiOrder(-10000)
    public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {

        /**
         * {@link DefaultNode}s of the same resource in different context.
         */
        // DefaultNodes of the same resource, one per Context.
        // A resource's SlotChain is shared by all contexts, so NodeSelectorSlot keeps the
        // resource's per-Context data here, i.e. its DefaultNodes.
        private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);

        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
            throws Throwable {
            /*
             * It's interesting that we use context name rather resource name as the map key.
             *
             * Remember that same resource({@link ResourceWrapper#equals(Object)}) will share
             * the same {@link ProcessorSlotChain} globally, no matter in which context. So if
             * code goes into {@link #entry(Context, ResourceWrapper, DefaultNode, int, Object...)},
             * the resource name must be same but context name may not.
             *
             * If we use {@link com.alibaba.csp.sentinel.SphU#entry(String resource)} to
             * enter same resource in different context, using context name as map key can
             * distinguish the same resource. In this case, multiple {@link DefaultNode}s will be created
             * of the same resource name, for every distinct context (different context name) each.
             *
             * Consider another question. One resource may have multiple {@link DefaultNode},
             * so what is the fastest way to get total statistics of the same resource?
             * The answer is all {@link DefaultNode}s with same resource name share one
             * {@link ClusterNode}. See {@link ClusterBuilderSlot} for detail.
             */
            // Look up the DefaultNode for this Context
            DefaultNode node = map.get(context.getName());
            if (node == null) {
                synchronized (this) {
                    node = map.get(context.getName());
                    // First request in this Context: create a DefaultNode and put it into the map (copy-on-write)
                    if (node == null) {
                        node = new DefaultNode(resourceWrapper, null);
                        HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                        cacheMap.putAll(map);
                        cacheMap.put(context.getName(), node);
                        map = cacheMap;
                        // Attach the new node to the invocation tree maintained by the Context
                        ((DefaultNode) context.getLastNode()).addChild(node);
                    }
                }
            }

            context.setCurNode(node);
            // Continue with the #entry logic of the next slots in the chain
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }

        // Exit logic: nothing to release here, so pass straight on to the next slot's exit
        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            fireExit(context, resourceWrapper, count, args);
        }
    }
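    Because one NodeSelectorSlot instance exists per resource (one SlotChain per resource), keying its map by context name is what yields "one DefaultNode per context". A minimal sketch of that lookup, assuming the same double-checked, copy-on-write update; PerContextNodeSelector and CtxNode are hypothetical names, not Sentinel classes:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical node identified by (resource, context).
class CtxNode {
    final String resource;
    final String context;

    CtxNode(String resource, String context) {
        this.resource = resource;
        this.context = context;
    }
}

// Hypothetical model of NodeSelectorSlot: one instance per resource, one node per context name.
class PerContextNodeSelector {
    final String resource;
    private volatile Map<String, CtxNode> map = new HashMap<>(10);

    PerContextNodeSelector(String resource) {
        this.resource = resource;
    }

    CtxNode select(String contextName) {
        CtxNode node = map.get(contextName);
        if (node == null) {
            synchronized (this) {
                node = map.get(contextName); // re-check under the lock
                if (node == null) {
                    node = new CtxNode(resource, contextName);
                    Map<String, CtxNode> copy = new HashMap<>(map);
                    copy.put(contextName, node);
                    map = copy; // publish the new map
                }
            }
        }
        return node;
    }

    int size() {
        return map.size();
    }
}
```

    Entering "nodeA" from entrance1 and entrance2 yields two distinct nodes, while entering it again from entrance1 returns the node created the first time.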

     

    ClusterBuilderSlot

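    This slot builds the resource's ClusterNode and its origin (caller) nodes. A ClusterNode keeps the runtime statistics of a resource (response time, QPS, block count, thread count, exception count, etc.) together with a list of per-origin statistics. The origin name is set by the origin argument of ContextUtil.enter(contextName, origin). The following command shows how different callers access a resource: curl http://localhost:8719/origin?id=caller: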

    id: nodeA
    idx origin  threadNum passedQps blockedQps totalQps aRt 1m-passed 1m-blocked 1m-total
    1   caller1 0         0         0          0        0   0         0          0
    2   caller2 0         0         0          0        0   0         0          0
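    The per-origin rows above come from the origin map inside ClusterNode. A minimal sketch of how resource-wide and per-caller counts relate, assuming only pass counting; OriginAwareNode is a hypothetical class, and the rule that an empty origin creates no origin node mirrors the `!"".equals(context.getOrigin())` check in ClusterBuilderSlot:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical simplified ClusterNode: a resource-wide pass count plus one counter per caller.
class OriginAwareNode {
    private final LongAdder totalPass = new LongAdder();
    private volatile Map<String, LongAdder> originCountMap = new HashMap<>();
    private final ReentrantLock lock = new ReentrantLock();

    // Same lock + copy-on-write approach as ClusterNode#getOrCreateOriginNode.
    LongAdder getOrCreateOriginCounter(String origin) {
        LongAdder counter = originCountMap.get(origin);
        if (counter == null) {
            lock.lock();
            try {
                counter = originCountMap.get(origin);
                if (counter == null) {
                    counter = new LongAdder();
                    Map<String, LongAdder> newMap = new HashMap<>(originCountMap.size() + 1);
                    newMap.putAll(originCountMap);
                    newMap.put(origin, counter);
                    originCountMap = newMap;
                }
            } finally {
                lock.unlock();
            }
        }
        return counter;
    }

    // An empty origin only updates the resource-wide counter.
    void pass(String origin) {
        totalPass.increment();
        if (!"".equals(origin)) {
            getOrCreateOriginCounter(origin).increment();
        }
    }

    long totalPass() {
        return totalPass.sum();
    }

    long passFrom(String origin) {
        LongAdder c = originCountMap.get(origin);
        return c == null ? 0 : c.sum();
    }
}
```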

     

    Source code walkthrough:

    package com.alibaba.csp.sentinel.slots.clusterbuilder;

    import java.util.HashMap;
    import java.util.Map;

    import com.alibaba.csp.sentinel.EntryType;
    import com.alibaba.csp.sentinel.context.Context;
    import com.alibaba.csp.sentinel.context.ContextUtil;
    import com.alibaba.csp.sentinel.node.ClusterNode;
    import com.alibaba.csp.sentinel.node.DefaultNode;
    import com.alibaba.csp.sentinel.node.IntervalProperty;
    import com.alibaba.csp.sentinel.node.Node;
    import com.alibaba.csp.sentinel.node.SampleCountProperty;
    import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot;
    import com.alibaba.csp.sentinel.slotchain.ProcessorSlotChain;
    import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
    import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper;
    import com.alibaba.csp.sentinel.spi.SpiOrder;

    /**
     * This slot builds the resource's ClusterNode and its origin nodes. A ClusterNode keeps the runtime
     * statistics of a resource (response time, QPS, block count, thread count, exception count, etc.)
     * together with a list of per-origin statistics. The origin name is set by the origin argument of
     * ContextUtil.enter(contextName, origin).
     *
     * A resource has exactly one ClusterNode but may have multiple DefaultNodes.
     */
    @SpiOrder(-9000)
    public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

        /**
         * <p>
         * Remember that same resource({@link ResourceWrapper#equals(Object)}) will share
         * the same {@link ProcessorSlotChain} globally, no matter in which context. So if
         * code goes into {@link #entry(Context, ResourceWrapper, DefaultNode, int, boolean, Object...)},
         * the resource name must be same but context name may not.
         * </p>
         * <p>
         * To get total statistics of the same resource in different context, same resource
         * shares the same {@link ClusterNode} globally. All {@link ClusterNode}s are cached
         * in this map.
         * </p>
         * <p>
         * The longer the application runs, the more stable this mapping will
         * become. So we don't use a concurrent map but a lock, as this lock only happens
         * at the very beginning while concurrent map will hold the lock all the time.
         * </p>
         */
        // All contexts of a resource share one ProcessorSlotChain.
        // Because a resource can be entered in many contexts, its ClusterNode aggregates the
        // resource's statistics across all of them; clusterNodeMap caches every ClusterNode.
        private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();

        private static final Object lock = new Object();

        private volatile ClusterNode clusterNode = null;

        // entry() is the entry point of the slot's core logic
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args)
            throws Throwable {
            if (clusterNode == null) {
                synchronized (lock) {
                    if (clusterNode == null) {
                        // Create the cluster node.
                        // Create a ClusterNode for the resource and add it to the map (copy-on-write)
                        clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                        HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                        newMap.putAll(clusterNodeMap);
                        newMap.put(node.getId(), clusterNode);

                        clusterNodeMap = newMap;
                    }
                }
            }
            node.setClusterNode(clusterNode);

            /*
             * if context origin is set, we should get or create a new {@link Node} of
             * the specific origin.
             */
            if (!"".equals(context.getOrigin())) {
                Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
                context.getCurEntry().setOriginNode(originNode);
            }

            // Continue with the next slot
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }

        // exit() is the exit point of the slot
        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            fireExit(context, resourceWrapper, count, args);
        }

        /**
         * Get {@link ClusterNode} of the resource of the specific type.
         *
         * @param id   resource name.
         * @param type invoke type.
         * @return the {@link ClusterNode}
         */
        public static ClusterNode getClusterNode(String id, EntryType type) {
            return clusterNodeMap.get(new StringResourceWrapper(id, type));
        }

        /**
         * Get {@link ClusterNode} of the resource name.
         *
         * @param id resource name.
         * @return the {@link ClusterNode}.
         */
        public static ClusterNode getClusterNode(String id) {
            if (id == null) {
                return null;
            }
            ClusterNode clusterNode = null;

            for (EntryType nodeType : EntryType.values()) {
                clusterNode = clusterNodeMap.get(new StringResourceWrapper(id, nodeType));
                if (clusterNode != null) {
                    break;
                }
            }

            return clusterNode;
        }

        /**
         * Get {@link ClusterNode}s map, this map holds all {@link ClusterNode}s, it's key is resource name,
         * value is the related {@link ClusterNode}. <br/>
         * DO NOT MODIFY the map returned.
         *
         * @return all {@link ClusterNode}s
         */
        public static Map<ResourceWrapper, ClusterNode> getClusterNodeMap() {
            return clusterNodeMap;
        }

        /**
         * Reset all {@link ClusterNode}s. Reset is needed when {@link IntervalProperty#INTERVAL} or
         * {@link SampleCountProperty#SAMPLE_COUNT} is changed.
         */
        public static void resetClusterNodes() {
            for (ClusterNode node : clusterNodeMap.values()) {
                node.reset();
            }
        }
    }

     

    LogSlot

    LogSlot logs the exceptions raised during processing, providing detailed logs for troubleshooting.

    Source code walkthrough:

    package com.alibaba.csp.sentinel.slots.logger;

    import com.alibaba.csp.sentinel.log.RecordLog;
    import com.alibaba.csp.sentinel.context.Context;
    import com.alibaba.csp.sentinel.node.DefaultNode;
    import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot;
    import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
    import com.alibaba.csp.sentinel.slots.block.BlockException;
    import com.alibaba.csp.sentinel.spi.SpiOrder;

    /**
     * LogSlot logs the exceptions raised during processing, providing detailed logs for troubleshooting.
     *
     * The slots after LogSlot are Sentinel's rule-checking slots, which may throw BlockExceptions;
     * LogSlot is responsible for handling these exceptions.
     */
    @SpiOrder(-8000)
    public class LogSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

        // entry() is the entry point of the slot's core logic
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)
            throws Throwable {
            try {
                // Invoke the following slots directly and handle any exception they raise
                fireEntry(context, resourceWrapper, obj, count, prioritized, args);
            } catch (BlockException e) {
                // BlockException raised by a Sentinel rule: log it in the standard format, then rethrow
                EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
                    context.getOrigin(), count);
                throw e;
            } catch (Throwable e) {
                // Any other (non-rule) exception: log a warning; it is not rethrown
                RecordLog.warn("Unexpected entry exception", e);
            }

        }

        // exit() is the exit point of the slot
        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            try {
                // Nothing to release; continue with the next slot
                fireExit(context, resourceWrapper, count, args);
            } catch (Throwable e) {
                RecordLog.warn("Unexpected entry exit exception", e);
            }
        }
    }
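    The important detail in LogSlot is the asymmetric exception handling: rule rejections are logged and rethrown so the caller still sees them, while any other exception from downstream slots is logged and swallowed. A minimal sketch of that pattern; MiniBlockException, NextStep and LoggingStep are hypothetical, and an in-memory list stands in for EagleEyeLogUtil/RecordLog:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Sentinel's BlockException hierarchy.
class MiniBlockException extends Exception {
    MiniBlockException(String msg) {
        super(msg);
    }
}

// Hypothetical "rest of the chain" callback.
interface NextStep {
    void run() throws Exception;
}

// Sketch of the LogSlot pattern.
class LoggingStep {
    final List<String> log = new ArrayList<>();

    void entry(String resource, NextStep next) throws MiniBlockException {
        try {
            next.run();
        } catch (MiniBlockException e) {
            log.add("block " + resource + ": " + e.getMessage());
            throw e; // rule rejections must reach the caller
        } catch (Throwable e) {
            log.add("unexpected " + resource + ": " + e.getMessage()); // logged, not rethrown
        }
    }
}
```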

     

    StatisticSlot

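    StatisticSlot is one of Sentinel's core slots; it records real-time call statistics:

    clusterNode: real-time statistics of the resource's ClusterNode (one per resource);

    origin: statistics per caller;

    defaultnode: runtime statistics of the resource ID, distinguished by entrance context;

    statistics of inbound traffic.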

     

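    StatisticSlot is one of Sentinel's most important classes; it updates the statistics according to the outcome of the rule checks.

    On entry: it first runs the rule-checking slots that follow it. Each of them throws an exception (a subclass of BlockException) when its rule is triggered. If a BlockException is thrown, block data is recorded; otherwise the request counts as passed and pass data is recorded.

    On exit: if the entry was not blocked (no BlockException), it records completion (success) and the RT and decrements the thread count. A business exception does not prevent this; it is additionally counted as exception QPS.

    Dimensions recorded: thread count +1, the current DefaultNode, the corresponding originNode (if an origin is set), and the global inbound statistics (if the entry type is IN).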
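    The order of bookkeeping described above can be sketched with plain counters: run the checks first, then record pass or block; on exit, only non-blocked entries record success, RT and the thread decrement, and business errors additionally count as exceptions. This is a single-threaded sketch under stated assumptions: MiniStats, MiniBlocked, RuleCheck and MiniStatisticStep are hypothetical, and unlike StatisticSlot the sketch returns false on block instead of rethrowing:

```java
// Hypothetical per-node counters; not Sentinel's StatisticNode API.
class MiniStats {
    int threads;
    long pass;
    long block;
    long success;
    long exception;
    long rtSum;
}

class MiniBlocked extends Exception {
    MiniBlocked(String msg) {
        super(msg);
    }
}

// Hypothetical stand-in for "all rule-checking slots after StatisticSlot".
interface RuleCheck {
    void check() throws MiniBlocked;
}

class MiniStatisticStep {
    final MiniStats stats = new MiniStats();

    // Returns true if the entry passed (StatisticSlot would rethrow the block instead).
    boolean entry(RuleCheck rules, int count) {
        try {
            rules.check();        // 1. checks first
            stats.threads++;      // 2. passed: thread +1, pass +count
            stats.pass += count;
            return true;
        } catch (MiniBlocked e) {
            stats.block += count; // blocked: only the block count changes
            return false;
        }
    }

    // 'blocked' mirrors Entry#getBlockError() != null; 'error' is a business exception or null.
    void exit(boolean blocked, long rtMs, Throwable error, int count) {
        if (blocked) {
            return;               // blocked entries never incremented the thread count
        }
        stats.rtSum += rtMs;
        stats.success += count;
        stats.threads--;
        if (error != null) {
            stats.exception += count; // still a success, but also an exception
        }
    }
}
```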

     

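    Under the hood, Sentinel uses LeapArray, a high-performance sliding-window data structure, to collect real-time per-second metrics; it copes well with high-concurrency workloads in which writes outnumber reads.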
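    The core idea of a LeapArray-style window can be shown with a fixed ring of buckets: the current time selects a bucket by (time / windowLength) % sampleCount, a bucket whose start time belongs to an older round is reset and reused, and a read sums only buckets no older than the interval. A simplified sketch, assuming single-threaded use and explicit timestamps for testability; MiniLeapArray is hypothetical and omits Sentinel's CAS and locking:

```java
// Hypothetical, simplified sliding-window counter in the spirit of LeapArray.
class MiniLeapArray {
    private final int sampleCount;
    private final long intervalMs;
    private final long windowLengthMs;
    private final long[] windowStart;
    private final long[] counts;

    MiniLeapArray(int sampleCount, long intervalMs) {
        if (sampleCount <= 0 || intervalMs % sampleCount != 0) {
            throw new IllegalArgumentException("intervalMs must be divisible by sampleCount");
        }
        this.sampleCount = sampleCount;
        this.intervalMs = intervalMs;
        this.windowLengthMs = intervalMs / sampleCount;
        this.windowStart = new long[sampleCount];
        this.counts = new long[sampleCount];
    }

    // Adds n events at nowMs, resetting the bucket first if it belongs to an older round.
    void add(long nowMs, long n) {
        int idx = (int) ((nowMs / windowLengthMs) % sampleCount);
        long start = nowMs - nowMs % windowLengthMs;
        if (windowStart[idx] != start) {
            windowStart[idx] = start; // reuse the stale bucket for the current window
            counts[idx] = 0;
        }
        counts[idx] += n;
    }

    // Sums buckets that are not older than intervalMs (stale buckets are skipped, not cleared).
    long sum(long nowMs) {
        long total = 0;
        for (int i = 0; i < sampleCount; i++) {
            if (nowMs - windowStart[i] <= intervalMs) {
                total += counts[i];
            }
        }
        return total;
    }
}
```

    With 2 buckets over 1000 ms (500 ms each), events at t=0 and t=600 both count at t=600, but at t=1200 the bucket that started at 0 is skipped.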

     

    Source code walkthrough:

    package com.alibaba.csp.sentinel.slots.statistic;

    import java.util.Collection;

    import com.alibaba.csp.sentinel.node.Node;
    import com.alibaba.csp.sentinel.slotchain.ProcessorSlotEntryCallback;
    import com.alibaba.csp.sentinel.slotchain.ProcessorSlotExitCallback;
    import com.alibaba.csp.sentinel.slots.block.flow.PriorityWaitException;
    import com.alibaba.csp.sentinel.spi.SpiOrder;
    import com.alibaba.csp.sentinel.util.TimeUtil;
    import com.alibaba.csp.sentinel.Constants;
    import com.alibaba.csp.sentinel.EntryType;
    import com.alibaba.csp.sentinel.context.Context;
    import com.alibaba.csp.sentinel.node.ClusterNode;
    import com.alibaba.csp.sentinel.node.DefaultNode;
    import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot;
    import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
    import com.alibaba.csp.sentinel.slots.block.BlockException;

    /**
     * StatisticSlot is one of Sentinel's most important classes; it updates the statistics
     * according to the outcome of the rule checks.
     *
     * StatisticSlot maintains:
     * > resource metrics in the ClusterNode
     * > caller metrics in the origin node
     * > Resource × Context metrics in the DefaultNode
     * > aggregated metrics of all entrances
     *
     * Recorded data:
     *   concurrent thread count;
     *   passed requests;
     *   blocked QPS;
     *   RT;
     *   request completion time;
     *   exception QPS;
     *   successfully handled requests;
     *
     * Where it is recorded:
     *   the current DefaultNode;
     *   the corresponding originNode (if an origin is set);
     *   the global inbound statistics (if the entry type is IN).
     */
    @SpiOrder(-7000)
    public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

        // entry() is the entry point of the slot's core logic.
        // On entry: run the rule-checking slots that follow; a slot whose rule is triggered throws
        // a BlockException subclass. If a BlockException is thrown, block data is recorded;
        // otherwise the request counts as passed and pass data is recorded.
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args) throws Throwable {
            try {
                // 1. Run the rule-checking slots: every slot after StatisticSlot checks rules,
                //    so check first, then record statistics based on the result.
                fireEntry(context, resourceWrapper, node, count, prioritized, args);

                // 2. No exception: no rule was triggered, so the request passes and is recorded below.
                // 2.1 thread count + 1, pass requests + count
                node.increaseThreadNum();
                node.addPassRequest(count);

                // 2.2 same for the origin dimension
                if (context.getCurEntry().getOriginNode() != null) {
                    // Add count for origin node.
                    context.getCurEntry().getOriginNode().increaseThreadNum();
                    context.getCurEntry().getOriginNode().addPassRequest(count);
                }

                // 2.3 inbound entry: same for the global inbound node (covers all resources)
                if (resourceWrapper.getEntryType() == EntryType.IN) {
                    // Add count for global inbound entry node for global statistics.
                    Constants.ENTRY_NODE.increaseThreadNum();
                    Constants.ENTRY_NODE.addPassRequest(count);
                }

                // Handle pass event with registered entry callback handlers.
                for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                    handler.onPass(context, resourceWrapper, node, count, args);
                }
            } catch (PriorityWaitException ex) {
                // Thrown when a prioritized request waited for (occupied) a future window and is let through.
                // Only the thread count is increased here.
                node.increaseThreadNum();
                if (context.getCurEntry().getOriginNode() != null) {
                    // Add count for origin node.
                    context.getCurEntry().getOriginNode().increaseThreadNum();
                }

                if (resourceWrapper.getEntryType() == EntryType.IN) {
                    // Add count for global inbound entry node for global statistics.
                    Constants.ENTRY_NODE.increaseThreadNum();
                }
                // Handle pass event with registered entry callback handlers.
                for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                    handler.onPass(context, resourceWrapper, node, count, args);
                }
            } catch (BlockException e) {
                // Blocked, set block exception to current entry.
                context.getCurEntry().setBlockError(e);

                // Add block count.
                // Increase blocked QPS
                node.increaseBlockQps(count);
                // Increase blocked QPS for the origin
                if (context.getCurEntry().getOriginNode() != null) {
                    context.getCurEntry().getOriginNode().increaseBlockQps(count);
                }

                // Increase blocked QPS for global inbound traffic
                if (resourceWrapper.getEntryType() == EntryType.IN) {
                    // Add count for global inbound entry node for global statistics.
                    Constants.ENTRY_NODE.increaseBlockQps(count);
                }

                // Handle block event with registered entry callback handlers.
                // Run the callbacks registered for block events
                for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                    handler.onBlocked(e, context, resourceWrapper, node, count, args);
                }

                throw e;
            } catch (Throwable e) {
                // Unexpected internal error, set error to current entry.
                // Record the error for any other exception
                context.getCurEntry().setError(e);

                throw e;
            }
        }

        // exit() is the exit point of the slot.
        // On exit: if the entry was not blocked, record completion (success) and RT, and decrement the thread count.
        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            Node node = context.getCurNode();

            // Only for entries that were not blocked: record completion time and RT
            if (context.getCurEntry().getBlockError() == null) {
                // Calculate response time (use completeStatTime as the time of completion).
                // Record the completion time completeStatTime and compute the RT
                long completeStatTime = TimeUtil.currentTimeMillis();
                context.getCurEntry().setCompleteTimestamp(completeStatTime);
                long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();

                Throwable error = context.getCurEntry().getError();

                // Record response time and success count.
                // Update the DefaultNode (which also updates its ClusterNode), the origin node and the global inbound node
                recordCompleteFor(node, count, rt, error);
                recordCompleteFor(context.getCurEntry().getOriginNode(), count, rt, error);
                if (resourceWrapper.getEntryType() == EntryType.IN) {
                    recordCompleteFor(Constants.ENTRY_NODE, count, rt, error);
                }
            }

            // Handle exit event with registered exit callback handlers.
            Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
            for (ProcessorSlotExitCallback handler : exitCallbacks) {
                handler.onExit(context, resourceWrapper, count, args);
            }

            // Continue with the exit of the following slots; they are rule checks with little exit logic
            fireExit(context, resourceWrapper, count);
        }

        // Records success count and RT, decrements the thread count, and records exception QPS.
        // BlockException: a flow-control rejection, already recorded as block data on entry.
        // Other errors: not a flow-control rejection, so counted as success (not blocked) and also as exception QPS.
        private void recordCompleteFor(Node node, int batchCount, long rt, Throwable error) {
            if (node == null) {
                return;
            }
            // Record success count and RT
            node.addRtAndSuccess(rt, batchCount);
            // Thread count - 1
            node.decreaseThreadNum();

            // Record exception QPS for non-block errors
            if (error != null && !(error instanceof BlockException)) {
                node.increaseExceptionQps(batchCount);
            }
        }
    }

     

    AuthoritySlot

    Checks black/white-list (authority) rules.

    Source code walkthrough:

    package com.alibaba.csp.sentinel.slots.block.authority;

    import java.util.Map;
    import java.util.Set;

    import com.alibaba.csp.sentinel.context.Context;
    import com.alibaba.csp.sentinel.node.DefaultNode;
    import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot;
    import com.alibaba.csp.sentinel.slotchain.ProcessorSlot;
    import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
    import com.alibaba.csp.sentinel.spi.SpiOrder;

    /**
     * AuthoritySlot checks AuthorityRules, which define black/white lists for requests.
     *
     * We often need to decide whether to let a request through based on its caller; Sentinel's
     * origin-based access control (black/white lists) serves this purpose.
     * Access control restricts a resource based on the request's origin: with a white list, only
     * requests whose origin is in the list pass; with a black list, requests whose origin is in the
     * list are rejected and all others pass.
     *
     * An AuthorityRule is simple and has these main fields:
     * - resource: the resource name, i.e. the target of the rule.
     * - limitApp: the black/white list; multiple origins are separated by commas, e.g. appA,appB.
     * - strategy: AUTHORITY_WHITE for white-list mode, AUTHORITY_BLACK for black-list mode; white list by default.
     */
    @SpiOrder(-6000)
    public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {

        // entry() is the entry point of the slot's core logic
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
            throws Throwable {
            // Check black/white-list rules: a violation throws AuthorityException and ends the SlotChain;
            // otherwise continue with the next slot
            checkBlackWhiteAuthority(resourceWrapper, context);
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }

        // exit() is the exit point of the slot
        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            // Nothing to release; continue with the next slot
            fireExit(context, resourceWrapper, count, args);
        }

        void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
            // Load the in-memory AuthorityRules of this resource
            Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();

            if (authorityRules == null) {
                return;
            }

            Set<AuthorityRule> rules = authorityRules.get(resource.getName());
            if (rules == null) {
                return;
            }

            // Check each AuthorityRule; throw AuthorityException on the first violation
            for (AuthorityRule rule : rules) {
                if (!AuthorityRuleChecker.passCheck(rule, context)) {
                    throw new AuthorityException(context.getOrigin(), rule);
                }
            }
        }
    }
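    The white/black-list decision itself reduces to a membership test on the comma-separated limitApp list. A minimal sketch, assuming exact name matching and assuming (not verified against AuthorityRuleChecker here) that a request with an empty origin is not restricted; MiniAuthorityRule and its WHITE/BLACK constants are hypothetical, not Sentinel's AuthorityRule or RuleConstant:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of an authority rule; not Sentinel's AuthorityRule class.
class MiniAuthorityRule {
    static final int WHITE = 0;
    static final int BLACK = 1;

    final Set<String> apps;
    final int strategy;

    MiniAuthorityRule(String limitApp, int strategy) {
        // limitApp is a comma-separated list such as "appA,appB"
        this.apps = new HashSet<>(Arrays.asList(limitApp.split(",")));
        this.strategy = strategy;
    }

    // Returns true if a request from 'origin' may pass.
    boolean passCheck(String origin) {
        if (origin == null || origin.isEmpty()) {
            return true; // assumption: unknown callers are not restricted
        }
        boolean listed = apps.contains(origin);
        return strategy == WHITE ? listed : !listed;
    }
}
```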

     

    SystemSlot

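    Based on the overall state of the current system, this slot dynamically regulates calls to inbound resources. The idea is to keep inbound traffic and the system's estimated capacity in dynamic balance.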

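    Note that system rules only apply to inbound traffic (entry type EntryType.IN), not to outbound traffic. The entry type can be specified with SphU.entry(res, entryType); if it is omitted, it defaults to EntryType.OUT.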
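    A minimal sketch of the "inbound only" behavior, assuming a single QPS threshold compared against the current global inbound QPS; MiniEntryType and MiniSystemGuard are hypothetical, and a real SystemRule can also limit threads, RT, load and CPU usage, which this sketch leaves out:

```java
// Hypothetical entry type; mirrors the IN/OUT distinction of EntryType.
enum MiniEntryType { IN, OUT }

// Hypothetical system-level guard that only inspects inbound entries.
class MiniSystemGuard {
    private final double maxInboundQps; // hypothetical threshold from a system rule
    private double currentInboundQps;   // would come from the global inbound node

    MiniSystemGuard(double maxInboundQps) {
        this.maxInboundQps = maxInboundQps;
    }

    void setCurrentInboundQps(double qps) {
        this.currentInboundQps = qps;
    }

    // Returns true if the entry may pass.
    boolean check(MiniEntryType type) {
        if (type != MiniEntryType.IN) {
            return true; // outbound calls are never limited by system rules
        }
        return currentInboundQps <= maxInboundQps; // simplified comparison
    }
}
```

    Because SphU.entry(res) defaults to EntryType.OUT, a resource guarded this way is only affected by system rules when it is explicitly entered as IN.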

     

    Source code walkthrough:

    package com.alibaba.csp.sentinel.slots.system;

    import com.alibaba.csp.sentinel.context.Context;
    import com.alibaba.csp.sentinel.node.DefaultNode;
    import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot;
    import com.alibaba.csp.sentinel.slotchain.ProcessorSlot;
    import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
    import com.alibaba.csp.sentinel.spi.SpiOrder;

    /**
     * SystemSlot checks SystemRules.
     * Based on the overall state of the current system, this slot dynamically regulates calls to
     * inbound resources, keeping inbound traffic and the system's estimated capacity in dynamic balance.
     *
     * Note that system rules only apply to inbound traffic (EntryType.IN), not to outbound traffic.
     * The entry type can be specified with SphU.entry(res, entryType); if omitted, it defaults to EntryType.OUT.
     */
    @SpiOrder(-5000)
    public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

        // entry() is the entry point of the slot's core logic
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args) throws Throwable {
            // Check SystemRules; a violation throws SystemBlockException (a BlockException subclass)
            SystemRuleManager.checkSystem(resourceWrapper);
            // No SystemRule triggered: continue with the next slot
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }

        // exit() is the exit point of the slot
        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            // Nothing to release; continue with the next slot
            fireExit(context, resourceWrapper, count, args);
        }

    }

     

    FlowSlot

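    Using the statistics collected by the preceding slots, this slot applies the pre-configured flow rules in a fixed order. If a resource has two or more flow rules, they are checked in the following order until all of them pass or one of them is triggered:

    rules for a specific caller application (limiting that caller);

    rules whose caller (limitApp) is other;

    rules whose caller (limitApp) is default.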
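    A minimal sketch of that ordering and of "stop at the first triggered rule", under simplifying assumptions: every applicable rule is compared against one QPS value (Sentinel actually picks different statistic nodes depending on limitApp and strategy), a request blocks when current + 1 exceeds the threshold, and "other" applies only to callers that have no rule of their own. MiniFlowRule and MiniFlowChecker are hypothetical names:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical flow rule with only the fields needed to show ordering.
class MiniFlowRule {
    final String limitApp; // a specific caller name, "other", or "default"
    final double maxQps;

    MiniFlowRule(String limitApp, double maxQps) {
        this.limitApp = limitApp;
        this.maxQps = maxQps;
    }

    // 0 = specific caller, 1 = "other", 2 = "default"
    int rank() {
        if ("default".equals(limitApp)) {
            return 2;
        }
        if ("other".equals(limitApp)) {
            return 1;
        }
        return 0;
    }
}

class MiniFlowChecker {
    // Stable sort into: specific caller, "other", "default".
    static List<MiniFlowRule> sorted(List<MiniFlowRule> rules) {
        List<MiniFlowRule> copy = new ArrayList<>(rules);
        copy.sort(Comparator.comparingInt(MiniFlowRule::rank));
        return copy;
    }

    // Does this rule apply to a request from 'origin'?
    static boolean applies(MiniFlowRule rule, String origin, List<MiniFlowRule> all) {
        if ("default".equals(rule.limitApp)) {
            return true;
        }
        if ("other".equals(rule.limitApp)) {
            for (MiniFlowRule r : all) {
                if (r.limitApp.equals(origin)) {
                    return false; // the caller has its own rule
                }
            }
            return true;
        }
        return rule.limitApp.equals(origin);
    }

    // Returns the first rule that blocks, or null if all applicable rules pass.
    static MiniFlowRule firstTriggered(List<MiniFlowRule> rules, String origin, double currentQps) {
        for (MiniFlowRule rule : sorted(rules)) {
            if (applies(rule, origin, rules) && currentQps + 1 > rule.maxQps) {
                return rule;
            }
        }
        return null;
    }
}
```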

     

    Source code walkthrough:

    package com.alibaba.csp.sentinel.slots.block.flow; ​ import com.alibaba.csp.sentinel.context.Context; import com.alibaba.csp.sentinel.node.DefaultNode; import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot; import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.csp.sentinel.spi.SpiOrder; import com.alibaba.csp.sentinel.util.AssertUtil; import com.alibaba.csp.sentinel.util.function.Function; ​ import java.util.Collection; import java.util.List; import java.util.Map; ​ /** * <p> * Combined the runtime statistics collected from the previous * slots (NodeSelectorSlot, ClusterNodeBuilderSlot, and StatisticSlot), FlowSlot * will use pre-set rules to decide whether the incoming requests should be * blocked. * </p> * * <p> * {@code SphU.entry(resourceName)} will throw {@code FlowException} if any rule is * triggered. Users can customize their own logic by catching {@code FlowException}. * </p> * * <p> * One resource can have multiple flow rules. FlowSlot traverses these rules * until one of them is triggered or all rules have been traversed. * </p> * * <p> * Each {@link FlowRule} is mainly composed of these factors: grade, strategy, path. We * can combine these factors to achieve different effects. * </p> * * <p> * The grade is defined by the {@code grade} field in {@link FlowRule}. Here, 0 for thread * isolation and 1 for request count shaping (QPS). 
Both thread count and request
     * count are collected in real runtime, and we can view these statistics by
     * following command:
     * </p>
     *
     * <pre>
     * curl http://localhost:8719/tree
     *
     * idx id   thread pass blocked   success total aRt   1m-pass   1m-block   1m-all   exception
     * 2   abc647 0     460   46         46   1   27     630       276       897     0
     * </pre>
     *
     * <ul>
     * <li>{@code thread} for the count of threads that is currently processing the resource</li>
     * <li>{@code pass} for the count of incoming request within one second</li>
     * <li>{@code blocked} for the count of requests blocked within one second</li>
     * <li>{@code success} for the count of the requests successfully handled by Sentinel within one second</li>
     * <li>{@code RT} for the average response time of the requests within a second</li>
     * <li>{@code total} for the sum of incoming requests and blocked requests within one second</li>
     * <li>{@code 1m-pass} is for the count of incoming requests within one minute</li>
     * <li>{@code 1m-block} is for the count of a request blocked within one minute</li>
     * <li>{@code 1m-all} is the total of incoming and blocked requests within one minute</li>
     * <li>{@code exception} is for the count of business (customized) exceptions in one second</li>
     * </ul>
     *
     * This stage is usually used to protect resources from occupying. If a resource
     * takes long time to finish, threads will begin to occupy. The longer the
     * response takes, the more threads occupy.
     *
     * Besides counter, thread pool or semaphore can also be used to achieve this.
     *
     * - Thread pool: Allocate a thread pool to handle these resource. When there is
     * no more idle thread in the pool, the request is rejected without affecting
     * other resources.
     *
     * - Semaphore: Use semaphore to control the concurrent count of the threads in
     * this resource.
     *
     * The benefit of using thread pool is that, it can walk away gracefully when
     * time out. But it also bring us the cost of context switch and additional
     * threads. If the incoming requests is already served in a separated thread,
     * for instance, a Servlet HTTP request, it will almost double the threads count if
     * using thread pool.
     *
     * <h3>Traffic Shaping</h3>
     * <p>
     * When QPS exceeds the threshold, Sentinel will take actions to control the incoming request,
     * and is configured by {@code controlBehavior} field in flow rules.
     * </p>
     * <ol>
     * <li>Immediately reject ({@code RuleConstant.CONTROL_BEHAVIOR_DEFAULT})</li>
     * <p>
     * This is the default behavior. The exceeded request is rejected immediately
     * and the FlowException is thrown
     * </p>
     *
     * <li>Warmup ({@code RuleConstant.CONTROL_BEHAVIOR_WARM_UP})</li>
     * <p>
     * If the load of system has been low for a while, and a large amount of
     * requests comes, the system might not be able to handle all these requests at
     * once. However if we steady increase the incoming request, the system can warm
     * up and finally be able to handle all the requests.
     * This warmup period can be configured by setting the field {@code warmUpPeriodSec} in flow rules.
     * </p>
     *
     * <li>Uniform Rate Limiting ({@code RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER})</li>
     * <p>
     * This strategy strictly controls the interval between requests.
     * In other words, it allows requests to pass at a stable, uniform rate.
     * </p>
     * <img src="https://raw.githubusercontent.com/wiki/alibaba/Sentinel/image/uniform-speed-queue.png" style="max-width:
     * 60%;"/>
     * <p>
     * This strategy is an implement of <a href="https://en.wikipedia.org/wiki/Leaky_bucket">leaky bucket</a>.
     * It is used to handle the request at a stable rate and is often used in burst traffic (e.g. message handling).
     * When a large number of requests beyond the system’s capacity arrive
     * at the same time, the system using this strategy will handle requests and its
     * fixed rate until all the requests have been processed or time out.
     * </p>
     * </ol>
     *
     * @author jialiang.linjl
     * @author Eric Zhao
     */
    @SpiOrder(-2000)
    public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

        private final FlowRuleChecker checker;

        public FlowSlot() {
            this(new FlowRuleChecker());
        }

        /**
         * Package-private for test.
         *
         * @param checker flow rule checker
         * @since 1.6.1
         */
        FlowSlot(FlowRuleChecker checker) {
            AssertUtil.notNull(checker, "flow checker should not be null");
            this.checker = checker;
        }

        // entry() is the entry point of the slot's core logic
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args) throws Throwable {
            // Check the flow rules
            checkFlow(resourceWrapper, context, node, count, prioritized);

            // No flow rule was triggered: continue with the next slot
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }

        // Check the flow rules. On failure a FlowException (a subclass of BlockException) is thrown,
        // which ends SlotChain processing
        void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
            throws BlockException {
            checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
        }

        // exit() is the exit point when the slot finishes
        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            // FlowSlot records no statistics on exit; it simply calls exit() of the next slot
            fireExit(context, resourceWrapper, count, args);
        }

        // Looks up the flow rules loaded in memory for the given resource name
        private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
            @Override
            public Collection<FlowRule> apply(String resource) {
                // Flow rule map should not be null.
                Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
                return flowRules.get(resource);
            }
        };
    }
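    The "Uniform Rate Limiting" behavior in the Javadoc above is a leaky bucket. The small sketch below illustrates the idea. It is a simplified stand-in, not Sentinel's own rate-limiting controller.

    - The class name `UniformRateLimiter` and its parameters `permitsPerSecond` and `maxQueueingTimeMs` are hypothetical.
    - Time is passed in explicitly so that the behavior is deterministic.
    - Each admitted request is scheduled one fixed interval after the previous one.
    - A request that would have to queue longer than the allowed time is rejected.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of uniform-rate (leaky bucket) admission; not Sentinel's API.
class UniformRateLimiter {
    private final long intervalMs;        // fixed gap between two admitted requests
    private final long maxQueueingTimeMs; // longest a request may wait before it is rejected
    // Time at which the most recently admitted request is scheduled to pass (-1 = none yet).
    private final AtomicLong latestPassedTime = new AtomicLong(-1);

    UniformRateLimiter(int permitsPerSecond, long maxQueueingTimeMs) {
        this.intervalMs = 1000L / permitsPerSecond;
        this.maxQueueingTimeMs = maxQueueingTimeMs;
    }

    /**
     * Returns how long the caller should wait (>= 0) before proceeding,
     * or -1 if the request must be rejected.
     */
    long tryAcquire(long nowMs) {
        while (true) {
            long last = latestPassedTime.get();
            // The next slot is one interval after the last admitted request, but never in the past.
            long expected = (last < 0) ? nowMs : Math.max(nowMs, last + intervalMs);
            long waitMs = expected - nowMs;
            if (waitMs > maxQueueingTimeMs) {
                return -1; // waiting would exceed the queueing limit
            }
            if (latestPassedTime.compareAndSet(last, expected)) {
                return waitMs;
            }
            // Another thread claimed the slot first; retry with the updated state.
        }
    }
}
```

    With 10 permits per second the interval is 100 ms. Three back-to-back calls at t = 0 therefore wait 0, 100 and 200 ms. Under a 250 ms queueing limit, a fourth call at t = 0 is rejected. Requests are smoothed into a steady stream instead of being admitted in a burst.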

     

    DegradeSlot

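    This slot uses a resource's response time (RT) and exception ratio to decide whether calls to the resource should be automatically circuit-broken for the following period. In 1.8 the checks are delegated to CircuitBreaker instances built from the degrade rules, as the code below shows.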

     

    Source walkthrough:

    package com.alibaba.csp.sentinel.slots.block.degrade;

    import java.util.List;

    import com.alibaba.csp.sentinel.Entry;
    import com.alibaba.csp.sentinel.context.Context;
    import com.alibaba.csp.sentinel.node.DefaultNode;
    import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot;
    import com.alibaba.csp.sentinel.slotchain.ProcessorSlot;
    import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
    import com.alibaba.csp.sentinel.slots.block.BlockException;
    import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker;
    import com.alibaba.csp.sentinel.spi.SpiOrder;

    /**
     * DegradeSlot handles circuit breaking.
     *
     * Based on a resource's response time (RT) and exception ratio, this slot decides whether
     * calls to the resource are automatically circuit-broken for the following period.
     */
    @SpiOrder(-1000)
    public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

        // entry() is the entry point of the slot's core logic
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args) throws Throwable {
            performChecking(context, resourceWrapper);
            // No circuit breaker rejected the call: continue with the next slot. If one had rejected it,
            // performChecking() would already have thrown and SlotChain processing would end there.
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }

        void performChecking(Context context, ResourceWrapper r) throws BlockException {
            // Look up the circuit breakers built from the degrade rules loaded for this resource
            List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
            if (circuitBreakers == null || circuitBreakers.isEmpty()) {
                return;
            }
            // Check each circuit breaker in turn; the first one that rejects throws DegradeException
            for (CircuitBreaker cb : circuitBreakers) {
                if (!cb.tryPass(context)) {
                    throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
                }
            }
        }

        // exit() is the exit point when the slot finishes
        @Override
        public void exit(Context context, ResourceWrapper r, int count, Object... args) {
            Entry curEntry = context.getCurEntry();

            // The request was blocked by some rule: nothing to record, just pass exit() down the chain
            if (curEntry.getBlockError() != null) {
                fireExit(context, r, count, args);
                return;
            }
            // No degrade rule is defined for this resource: nothing to record either
            List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
            if (circuitBreakers == null || circuitBreakers.isEmpty()) {
                fireExit(context, r, count, args);
                return;
            }

            // The request passed: let each circuit breaker record the completed call
            // (completion time, RT, whether it was a slow call or ended with an exception)
            if (curEntry.getBlockError() == null) {
                // passed request
                for (CircuitBreaker circuitBreaker : circuitBreakers) {
                    circuitBreaker.onRequestComplete(context);
                }
            }
            // Invoke exit() of the next slot
            fireExit(context, r, count, args);
        }
    }
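    DegradeSlot relies on a two-step contract. `tryPass` is asked before the call, and `onRequestComplete` records the outcome afterwards. To make that contract concrete, here is a deliberately simplified exception-ratio circuit breaker with the usual CLOSED, OPEN and HALF_OPEN states.

    Everything in this sketch is hypothetical and for illustration only:
    - the class `SimpleRatioBreaker`;
    - its thresholds;
    - the single cumulative counter, which is reset only when the breaker trips or recovers.

    Sentinel's own breakers are more involved; for example, their statistics are kept in sliding windows.

```java
// Hypothetical, simplified exception-ratio circuit breaker; not Sentinel's CircuitBreaker.
class SimpleRatioBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final double maxErrorRatio;   // trip when errors / total exceeds this ratio
    private final int minRequestAmount;   // do not judge on fewer samples than this
    private final long recoveryTimeoutMs; // how long to stay OPEN before letting a probe through

    private State state = State.CLOSED;
    private long nextRetryTime;
    private int total;
    private int errors;

    SimpleRatioBreaker(double maxErrorRatio, int minRequestAmount, long recoveryTimeoutMs) {
        this.maxErrorRatio = maxErrorRatio;
        this.minRequestAmount = minRequestAmount;
        this.recoveryTimeoutMs = recoveryTimeoutMs;
    }

    // Called before the request, in the role of CircuitBreaker#tryPass in DegradeSlot#entry.
    synchronized boolean tryPass(long nowMs) {
        if (state == State.CLOSED) {
            return true;
        }
        if (state == State.OPEN && nowMs >= nextRetryTime) {
            state = State.HALF_OPEN; // let a single probe request through
            return true;
        }
        return false; // still OPEN, or a HALF_OPEN probe is already in flight
    }

    // Called after a passed request, in the role of CircuitBreaker#onRequestComplete in DegradeSlot#exit.
    synchronized void onRequestComplete(long nowMs, boolean failed) {
        if (state == State.OPEN) {
            return; // late completions of requests admitted before the trip are ignored
        }
        if (state == State.HALF_OPEN) {
            if (failed) {
                trip(nowMs); // probe failed: open again
            } else {
                state = State.CLOSED; // probe succeeded: close and start counting afresh
                total = 0;
                errors = 0;
            }
            return;
        }
        total++;
        if (failed) {
            errors++;
        }
        if (total >= minRequestAmount && (double) errors / total > maxErrorRatio) {
            trip(nowMs);
        }
    }

    private void trip(long nowMs) {
        state = State.OPEN;
        nextRetryTime = nowMs + recoveryTimeoutMs;
        total = 0;
        errors = 0;
    }

    synchronized State state() {
        return state;
    }
}
```

    The HALF_OPEN state lets exactly one probe through after the recovery timeout. That probe decides whether the breaker closes or opens again, which avoids flooding a still-unhealthy resource.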

     

    Pt1.6 SlotChain

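    A SlotChain is a singly linked list made up of multiple ProcessorSlot objects.

    ProcessorSlot defines the processing logic of a node (entry/exit and fireEntry/fireExit), and AbstractLinkedProcessorSlot holds the reference to the next node;

    ProcessorSlotChain defines the operations on the linked list, including addFirst and addLast.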
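    A minimal sketch can make the linked structure described above concrete. It heavily simplifies the signatures.

    - `LinkedSlot`, `NamedSlot` and `SlotChain` are hypothetical stand-ins for AbstractLinkedProcessorSlot, a concrete slot, and ProcessorSlotChain.
    - `entry` takes only a resource name plus a trace list.
    - The chain keeps a dummy head node and a tail pointer, similar in spirit to DefaultProcessorSlotChain. This keeps addFirst and addLast free of null checks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified linked slot (not Sentinel's real API).
abstract class LinkedSlot {
    private LinkedSlot next; // the next slot in the singly linked list

    LinkedSlot getNext() { return next; }

    void setNext(LinkedSlot next) { this.next = next; }

    // Each slot does its own work and then decides whether to call fireEntry.
    abstract void entry(String resource, List<String> trace);

    // Pass control to the next slot, if there is one.
    protected void fireEntry(String resource, List<String> trace) {
        if (next != null) {
            next.entry(resource, trace);
        }
    }
}

class NamedSlot extends LinkedSlot {
    private final String name;

    NamedSlot(String name) { this.name = name; }

    @Override
    void entry(String resource, List<String> trace) {
        trace.add(name + ":" + resource); // record that this slot ran
        fireEntry(resource, trace);
    }
}

class SlotChain extends LinkedSlot {
    // Dummy head node: it only forwards to the first real slot.
    private final LinkedSlot first = new LinkedSlot() {
        @Override
        void entry(String resource, List<String> trace) {
            fireEntry(resource, trace);
        }
    };
    private LinkedSlot end = first; // tail pointer for O(1) addLast

    void addFirst(LinkedSlot slot) {
        slot.setNext(first.getNext());
        first.setNext(slot);
        if (end == first) {
            end = slot; // the chain was empty, so the new slot is also the tail
        }
    }

    void addLast(LinkedSlot slot) {
        end.setNext(slot);
        end = slot;
    }

    @Override
    void entry(String resource, List<String> trace) {
        first.entry(resource, trace);
    }

    // Convenience entry point: runs the chain and returns the order in which slots fired.
    List<String> run(String resource) {
        List<String> trace = new ArrayList<>();
        entry(resource, trace);
        return trace;
    }
}
```

    Because every slot calls `fireEntry` itself, a slot can also stop the chain by not calling it (or by throwing). This is how FlowSlot and DegradeSlot end processing with a BlockException.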

     

     

    Building the SlotChain

    To see how a SlotChain is built, we start from SlotChainProvider#newSlotChain. The source is as follows:

    package com.alibaba.csp.sentinel.slotchain;

    import com.alibaba.csp.sentinel.log.RecordLog;
    import com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder;
    import com.alibaba.csp.sentinel.util.SpiLoader;

    public final class SlotChainProvider {

        // volatile makes the assigned slotChainBuilder visible to all threads
        private static volatile SlotChainBuilder slotChainBuilder = null;

        /**
         * The load and pick process is not thread-safe, but it's okay since the method should be only invoked
         * via {@code lookProcessChain} in {@link com.alibaba.csp.sentinel.CtSph} under lock.
         *
         * @return new created slot chain
         */
        // Creates a new SlotChain. The method itself is not thread-safe, but it is only called
        // from CtSph#lookProcessChain() under a lock, so in practice the call is safe.
        public static ProcessorSlotChain newSlotChain() {
            if (slotChainBuilder != null) {
                return slotChainBuilder.build();
            }

            // slotChainBuilder is not initialized yet: load it via SPI from
            // META-INF/services/com.alibaba.csp.sentinel.slotchain.SlotChainBuilder
            slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);

            if (slotChainBuilder == null) {
                // Should not go through here.
                RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
                slotChainBuilder = new DefaultSlotChainBuilder();
            } else {
                RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
                    slotChainBuilder.getClass().getCanonicalName());
            }
            return slotChainBuilder.build();
        }

        private SlotChainProvider() {}
    }

     

    The key part is the construction logic in DefaultSlotChainBuilder#build():

    public class DefaultSlotChainBuilder implements SlotChainBuilder {

        @Override
        public ProcessorSlotChain build() {
            ProcessorSlotChain chain = new DefaultProcessorSlotChain();

            // Load all ProcessorSlot implementations via SPI, already sorted by @SpiOrder
            List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
            for (ProcessorSlot slot : sortedSlotList) {
                // A slot (including a custom one) must extend AbstractLinkedProcessorSlot to be added to the chain
                if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                    RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                    continue;
                }

                // Append the slot to the tail of the chain
                chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
            }

            // Return the assembled SlotChain
            return chain;
        }
    }

     

    Sentinel obtains and builds the SlotChain via SPI. Let's dig further and see what is special about SpiLoader#loadPrototypeInstanceListSorted.

    /**
     * Load the sorted and prototype SPI instance list for provided SPI interface.
     *
     * Note: each call return different instances, i.e. prototype instance, not singleton instance.
     *
     * @param clazz class of the SPI
     * @param <T>   SPI type
     * @return sorted and different SPI instance list
     * @since 1.7.2
     */
    // Loads the slots of the SlotChain via SPI
    public static <T> List<T> loadPrototypeInstanceListSorted(Class<T> clazz) {
        try {
            // Load the services via SPI. The argument here is ProcessorSlot.class, so the service file
            // name is the fully qualified name of clazz.
            // SPI files are defined under \src\main\resources\META-INF\services.
            // That file defines the following slots:
            // com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot (@SpiOrder(-10000))
            // com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot (@SpiOrder(-9000))
            // com.alibaba.csp.sentinel.slots.logger.LogSlot (@SpiOrder(-8000))
            // com.alibaba.csp.sentinel.slots.statistic.StatisticSlot (@SpiOrder(-7000))
            // com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot (@SpiOrder(-6000))
            // com.alibaba.csp.sentinel.slots.system.SystemSlot (@SpiOrder(-5000))
            // com.alibaba.csp.sentinel.slots.block.flow.FlowSlot (@SpiOrder(-2000))
            // com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot (@SpiOrder(-1000))
            ServiceLoader<T> serviceLoader = ServiceLoaderUtil.getServiceLoader(clazz);

            // orderWrappers is kept in ascending order of each slot's @SpiOrder value, so the call order
            // of the SlotChain is:
            // NodeSelectorSlot -> ClusterBuilderSlot -> LogSlot -> StatisticSlot -> AuthoritySlot ->
            // SystemSlot -> FlowSlot -> DegradeSlot
            List<SpiOrderWrapper<T>> orderWrappers = new ArrayList<>();
            for (T spi : serviceLoader) {
                // Read the value of the slot's @SpiOrder annotation
                int order = SpiOrderResolver.resolveOrder(spi);
                // Since SPI is lazy initialized in ServiceLoader, we use online sort algorithm here.
                SpiOrderResolver.insertSorted(orderWrappers, spi, order);
                RecordLog.debug("[SpiLoader] Found {} SPI: {} with order {}", clazz.getSimpleName(),
                    spi.getClass().getCanonicalName(), order);
            }
            // Unwrap into the List<T> (here List<ProcessorSlot>) to return
            List<T> list = new ArrayList<>(orderWrappers.size());
            for (int i = 0; i < orderWrappers.size(); i++) {
                list.add(orderWrappers.get(i).spi);
            }
            return list;
        } catch (Throwable t) {
            RecordLog.error("[SpiLoader] ERROR: loadPrototypeInstanceListSorted failed", t);
            t.printStackTrace();
            return new ArrayList<>();
        }
    }

     

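    As shown above, Sentinel loads the slots of the SlotChain via SPI and uses each slot's @SpiOrder value to fix its position in the chain. Slots are sorted in ascending order, so a smaller value runs earlier. I collected the slot list in the comments from the individual Slot sources; each class carries the @SpiOrder annotation that determines its place in the chain.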
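    The "online sort" used in loadPrototypeInstanceListSorted works like an insertion sort. Each instance is inserted into an already-sorted list as soon as the lazy ServiceLoader yields it. The sketch below shows the technique.

    Everything in it is hypothetical:
    - the `@SlotOrder` annotation, which stands in for @SpiOrder;
    - the demo slot classes;
    - the `OrderedLoader` helper;
    - the choice of `Integer.MAX_VALUE` for unannotated classes, which is this sketch's own assumption.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for @SpiOrder: a smaller value means "earlier in the chain".
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SlotOrder {
    int value();
}

// Hypothetical demo classes; only their annotations matter here.
@SlotOrder(-2000) class FlowLikeSlot {}
@SlotOrder(-10000) class NodeSelectorLikeSlot {}
@SlotOrder(-7000) class StatisticLikeSlot {}
class UnorderedSlot {} // no annotation: sorted last in this sketch

final class OrderedLoader {
    private OrderedLoader() {}

    // Pairs an instance with its resolved order, similar in spirit to SpiOrderWrapper above.
    private static final class Wrapper {
        final Object spi;
        final int order;

        Wrapper(Object spi, int order) {
            this.spi = spi;
            this.order = order;
        }
    }

    static int resolveOrder(Object spi) {
        SlotOrder order = spi.getClass().getAnnotation(SlotOrder.class);
        return order == null ? Integer.MAX_VALUE : order.value();
    }

    // Online insertion sort: each instance is placed as soon as it is discovered.
    static List<Object> loadSorted(Iterable<?> discovered) {
        List<Wrapper> wrappers = new ArrayList<>();
        for (Object spi : discovered) {
            int order = resolveOrder(spi);
            int idx = 0;
            // Skip every entry whose order is <= ours, so equal orders keep discovery order.
            while (idx < wrappers.size() && wrappers.get(idx).order <= order) {
                idx++;
            }
            wrappers.add(idx, new Wrapper(spi, order));
        }
        List<Object> result = new ArrayList<>(wrappers.size());
        for (Wrapper w : wrappers) {
            result.add(w.spi);
        }
        return result;
    }
}
```

    Feeding it FlowLikeSlot, UnorderedSlot, NodeSelectorLikeSlot and StatisticLikeSlot in that order yields NodeSelectorLikeSlot, StatisticLikeSlot, FlowLikeSlot, UnorderedSlot. The discovery order does not matter; only the annotation values do.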

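    Now let's look at the SPI definition itself. SPI files live under \src\main\resources\META-INF\services. SpiLoader#loadPrototypeInstanceListSorted loads ProcessorSlot.class, whose fully qualified name is com.alibaba.csp.sentinel.slotchain.ProcessorSlot. The file with that name in the services directory holds the SPI definition: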

    # Sentinel default ProcessorSlots
    com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
    com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
    com.alibaba.csp.sentinel.slots.logger.LogSlot
    com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
    com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
    com.alibaba.csp.sentinel.slots.system.SystemSlot
    com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
    com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot

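    The slots named in this SPI file are instantiated, sorted by their @SpiOrder values (rather than by their order in the file), and appended to the SlotChain.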

     

    Pt2 Sliding Window

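    Sentinel stores its real-time statistics in sliding windows. The core classes are:

    ArrayMetric: the class Sentinel uses to hold a sliding window. It stores the window data in a LeapArray<MetricBucket> and provides the operations on it;

    LeapArray: the core sliding-window data structure. It holds the window parameters and the window instances in an AtomicReferenceArray<WindowWrap<T>>;

    WindowWrap: a single window (bucket), holding its start time and the statistics stored in it;

    MetricBucket: the data structure a window stores, with one counter per metric;

    MetricEvent: the metrics Sentinel counts in real time.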

     

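    The figure below shows Sentinel's sliding window. Assuming a window length of 1000 ms, Sentinel in effect lays out the following grid of sliding windows on the absolute time axis: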

     

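    The idea is simple, so let's go straight to the source, which is annotated in detail. The classes are covered bottom-up, starting with MetricEvent and ending with ArrayMetric.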
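    Before reading the source, the arithmetic behind "fixed-position windows" can be checked in isolation. The helper below mirrors what LeapArray#calculateTimeIdx and LeapArray#calculateWindowStart compute. The class name `WindowMath` is hypothetical.

```java
// Hypothetical helper mirroring LeapArray#calculateTimeIdx / LeapArray#calculateWindowStart.
final class WindowMath {
    private WindowMath() {}

    // Slot of the circular array that the timestamp maps to.
    static int timeIdx(long timeMillis, long windowLengthInMs, int sampleCount) {
        long timeId = timeMillis / windowLengthInMs; // index of the window on the absolute time axis
        return (int) (timeId % sampleCount);
    }

    // Start of the fixed window that contains the timestamp.
    static long windowStart(long timeMillis, long windowLengthInMs) {
        return timeMillis - timeMillis % windowLengthInMs;
    }
}
```

    Take 1000 ms windows with sampleCount = 10. The timestamp 1601434107798 maps to slot 7, in the window starting at 1601434107000. A timestamp 10 s later maps to slot 7 again, which is why an old bucket has to be reset before it is reused. Likewise, 5501, 5546 and 5599 all share the window starting at 5000.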

     

    Pt2.1 MetricEvent

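    // The statistic dimensions recorded in a metric
    public enum MetricEvent {
        PASS,           // requests that passed
        BLOCK,          // requests that were blocked
        EXCEPTION,      // business exceptions
        SUCCESS,        // requests that completed
        RT,             // accumulated response time
        OCCUPIED_PASS   // requests that passed by borrowing quota from a future window
    }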

     

    Pt2.2 MetricBucket

    package com.alibaba.csp.sentinel.slots.statistic.data;

    import com.alibaba.csp.sentinel.config.SentinelConfig;
    import com.alibaba.csp.sentinel.slots.statistic.MetricEvent;
    import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder;

    // MetricBucket holds the metric statistics of one time window
    public class MetricBucket {

        // One counter per metric dimension; the array length equals the number of MetricEvent constants
        private final LongAdder[] counters;

        // Smallest RT seen in this bucket. It starts at the configured maximum statistic RT
        // (SentinelConfig.statisticMaxRt()) so that any real RT is smaller.
        private volatile long minRt;

        public MetricBucket() {
            // MetricEvent lists the metric dimensions: PASS, BLOCK, EXCEPTION, SUCCESS, RT, OCCUPIED_PASS
            MetricEvent[] events = MetricEvent.values();
            // Initialize counters: one LongAdder per metric dimension
            this.counters = new LongAdder[events.length];
            for (MetricEvent event : events) {
                // event.ordinal() (the declaration position of the constant) is used as the array index
                counters[event.ordinal()] = new LongAdder();
            }
            // Initialize minRt from configuration
            initMinRt();
        }

        // Reset the counters to the values held by the given bucket
        public MetricBucket reset(MetricBucket bucket) {
            for (MetricEvent event : MetricEvent.values()) {
                counters[event.ordinal()].reset();
                counters[event.ordinal()].add(bucket.get(event));
            }
            initMinRt();
            return this;
        }

        private void initMinRt() {
            this.minRt = SentinelConfig.statisticMaxRt();
        }

        // Reset all counters to zero
        public MetricBucket reset() {
            for (MetricEvent event : MetricEvent.values()) {
                counters[event.ordinal()].reset();
            }
            initMinRt();
            return this;
        }

        public long get(MetricEvent event) {
            return counters[event.ordinal()].sum();
        }

        public MetricBucket add(MetricEvent event, long n) {
            counters[event.ordinal()].add(n);
            return this;
        }

        public long pass() {
            return get(MetricEvent.PASS);
        }

        public long occupiedPass() {
            return get(MetricEvent.OCCUPIED_PASS);
        }

        public long block() {
            return get(MetricEvent.BLOCK);
        }

        public long exception() {
            return get(MetricEvent.EXCEPTION);
        }

        public long rt() {
            return get(MetricEvent.RT);
        }

        public long minRt() {
            return minRt;
        }

        public long success() {
            return get(MetricEvent.SUCCESS);
        }

        public void addPass(int n) {
            add(MetricEvent.PASS, n);
        }

        public void addOccupiedPass(int n) {
            add(MetricEvent.OCCUPIED_PASS, n);
        }

        public void addException(int n) {
            add(MetricEvent.EXCEPTION, n);
        }

        public void addBlock(int n) {
            add(MetricEvent.BLOCK, n);
        }

        public void addSuccess(int n) {
            add(MetricEvent.SUCCESS, n);
        }

        public void addRT(long rt) {
            add(MetricEvent.RT, rt);

            // Not thread-safe, but it's okay.
            if (rt < minRt) {
                minRt = rt;
            }
        }

        @Override
        public String toString() {
            return "p: " + pass() + ", b: " + block() + ", w: " + occupiedPass();
        }
    }
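    The counter layout of MetricBucket can be reproduced with the JDK's own java.util.concurrent.atomic.LongAdder. The layout is one LongAdder per metric, indexed by the enum's ordinal(). Sentinel imports a LongAdder from its own com.alibaba.csp.sentinel.slots.statistic.base package instead, so this is only a simplified standalone sketch. `MetricKind` and `MiniBucket` are hypothetical names.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical mirror of MetricEvent.
enum MetricKind { PASS, BLOCK, EXCEPTION, SUCCESS, RT, OCCUPIED_PASS }

// Simplified, hypothetical counterpart of MetricBucket.
class MiniBucket {
    private final LongAdder[] counters = new LongAdder[MetricKind.values().length];
    private final long maxRt;   // ceiling used to (re)initialize minRt
    private volatile long minRt;

    MiniBucket(long maxRt) {
        this.maxRt = maxRt;
        for (MetricKind kind : MetricKind.values()) {
            counters[kind.ordinal()] = new LongAdder(); // one striped counter per metric
        }
        this.minRt = maxRt; // start at the ceiling so any real RT is smaller
    }

    void add(MetricKind kind, long n) {
        counters[kind.ordinal()].add(n);
    }

    long get(MetricKind kind) {
        return counters[kind.ordinal()].sum();
    }

    void addRt(long rt) {
        add(MetricKind.RT, rt);
        if (rt < minRt) {
            minRt = rt; // racy check-then-set, like the original; acceptable for statistics
        }
    }

    long minRt() {
        return minRt;
    }

    // Clear all counters so the bucket can be reused for a new window.
    void reset() {
        for (LongAdder counter : counters) {
            counter.reset();
        }
        minRt = maxRt;
    }
}
```

    LongAdder spreads contended increments across internal cells and only combines them in `sum()`. That suits counters which are written on every request but read far less often.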

     

    Pt2.3 WindowWrap

    package com.alibaba.csp.sentinel.slots.statistic.base;

    // A single sliding-window bucket
    public class WindowWrap<T> {

        // Time length of this window, in ms
        private final long windowLengthInMs;

        // Start time of this window
        private long windowStart;

        // Statistic data held by this window
        private T value;

        /**
         * @param windowLengthInMs a single window bucket's time length in milliseconds.
         * @param windowStart      the start timestamp of the window
         * @param value            statistic data
         */
        public WindowWrap(long windowLengthInMs, long windowStart, T value) {
            this.windowLengthInMs = windowLengthInMs;
            this.windowStart = windowStart;
            this.value = value;
        }

        public long windowLength() {
            return windowLengthInMs;
        }

        public long windowStart() {
            return windowStart;
        }

        public T value() {
            return value;
        }

        public void setValue(T value) {
            this.value = value;
        }

        /**
         * Reset start timestamp of current bucket to provided time.
         *
         * @param startTime valid start timestamp
         * @return bucket after reset
         */
        public WindowWrap<T> resetTo(long startTime) {
            this.windowStart = startTime;
            return this;
        }

        /**
         * Check whether given timestamp is in current bucket.
         *
         * @param timeMillis valid timestamp in ms
         * @return true if the given time is in current bucket, otherwise false
         * @since 1.5.0
         */
        public boolean isTimeInWindow(long timeMillis) {
            return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs;
        }

        @Override
        public String toString() {
            return "WindowWrap{" +
                "windowLengthInMs=" + windowLengthInMs +
                ", windowStart=" + windowStart +
                ", value=" + value +
                '}';
        }
    }

     

    Pt2.4 LeapArray

    package com.alibaba.csp.sentinel.slots.statistic.base;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicReferenceArray;
    import java.util.concurrent.locks.ReentrantLock;

    import com.alibaba.csp.sentinel.util.AssertUtil;
    import com.alibaba.csp.sentinel.util.TimeUtil;

    /**
     * LeapArray collects statistics with a sliding-window algorithm; it is the storage structure
     * behind Sentinel's real-time statistics.
     *
     * LeapArray defines the sliding-window parameters:
     * > windowLengthInMs: the time span of one window (bucket)
     * > intervalInMs: the total time span covered by all windows
     * > sampleCount: the number of windows kept
     * > array: the window samples; new windows keep overwriting old ones, so only the latest
     *   sampleCount windows are kept
     *
     * Each element of array is a WindowWrap<MetricBucket>:
     * > WindowWrap is one window instance, holding the window's start time and its data;
     * > MetricBucket is the window's data, covering windowLengthInMs of time;
     * > total bucket count: sampleCount = intervalInMs / windowLengthInMs
     *
     * Sentinel's windows are fixed-position windows: based on the window size, the absolute time
     * axis is divided into fixed windows.
     * For example, with a window size of 1000ms, one minute = 60 * 1000ms always contains exactly
     * 60 windows, and 5501ms, 5546ms and 5599ms all fall into the same window, the 6th one
     * (timeId = 5, covering [5000ms, 6000ms)).
     * So once the window size is fixed, the window of every timestamp is fixed as well.
     * The design is simple once you see it. Do not confuse it with a window anchored at the current
     * time: I first assumed a window meant "the window-size span back from now", which made a lot of
     * the logic look odd.
     */
    public abstract class LeapArray<T> {

        // Time span covered by one bucket
        protected int windowLengthInMs;
        // Total time span of the sliding window
        protected int intervalInMs;
        // Number of buckets (each bucket holds the statistics of one fixed window)
        protected int sampleCount;
        // Total time span of the sliding window, in seconds
        private double intervalInSecond;
        // Holds the window data; the number of windows kept is given by sampleCount.
        // array always holds the latest sampleCount windows
        protected final AtomicReferenceArray<WindowWrap<T>> array;

        /**
         * The conditional (predicate) update lock is used only when current bucket is deprecated.
         */
        private final ReentrantLock updateLock = new ReentrantLock();

        /**
         * The total bucket count is: {@code sampleCount = intervalInMs / windowLengthInMs}.
         *
         * @param sampleCount  bucket count of the sliding window
         * @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds
         */
        public LeapArray(int sampleCount, int intervalInMs) {
            AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
            AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
            AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");

            this.windowLengthInMs = intervalInMs / sampleCount;
            this.intervalInMs = intervalInMs;
            this.intervalInSecond = intervalInMs / 1000.0;
            this.sampleCount = sampleCount;

            this.array = new AtomicReferenceArray<>(sampleCount);
        }

        /**
         * Get the bucket at current timestamp.
         *
         * @return the bucket at current timestamp
         */
        // Get the window for the current timestamp
        public WindowWrap<T> currentWindow() {
            return currentWindow(TimeUtil.currentTimeMillis());
        }

        /**
         * Create a new statistic value for bucket.
         *
         * @param timeMillis current time in milliseconds
         * @return the new empty bucket
         */
        public abstract T newEmptyBucket(long timeMillis);

        /**
         * Reset given bucket to provided start time and reset the value.
         *
         * @param startTime  the start time of the bucket in milliseconds
         * @param windowWrap current bucket
         * @return new clean bucket at given start time
         */
        protected abstract WindowWrap<T> resetWindowTo(WindowWrap<T> windowWrap, long startTime);

        // From timeMillis, compute which window it falls into and that window's index in array
        private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
            // timeId is the position of the window containing timeMillis.
            // Conceptually, all windows are laid out from 1970-01-01 (the epoch) in steps of
            // windowLengthInMs, so timeMillis alone determines which window it falls into.
            long timeId = timeMillis / windowLengthInMs;
            // array holds a fixed number of window instances; indexing by timeId % array.length()
            // reuses the slots cyclically, so array holds the most recent sampleCount consecutive windows
            return (int)(timeId % array.length());
        }

        // Compute the start time of the window containing timeMillis
        protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
            return timeMillis - timeMillis % windowLengthInMs;
        }

        /**
         * Finds the window for the given timestamp.
         * This is the core of the sliding-window logic: how windows are computed, stored and located.
         *
         * A concrete example:
         * > current timestamp from TimeUtil.currentTimeMillis(): timeMillis = 1601434107798
         * > window length windowLengthInMs = 1s = 1000ms
         * > number of windows sampleCount = 10
         * > total window length intervalInMs = 10 * 1000ms
         * Then idx = (1601434107798 / 1000) % 10 = 1601434107 % 10 = 7,
         * and windowStart = 1601434107798 - 1601434107798 % 1000 = 1601434107000.
         */
        public WindowWrap<T> currentWindow(long timeMillis) {
            if (timeMillis < 0) {
                return null;
            }

            // 1. Index in array of the window containing timeMillis
            int idx = calculateTimeIdx(timeMillis);
            // 2. Start time of that window
            long windowStart = calculateWindowStart(timeMillis);

            /*
             * 3. Compare the computed window with the bucket currently at array[idx].
             * There are three cases:
             *
             * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
             * (2) Bucket is up-to-date, then just return the bucket.
             * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
             */
            // The upstream comments below are clear enough, so nothing is added to them.
            while (true) {
                WindowWrap<T> old = array.get(idx);
                if (old == null) {
                    /*
                     *     B0       B1      B2    NULL      B4
                     * ||_______|_______|_______|_______|_______||___
                     * 200     400     600     800     1000    1200  timestamp
                     *                             ^
                     *                          time=888
                     *            bucket is empty, so create new and update
                     *
                     * If the old bucket is absent, then we create a new bucket at {@code windowStart},
                     * then try to update circular array via a CAS operation. Only one thread can
                     * succeed to update, while other threads yield its time slice.
                     */
                    WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                    if (array.compareAndSet(idx, null, window)) {
                        // Successfully updated, return the created bucket.
                        return window;
                    } else {
                        // Contention failed, the thread will yield its time slice to wait for bucket available.
                        Thread.yield();
                    }
                } else if (windowStart == old.windowStart()) {
                    /*
                     *     B0       B1      B2     B3      B4
                     * ||_______|_______|_______|_______|_______||___
                     * 200     400     600     800     1000    1200  timestamp
                     *                             ^
                     *                          time=888
                     *            startTime of Bucket 3: 800, so it's up-to-date
                     *
                     * If current {@code windowStart} is equal to the start timestamp of old bucket,
                     * that means the time is within the bucket, so directly return the bucket.
                     */
                    return old;
                } else if (windowStart > old.windowStart()) {
                    /*
                     *   (old)
                     *             B0       B1      B2    NULL      B4
                     * |_______||_______|_______|_______|_______|_______||___
                     * ...    1200     1400    1600    1800    2000    2200  timestamp
                     *                              ^
                     *                           time=1676
                     *          startTime of Bucket 2: 400, deprecated, should be reset
                     *
                     * If the start timestamp of old bucket is behind provided time, that means
                     * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
                     * Note that the reset and clean-up operations are hard to be atomic,
                     * so we need a update lock to guarantee the correctness of bucket update.
                     *
                     * The update lock is conditional (tiny scope) and will take effect only when
                     * bucket is deprecated, so in most cases it won't lead to performance loss.
                     */
                    if (updateLock.tryLock()) {
                        try {
                            // Successfully get the update lock, now we reset the bucket.
                            return resetWindowTo(old, windowStart);
                        } finally {
                            updateLock.unlock();
                        }
                    } else {
                        // Contention failed, the thread will yield its time slice to wait for bucket available.
                        Thread.yield();
                    }
                } else if (windowStart < old.windowStart()) {
                    // Should not go through here, as the provided time is already behind.
                    return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                }
            }
        }

        /**
         * Get the previous bucket item before provided timestamp.
         *
         * @param timeMillis a valid timestamp in milliseconds
         * @return the previous bucket item before provided timestamp
         */
        public WindowWrap<T> getPreviousWindow(long timeMillis) {
            if (timeMillis < 0) {
                return null;
            }
            int idx = calculateTimeIdx(timeMillis - windowLengthInMs);
            timeMillis = timeMillis - windowLengthInMs;
            WindowWrap<T> wrap = array.get(idx);

            if (wrap == null || isWindowDeprecated(wrap)) {
                return null;
            }

            if (wrap.windowStart() + windowLengthInMs < (timeMillis)) {
                return null;
            }

            return wrap;
        }

        /**
         * Get the previous bucket item for current timestamp.
         *
         * @return the previous bucket item for current timestamp
         */
        public WindowWrap<T> getPreviousWindow() {
            return getPreviousWindow(TimeUtil.currentTimeMillis());
        }

        /**
         * Get statistic value from bucket for provided timestamp.
         *
         * @param timeMillis a valid timestamp in milliseconds
         * @return the statistic value if bucket for provided timestamp is up-to-date; otherwise null
         */
        public T getWindowValue(long timeMillis) {
            if (timeMillis < 0) {
                return null;
            }
            int idx = calculateTimeIdx(timeMillis);

            WindowWrap<T> bucket = array.get(idx);

            if (bucket == null || !bucket.isTimeInWindow(timeMillis)) {
                return null;
            }

            return bucket.value();
        }

        /**
         * Check if a bucket is deprecated, which means that the bucket
         * has been behind for at least an entire window time span.
         *
         * @param windowWrap a non-null bucket
         * @return true if the bucket is deprecated; otherwise false
         */
        public boolean isWindowDeprecated(/*@NonNull*/ WindowWrap<T> windowWrap) {
            return isWindowDeprecated(TimeUtil.currentTimeMillis(), windowWrap);
        }

        // A window is deprecated (no longer valid) when its start time lies more than intervalInMs before time
        public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
            return time - windowWrap.windowStart() > intervalInMs;
        }

        /**
         * Get valid bucket list for entire sliding window.
         * The list will only contain "valid" buckets.
         *
         * @return valid bucket list for entire sliding window.
         */
        public List<WindowWrap<T>> list() {
            return list(TimeUtil.currentTimeMillis());
        }

        public List<WindowWrap<T>> list(long validTime) {
            int size = array.length();
            List<WindowWrap<T>> result = new ArrayList<WindowWrap<T>>(size);

            for (int i = 0; i < size; i++) {
                WindowWrap<T> windowWrap = array.get(i);
                if (windowWrap == null || isWindowDeprecated(validTime, windowWrap)) {
                    continue;
                }
                result.add(windowWrap);
            }

            return result;
        }

        /**
         * Get all buckets for entire sliding window including deprecated buckets.
         *
         * @return all buckets for entire sliding window
         */
        public List<WindowWrap<T>> listAll() {
            int size = array.length();
            List<WindowWrap<T>> result = new ArrayList<WindowWrap<T>>(size);

            for (int i = 0; i < size; i++) {
                WindowWrap<T> windowWrap = array.get(i);
                if (windowWrap == null) {
                    continue;
                }
                result.add(windowWrap);
            }

            return result;
        }

        /**
         * Get aggregated value list for entire sliding window.
         * The list will only contain value from "valid" buckets.
         *
         * @return aggregated value list for entire sliding window
         */
        public List<T> values() {
            return values(TimeUtil.currentTimeMillis());
        }

        // Collect the values of all windows that are still valid (within intervalInMs of timeMillis)
        public List<T> values(long timeMillis) {
            if (timeMillis < 0) {
                return new ArrayList<T>();
            }
            int size = array.length();
            List<T> result = new ArrayList<T>(size);

            for (int i = 0; i < size; i++) {
                WindowWrap<T> windowWrap = array.get(i);
                if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
                    continue;
                }
                result.add(windowWrap.value());
            }
            return result;
        }

        /**
         * Get the valid "head" bucket of the sliding window for provided timestamp.
         * Package-private for test.
         *
         * @param timeMillis a valid timestamp in milliseconds
         * @return the "head" bucket if it exists and is valid; otherwise null
         */
        WindowWrap<T> getValidHead(long timeMillis) {
            // Calculate index for expected head time.
            int idx = calculateTimeIdx(timeMillis + windowLengthInMs);

            WindowWrap<T> wrap = array.get(idx);
            if (wrap == null || isWindowDeprecated(wrap)) {
                return null;
            }

            return wrap;
        }

        /**
         * Get the valid "head" bucket of the sliding window at current timestamp.
         *
         * @return the "head" bucket if it exists and is valid; otherwise null
         */
        public WindowWrap<T> getValidHead() {
            return getValidHead(TimeUtil.currentTimeMillis());
        }

        /**
         * Get sample count (total amount of buckets).
         *
         * @return sample count
         */
        public int getSampleCount() {
            return sampleCount;
        }

        /**
         * Get total interval length of the sliding window in milliseconds.
         *
         * @return interval in second
         */
        public int getIntervalInMs() {
            return intervalInMs;
        }

        /**
         * Get total interval length of the sliding window.
         *
         * @return interval in second
         */
        public double getIntervalInSecond() {
            return intervalInSecond;
        }

        public void debug(long time) {
            StringBuilder sb = new StringBuilder();
            List<WindowWrap<T>> lists = list(time);
            sb.append("Thread_").append(Thread.currentThread().getId()).append("_");
            for (WindowWrap<T> window : lists) {
                sb.append(window.windowStart()).append(":").append(window.value().toString());
            }
            System.out.println(sb.toString());
        }

        public long currentWaiting() {
            // TODO: default method. Should remove this later.
            return 0;
        }

        public void addWaiting(long time, int acquireCount) {
            // Do nothing by default.
            throw new UnsupportedOperationException();
        }
    }
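    The three cases in currentWindow() and the validity filter in values() fit into a compact sketch. It is meant to show the technique, not to replace the real class.

    - `SlidingCounter` is a hypothetical name.
    - Each bucket holds a single AtomicLong instead of a MetricBucket.
    - Timestamps are passed in explicitly.
    - It keeps LeapArray's scheme: CAS to install an empty bucket, and tryLock to reset a stale one.
    - One deliberate deviation: it re-checks the bucket's start time while holding the lock, so a bucket is never reset twice for the same window.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified LeapArray-style sliding window counting a single metric.
class SlidingCounter {
    // One bucket: start time plus a counter (stands in for WindowWrap<MetricBucket>).
    static final class Bucket {
        volatile long windowStart;
        final AtomicLong count = new AtomicLong();

        Bucket(long windowStart) { this.windowStart = windowStart; }
    }

    private final int windowLengthInMs;
    private final int intervalInMs;
    private final AtomicReferenceArray<Bucket> array;
    private final ReentrantLock updateLock = new ReentrantLock();

    SlidingCounter(int sampleCount, int intervalInMs) {
        if (sampleCount <= 0 || intervalInMs <= 0 || intervalInMs % sampleCount != 0) {
            throw new IllegalArgumentException("intervalInMs must be a positive multiple of sampleCount");
        }
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }

    Bucket currentWindow(long timeMillis) {
        int idx = (int) ((timeMillis / windowLengthInMs) % array.length());
        long windowStart = timeMillis - timeMillis % windowLengthInMs;
        while (true) {
            Bucket old = array.get(idx);
            if (old == null) {
                // Case 1: empty slot, try to install a fresh bucket with CAS.
                Bucket fresh = new Bucket(windowStart);
                if (array.compareAndSet(idx, null, fresh)) {
                    return fresh;
                }
                Thread.yield();
            } else if (old.windowStart == windowStart) {
                // Case 2: the bucket already belongs to this window.
                return old;
            } else if (old.windowStart < windowStart) {
                // Case 3: the bucket is left over from an earlier lap; reset it under the lock.
                if (updateLock.tryLock()) {
                    try {
                        if (old.windowStart < windowStart) { // re-check: it may already be reset
                            old.count.set(0);               // clear first, then publish the new start
                            old.windowStart = windowStart;
                            return old;
                        }
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    Thread.yield();
                }
            } else {
                // Time went backwards: hand out a detached bucket, as LeapArray does.
                return new Bucket(windowStart);
            }
        }
    }

    void add(long timeMillis, long n) {
        currentWindow(timeMillis).count.addAndGet(n);
    }

    // Sum over all buckets that are still within intervalInMs of timeMillis.
    long sum(long timeMillis) {
        currentWindow(timeMillis); // recycle the current slot first, as ArrayMetric does
        long total = 0;
        for (int i = 0; i < array.length(); i++) {
            Bucket b = array.get(i);
            // Same rule as LeapArray#isWindowDeprecated: older than intervalInMs means stale.
            if (b != null && timeMillis - b.windowStart <= intervalInMs) {
                total += b.count.get();
            }
        }
        return total;
    }
}
```

    Consider two 500 ms buckets over a 1000 ms interval. At t = 1200, the bucket that held [0, 500) is recycled for [1000, 1500), so its count no longer appears in the sum. Calling currentWindow() before reading matters for this reason. Without it, a stale bucket that shares the current slot could still pass the deprecation check.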

     

    Pt2.5 ArrayMetric

    package com.alibaba.csp.sentinel.slots.statistic.metric;

    import java.util.ArrayList;
    import java.util.List;

    import com.alibaba.csp.sentinel.config.SentinelConfig;
    import com.alibaba.csp.sentinel.node.metric.MetricNode;
    import com.alibaba.csp.sentinel.slots.statistic.MetricEvent;
    import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray;
    import com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket;
    import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap;
    import com.alibaba.csp.sentinel.slots.statistic.metric.occupy.OccupiableBucketLeapArray;
    import com.alibaba.csp.sentinel.util.function.Predicate;

    /**
     * The basic metric class in Sentinel using a {@link BucketLeapArray} internal.
     *
     * @author jialiang.linjl
     * @author Eric Zhao
     */
    public class ArrayMetric implements Metric {

        private final LeapArray<MetricBucket> data;

        public ArrayMetric(int sampleCount, int intervalInMs) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        }

        public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
            // Whichever subclass is chosen, the field type is LeapArray<MetricBucket>
            if (enableOccupy) {
                this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
            } else {
                this.data = new BucketLeapArray(sampleCount, intervalInMs);
            }
        }

        /**
         * For unit test.
         */
        public ArrayMetric(LeapArray<MetricBucket> array) {
            this.data = array;
        }

        // Sum of successful requests over all windows still valid within intervalInMs
        @Override
        public long success() {
            // Touch the current window first, so the bucket for "now" is created or reset before reading
            data.currentWindow();
            long success = 0;

            List<MetricBucket> list = data.values();
            for (MetricBucket window : list) {
                success += window.success();
            }
            return success;
        }

        // Largest per-window success count among the valid windows (at least 1)
        @Override
        public long maxSuccess() {
            data.currentWindow();
            long success = 0;

            List<MetricBucket> list = data.values();
            for (MetricBucket window : list) {
                if (window.success() > success) {
                    success = window.success();
                }
            }
            return Math.max(success, 1);
        }

        // Sum of exceptions over the valid windows
        @Override
        public long exception() {
            data.currentWindow();
            long exception = 0;
            List<MetricBucket> list = data.values();
            for (MetricBucket window : list) {
                exception += window.exception();
            }
            return exception;
        }

        // Sum of blocked requests over the valid windows
        @Override
        public long block() {
            data.currentWindow();
            long block = 0;
            List<MetricBucket> list = data.values();
            for (MetricBucket window : list) {
                block += window.block();
            }
            return block;
        }

        // Sum of passed requests over the valid windows
        @Override
        public long pass() {
            data.currentWindow();
            long pass = 0;
            List<MetricBucket> list = data.values();

            for (MetricBucket window : list) {
                pass += window.pass();
            }
            return pass;
        }

        // Sum of requests that passed by borrowing quota from a future window, over the valid windows
        @Override
        public long occupiedPass() {
            data.currentWindow();
            long pass = 0;
            List<MetricBucket> list = data.values();
            for (MetricBucket window : list) {
                pass += window.occupiedPass();
            }
            return pass;
        }

        // Sum of RT over the valid windows (a total, not an average)
        @Override
        public long rt() {
            data.currentWindow();
            long rt = 0;
            List<MetricBucket> list = data.values();
            for (MetricBucket window : list) {
                rt += window.rt();
            }
            return rt;
        }

        // Smallest RT among the valid windows (at least 1)
        @Override
        public long minRt() {
            data.currentWindow();
            long rt = SentinelConfig.statisticMaxRt();
            List<MetricBucket> list = data.values();
            for (MetricBucket window : list) {
                if (window.minRt() < rt) {
                    rt = window.minRt();
                }
            }

            return Math.max(1, rt);
        }

        @Override
        public List<MetricNode> details() {
            List<MetricNode> details = new ArrayList<>();
            data.currentWindow();
            List<WindowWrap<MetricBucket>> list = data.list();
            for (WindowWrap<MetricBucket> window : list) {
                if (window == null) {
                    continue;
                }

                details.add(fromBucket(window));
            }

            return details;
        }

        @Override
        public List<MetricNode> detailsOnCondition(Predicate<Long> timePredicate) {
            List<MetricNode> details = new ArrayList<>();
            data.currentWindow();
            List<WindowWrap<MetricBucket>> list = data.list();
            for (WindowWrap<MetricBucket> window : list) {
                if (window == null) {
                    continue;
                }
                if (timePredicate != null && !timePredicate.test(window.windowStart())) {
                    continue;
                }

                details.add(fromBucket(window));
            }

            return details;
        }

        private MetricNode fromBucket(WindowWrap<MetricBucket> wrap) {
            MetricNode node = new MetricNode();
            node.setBlockQps(wrap.value().block());
            node.setExceptionQps(wrap.value().exception());
            node.setPassQps(wrap.value().pass());
            long successQps = wrap.value().success();
            node.setSuccessQps(successQps);
            if (successQps != 0) {
                node.setRt(wrap.value().rt() / successQps);
            } else 
{            node.setRt(wrap.value().rt());       }        node.setTimestamp(wrap.windowStart());        node.setOccupiedPassQps(wrap.value().occupiedPass());        return node;   } ​    @Override    public MetricBucket[] windows() {        data.currentWindow();        return data.values().toArray(new MetricBucket[0]);   } ​    @Override    public void addException(int count) {        WindowWrap<MetricBucket> wrap = data.currentWindow();        wrap.value().addException(count);   } ​    @Override    public void addBlock(int count) {        WindowWrap<MetricBucket> wrap = data.currentWindow();        wrap.value().addBlock(count);   } ​    @Override    public void addWaiting(long time, int acquireCount) {        data.addWaiting(time, acquireCount);   } ​    @Override    public void addOccupiedPass(int acquireCount) {        WindowWrap<MetricBucket> wrap = data.currentWindow();        wrap.value().addOccupiedPass(acquireCount);   } ​    @Override    public void addSuccess(int count) {        WindowWrap<MetricBucket> wrap = data.currentWindow();        wrap.value().addSuccess(count);   } ​    @Override    public void addPass(int count) {        WindowWrap<MetricBucket> wrap = data.currentWindow();        wrap.value().addPass(count);   } ​    @Override    public void addRT(long rt) {        WindowWrap<MetricBucket> wrap = data.currentWindow();        wrap.value().addRT(rt);   } ​    @Override    public void debug() {        data.debug(System.currentTimeMillis());   } ​    @Override    public long previousWindowBlock() {        data.currentWindow();        WindowWrap<MetricBucket> wrap = data.getPreviousWindow();        if (wrap == null) {            return 0;       }        return wrap.value().block();   } ​    @Override    public long previousWindowPass() {        data.currentWindow();        WindowWrap<MetricBucket> wrap = data.getPreviousWindow();        if (wrap == null) {            return 0;       }        return wrap.value().pass();   } ​    public void 
add(MetricEvent event, long count) {        data.currentWindow().value().add(event, count);   } ​    public long getCurrentCount(MetricEvent event) {        return data.currentWindow().value().get(event);   } ​    /**     * Get total sum for provided event in {@code intervalInSec}.     *     * @param event event to calculate     * @return total sum for event     */    public long getSum(MetricEvent event) {        data.currentWindow();        long sum = 0; ​        List<MetricBucket> buckets = data.values();        for (MetricBucket bucket : buckets) {            sum += bucket.get(event);       }        return sum;   } ​    /**     * Get average count for provided event per second.     *     * @param event event to calculate     * @return average count per second for event     */    public double getAvg(MetricEvent event) {        return getSum(event) / data.getIntervalInSecond();   } ​    @Override    public long getWindowPass(long timeMillis) {        MetricBucket bucket = data.getWindowValue(timeMillis);        if (bucket == null) {            return 0L;       }        return bucket.pass();   } ​    @Override    public long waiting() {        return data.currentWaiting();   } ​    @Override    public double getWindowIntervalInSec() {        return data.getIntervalInSecond();   } ​    @Override    public int getSampleCount() {        return data.getSampleCount();   } }
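Nearly every query in ArrayMetric follows the same pattern: refresh the current bucket, then aggregate only the buckets that are still inside the interval. The sketch below reproduces that pattern without Sentinel. It is a minimal illustration under stated assumptions, not LeapArray itself. `SlidingWindowSketch` is a hypothetical name. Time is passed in explicitly so the behavior is deterministic. Each bucket keeps a single counter, whereas a real MetricBucket keeps one counter per MetricEvent.

```java
import java.util.Arrays;

/**
 * Hypothetical, simplified sliding window in the spirit of LeapArray + ArrayMetric.
 * Not Sentinel's implementation: one counter per bucket and coarse locking.
 */
class SlidingWindowSketch {
    private final int intervalMs;     // plays the role of intervalInMs
    private final int bucketMs;       // intervalMs / sampleCount
    private final long[] windowStart; // window start time each bucket currently represents
    private final long[] counts;      // one counter per bucket

    SlidingWindowSketch(int intervalMs, int sampleCount) {
        this.intervalMs = intervalMs;
        this.bucketMs = intervalMs / sampleCount;
        this.windowStart = new long[sampleCount];
        this.counts = new long[sampleCount];
        Arrays.fill(windowStart, -1L); // -1 marks a bucket that was never used
    }

    // Plays the role of currentWindow(): map timeMs to its bucket and reset
    // the bucket if it still holds data from an older window.
    private int currentIndex(long timeMs) {
        int idx = (int) ((timeMs / bucketMs) % counts.length);
        long start = timeMs - timeMs % bucketMs;
        if (windowStart[idx] != start) {
            windowStart[idx] = start;
            counts[idx] = 0;
        }
        return idx;
    }

    synchronized void add(long timeMs, long n) {
        counts[currentIndex(timeMs)] += n;
    }

    // Same shape as ArrayMetric#success(): refresh the current bucket first,
    // then sum only buckets whose start is not older than intervalMs.
    synchronized long sum(long timeMs) {
        currentIndex(timeMs);
        long total = 0;
        for (int i = 0; i < counts.length; i++) {
            if (windowStart[i] >= 0 && timeMs - windowStart[i] <= intervalMs) {
                total += counts[i];
            }
        }
        return total;
    }
}
```

Take a 1000 ms interval split into two 500 ms buckets, with events at t=0, 500 and 1200. Then `sum(1200)` is 2, because the bucket that held t=0 has been reused for the window starting at 1000. By t=2600 every remaining bucket is older than the interval, so `sum(2600)` is 0.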

     

    Pt3 Flow-Control Source Logic

    Pt3.1 Example: FlowQpsDemo

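    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    import com.alibaba.csp.sentinel.Entry;
    import com.alibaba.csp.sentinel.SphU;
    import com.alibaba.csp.sentinel.slots.block.BlockException;
    import com.alibaba.csp.sentinel.slots.block.RuleConstant;
    import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
    import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;

    public class FlowQpsDemo {

        private static final String KEY = "abc";

        private static AtomicInteger pass = new AtomicInteger();
        private static AtomicInteger block = new AtomicInteger();
        private static AtomicInteger total = new AtomicInteger();

        private static volatile boolean stop = false;
        private static final int threadCount = 1;
        private static int seconds = 60 + 40;

        public static void main(String[] args) throws Exception {
            initFlowQpsRule();
            simulateTraffic();
            System.out.println("total=" + total.get() + ", pass=" + pass.get() + ", block=" + block.get());
        }

        // 1. Define the flow rule
        private static void initFlowQpsRule() {
            List<FlowRule> rules = new ArrayList<FlowRule>();
            FlowRule rule1 = new FlowRule();
            rule1.setResource(KEY);
            rule1.setCount(20); // limit QPS to 20
            rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
            rule1.setLimitApp("default");
            rules.add(rule1);
            FlowRuleManager.loadRules(rules);
        }

        private static void simulateTraffic() {
            // Run for `seconds`; nothing else ever sets `stop`, so without a deadline the loop never ends
            long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(seconds);
            Random random = new Random();
            while (!stop && System.currentTimeMillis() < deadline) {
                Entry entry = null;
                try {
                    // 2. Define the resource and run the Slot processing
                    entry = SphU.entry(KEY);
                    pass.addAndGet(1);
                } catch (BlockException e1) {
                    // 3. Handle the blocked request
                    block.incrementAndGet();
                } catch (Exception e2) {
                    // business exception: ignored in this demo
                } finally {
                    total.incrementAndGet();
                    if (entry != null) {
                        // 4. Release the resource
                        entry.exit();
                    }
                }

                try {
                    TimeUnit.MILLISECONDS.sleep(random.nextInt(50));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    stop = true;
                }
            }
        }
    }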

     

    Pt3.2 Code Flow Walkthrough

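    As the example above shows, Sentinel's processing breaks down into the following steps:

    Define the flow rules and load them into memory;

    Define the resource and check it against the rules (the Slot nodes);

    Handle blocked requests;

    Release the resource.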
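The four steps can be imitated in plain Java to show how they fit together. The sketch below is a toy and is not Sentinel. `ToyFlowControl` and `ToyBlockException` are hypothetical names. To keep it deterministic it enforces a per-resource concurrency limit instead of a QPS limit.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for BlockException.
class ToyBlockException extends Exception {
    ToyBlockException(String resource) {
        super("blocked: " + resource);
    }
}

// Hypothetical toy limiter: a concurrency limit per resource, not Sentinel's QPS logic.
class ToyFlowControl {
    private final Map<String, Integer> limits = new ConcurrentHashMap<>();
    private final Map<String, AtomicInteger> inFlight = new ConcurrentHashMap<>();

    // Step 1: load the rules into memory, replacing any previous ones.
    void loadRules(Map<String, Integer> newLimits) {
        limits.clear();
        limits.putAll(newLimits);
    }

    // Step 2: enter the resource and check its rule.
    void entry(String resource) throws ToyBlockException {
        AtomicInteger counter = inFlight.computeIfAbsent(resource, k -> new AtomicInteger());
        Integer limit = limits.get(resource);
        if (counter.incrementAndGet() > (limit == null ? Integer.MAX_VALUE : limit)) {
            counter.decrementAndGet(); // undo: a blocked request never runs
            throw new ToyBlockException(resource);
        }
    }

    // Step 4: release the resource (only for entries that succeeded).
    void exit(String resource) {
        AtomicInteger counter = inFlight.get(resource);
        if (counter != null) {
            counter.decrementAndGet();
        }
    }

    int inFlight(String resource) {
        AtomicInteger counter = inFlight.get(resource);
        return counter == null ? 0 : counter.get();
    }
}
```

Step 3 belongs to the caller. It catches `ToyBlockException` and decides what to do. As in FlowQpsDemo, `exit` is called only when `entry` succeeded.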

     

    Pt3.3 Defining and Loading Rules

    First, the code that defines the FlowRule:

    private static void initFlowQpsRule() {
        List<FlowRule> rules = new ArrayList<FlowRule>();
        FlowRule rule1 = new FlowRule();
        rule1.setResource(KEY);
        // set limit qps to 20
        rule1.setCount(20);
        rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
        rule1.setLimitApp("default");
        rules.add(rule1);
        FlowRuleManager.loadRules(rules);
    }

     

    Next, let's see how FlowRuleManager#loadRules handles the rules.

    /*
     * Copyright 1999-2018 Alibaba Group Holding Ltd.
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.alibaba.csp.sentinel.slots.block.flow;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
    import com.alibaba.csp.sentinel.log.RecordLog;
    import com.alibaba.csp.sentinel.util.AssertUtil;
    import com.alibaba.csp.sentinel.util.StringUtil;
    import com.alibaba.csp.sentinel.node.metric.MetricTimerListener;
    import com.alibaba.csp.sentinel.property.DynamicSentinelProperty;
    import com.alibaba.csp.sentinel.property.PropertyListener;
    import com.alibaba.csp.sentinel.property.SentinelProperty;

    /**
     * <p>
     * One resources can have multiple rules. And these rules take effects in the following order:
     * <ol>
     * <li>requests from specified caller</li>
     * <li>no specified caller</li>
     * </ol>
     * </p>
     *
     * @author jialiang.linjl
     * @author Eric Zhao
     */
    public class FlowRuleManager {

        private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>();

        private static final FlowPropertyListener LISTENER = new FlowPropertyListener();
        private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();

        @SuppressWarnings("PMD.ThreadPoolCreationRule")
        private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
            new NamedThreadFactory("sentinel-metrics-record-task", true));

        static {
            // Register the listener on the current property
            currentProperty.addListener(LISTENER);
            SCHEDULER.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS);
        }

        /**
         * Listen to the {@link SentinelProperty} for {@link FlowRule}s. The property is the source of {@link FlowRule}s.
         * Flow rules can also be set by {@link #loadRules(List)} directly.
         *
         * @param property the property to listen.
         */
        public static void register2Property(SentinelProperty<List<FlowRule>> property) {
            AssertUtil.notNull(property, "property cannot be null");
            synchronized (LISTENER) {
                RecordLog.info("[FlowRuleManager] Registering new property to flow rule manager");
                currentProperty.removeListener(LISTENER);
                property.addListener(LISTENER);
                currentProperty = property;
            }
        }

        /**
         * Get a copy of the rules.
         *
         * @return a new copy of the rules.
         */
        public static List<FlowRule> getRules() {
            List<FlowRule> rules = new ArrayList<FlowRule>();
            for (Map.Entry<String, List<FlowRule>> entry : flowRules.entrySet()) {
                rules.addAll(entry.getValue());
            }
            return rules;
        }

        /**
         * Load {@link FlowRule}s, former rules will be replaced.
         *
         * @param rules new rules to load.
         */
        // loadRules itself is simple; the real work happens in the inner class FlowPropertyListener.
        public static void loadRules(List<FlowRule> rules) {
            // updateValue compares the current rules with the new ones and
            // notifies FlowPropertyListener only if they differ
            currentProperty.updateValue(rules);
        }

        static Map<String, List<FlowRule>> getFlowRuleMap() {
            return flowRules;
        }

        public static boolean hasConfig(String resource) {
            return flowRules.containsKey(resource);
        }

        public static boolean isOtherOrigin(String origin, String resourceName) {
            if (StringUtil.isEmpty(origin)) {
                return false;
            }

            List<FlowRule> rules = flowRules.get(resourceName);

            if (rules != null) {
                for (FlowRule rule : rules) {
                    if (origin.equals(rule.getLimitApp())) {
                        return false;
                    }
                }
            }

            return true;
        }

        // Inner class implementing the listener logic
        private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {

            // Rebuild the rule map from the new rule list
            @Override
            public void configUpdate(List<FlowRule> value) {
                Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
                if (rules != null) {
                    flowRules.clear();
                    flowRules.putAll(rules);
                }
                RecordLog.info("[FlowRuleManager] Flow rules received: {}", flowRules);
            }

            @Override
            public void configLoad(List<FlowRule> conf) {
                Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(conf);
                if (rules != null) {
                    flowRules.clear();
                    flowRules.putAll(rules);
                }
                RecordLog.info("[FlowRuleManager] Flow rules loaded: {}", flowRules);
            }
        }

    }
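The listener's job is to group the rule list by resource name into a `Map<String, List<FlowRule>>`, which is what `FlowRuleUtil.buildFlowRuleMap` returns. The sketch below shows that grouping with a hypothetical `SimpleRule` type. The validity checks (non-empty resource, non-negative count) are assumptions for illustration. The design also differs on one point. Calling `clear()` and then `putAll()` on a ConcurrentHashMap is not atomic as a pair, so a reader running between the two calls can briefly see no rules. This sketch avoids that by swapping a volatile reference to a freshly built map.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical rule type with only the two fields this sketch needs.
class SimpleRule {
    final String resource;
    final double count;

    SimpleRule(String resource, double count) {
        this.resource = resource;
        this.count = count;
    }
}

class SimpleRuleManager {
    // Readers always see either the old map or the new one, never a half-built one.
    private static volatile Map<String, List<SimpleRule>> rules = Collections.emptyMap();

    // Group valid rules by resource name, in the spirit of FlowRuleUtil.buildFlowRuleMap.
    static Map<String, List<SimpleRule>> buildRuleMap(List<SimpleRule> list) {
        Map<String, List<SimpleRule>> map = new HashMap<>();
        if (list == null) {
            return map;
        }
        for (SimpleRule r : list) {
            if (r == null || r.resource == null || r.resource.isEmpty() || r.count < 0) {
                continue; // skip rules this sketch treats as invalid
            }
            map.computeIfAbsent(r.resource, k -> new ArrayList<>()).add(r);
        }
        return map;
    }

    static void loadRules(List<SimpleRule> list) {
        rules = buildRuleMap(list); // publish the whole new map in one volatile write
    }

    static List<SimpleRule> rulesFor(String resource) {
        return rules.getOrDefault(resource, Collections.<SimpleRule>emptyList());
    }
}
```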

     

    Now let's look at how the listener mechanism is implemented:

    package com.alibaba.csp.sentinel.property;

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    import com.alibaba.csp.sentinel.log.RecordLog;

    public class DynamicSentinelProperty<T> implements SentinelProperty<T> {

        protected Set<PropertyListener<T>> listeners = Collections.synchronizedSet(new HashSet<PropertyListener<T>>());
        private T value = null;

        public DynamicSentinelProperty() {
        }

        public DynamicSentinelProperty(T value) {
            super();
            this.value = value;
        }

        @Override
        public void addListener(PropertyListener<T> listener) {
            listeners.add(listener);
            listener.configLoad(value);
        }

        @Override
        public void removeListener(PropertyListener<T> listener) {
            listeners.remove(listener);
        }

        // Compare the old and new values; only if they differ are the listeners
        // (e.g. FlowPropertyListener) notified
        @Override
        public boolean updateValue(T newValue) {
            if (isEqual(value, newValue)) {
                return false;
            }
            RecordLog.info("[DynamicSentinelProperty] Config will be updated to: {}", newValue);

            value = newValue;
            for (PropertyListener<T> listener : listeners) {
                listener.configUpdate(newValue);
            }
            return true;
        }

        private boolean isEqual(T oldValue, T newValue) {
            if (oldValue == null && newValue == null) {
                return true;
            }

            if (oldValue == null) {
                return false;
            }

            return oldValue.equals(newValue);
        }

        public void close() {
            listeners.clear();
        }
    }

    package com.alibaba.csp.sentinel.property;

    /**
     * Listens for calls to SentinelProperty#updateValue(Object).
     */
    public interface PropertyListener<T> {

        /**
         * Triggered when SentinelProperty#updateValue(Object) is called and returns true.
         */
        void configUpdate(T value);

        /**
         * Triggered when the value is loaded for the first time.
         */
        void configLoad(T value);
    }

     

    That is the whole FlowRule loading process, and it is fairly simple.
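The property/listener pair is a small observer pattern. Registering a listener immediately delivers the current value through `configLoad`. A later `updateValue` notifies listeners only when the new value differs from the old one. Since `List.equals` compares element by element, reloading a rule list equal to the current one does not reach the listeners. The sketch below is a trimmed-down analogue. `SimpleProperty` and `ValueListener` are hypothetical names, not Sentinel types.

```java
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical analogue of PropertyListener.
interface ValueListener<T> {
    void configUpdate(T value); // fired when updateValue() actually changes the value
    void configLoad(T value);   // fired once, when the listener is registered
}

// Hypothetical analogue of DynamicSentinelProperty.
class SimpleProperty<T> {
    private final Set<ValueListener<T>> listeners = new CopyOnWriteArraySet<>();
    private volatile T value;

    void addListener(ValueListener<T> listener) {
        listeners.add(listener);
        listener.configLoad(value); // hand the current value to the new listener
    }

    synchronized boolean updateValue(T newValue) {
        if (Objects.equals(value, newValue)) {
            return false; // unchanged: listeners are not notified
        }
        value = newValue;
        for (ValueListener<T> l : listeners) {
            l.configUpdate(newValue);
        }
        return true;
    }
}
```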

     

    Pt3.4 Flow Rule Checking: Sph*#entry()

    Step 1: Starting from SphU.entry(), create the corresponding Resource object.

    public class FlowQpsDemo {
        ......

        private static void simulateTraffic() {
            while (!stop) {
                Entry entry = null;

                try {
                    // 1.1 Define the resource and execute the Slot rules
                    entry = SphU.entry(KEY);
                    // TODO
                } catch (BlockException e1) {
                    // TODO
                } catch (Exception e2) {
                    // TODO
                } finally {
                    // TODO
                }
            }
        }
    }

    // Step [1.1] calls SphU#entry, which runs the logic below:
    // starting from SphU.entry(), a Resource object is first created from the arguments.
    public class CtSph implements Sph {
        ......

        @Override
        public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
            // 1.2 Create the Resource object
            StringResourceWrapper resource = new StringResourceWrapper(name, type);
            // 1.3 Collect statistics for the resource and check it against the rules
            return entry(resource, count, args);
        }

        ......
    }

    // Step [1.2] creates a new Resource object;
    // the core initialization happens in the parent class.
    public abstract class ResourceWrapper {

        // resource name
        protected final String name;
        // entry type
        protected final EntryType entryType;
        // resource type
        protected final int resourceType;

        public ResourceWrapper(String name, EntryType entryType, int resourceType) {
            AssertUtil.notEmpty(name, "resource name cannot be empty");
            AssertUtil.notNull(entryType, "entryType cannot be null");
            this.name = name;
            this.entryType = entryType;
            this.resourceType = resourceType;
        }

        ......
    }
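Everything keyed by a Resource depends on one fact: ResourceWrapper's equals/hashCode use only the name. Two wrappers for the same name therefore land on the same map entry, even if their EntryType differs. The sketch below shows the effect with a hypothetical `NamedResource` class. Its nested `Kind` enum stands in for EntryType.

```java
// Hypothetical analogue of ResourceWrapper: identity is the name only.
class NamedResource {
    enum Kind { IN, OUT } // stands in for EntryType

    final String name;
    final Kind kind;

    NamedResource(String name, Kind kind) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("resource name cannot be empty");
        }
        this.name = name;
        this.kind = kind;
    }

    @Override
    public int hashCode() {
        return name.hashCode(); // kind is deliberately ignored
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof NamedResource && ((NamedResource) obj).name.equals(name);
    }
}
```

A `HashMap<NamedResource, ...>` populated with `("abc", OUT)` is found again by a lookup with `("abc", IN)`. This is why a resource shares one SlotChain no matter how it is entered.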

    Step 2: Check the parameters and global configuration

    Step [2.1] is shown below.

    public class CtSph implements Sph {
        ......

        private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
            throws BlockException {
            // 2.1 Check the parameters and global settings. If a check fails, a bare CtEntry is returned
            //     immediately and no flow-control checking follows; otherwise the checks below run.
            // Get the ThreadLocal Context
            Context context = ContextUtil.getContext();
            if (context instanceof NullContext) {
                // NullContext means the number of contexts has exceeded the threshold. Return a CtEntry whose
                // SlotChain (second argument) is null, so no statistics or rule checking are performed.
                return new CtEntry(resourceWrapper, null, context);
            }

            if (context == null) {
                // Using default context.
                context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
            }

            // Global switch is off, so no rule checking is done: return a CtEntry with a null SlotChain,
            // skipping statistics and rule checking.
            if (!Constants.ON) {
                return new CtEntry(resourceWrapper, null, context);
            }

            // 2.2 Get the SlotChain for the resource
            ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

            /*
             * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
             * so no rule checking will be done.
             */
            if (chain == null) {
                return new CtEntry(resourceWrapper, null, context);
            }

            // 2.3 Create the Entry object
            Entry e = new CtEntry(resourceWrapper, chain, context);
            try {
                // 2.4 Execute the SlotChain's entry method
                chain.entry(context, resourceWrapper, null, count, prioritized, args);
            } catch (BlockException e1) {
                e.exit(count, args);
                throw e1;
            } catch (Throwable e1) {
                // This should not happen, unless there are errors existing in Sentinel internal.
                RecordLog.info("Sentinel unexpected exception", e1);
            }
            return e;
        }

        ......
    }

     

    Step 3: Get the SlotChain for the Resource

    Step [2.2] looks up the SlotChain for the wrapped Resource object. Let's see what CtSph#lookProcessChain() does.

    /**
     * Get {@link ProcessorSlotChain} of the resource. new {@link ProcessorSlotChain} will
     * be created if the resource doesn't relate one.
     *
     * <p>Same resource({@link ResourceWrapper#equals(Object)}) will share the same
     * {@link ProcessorSlotChain} globally, no matter in which {@link Context}.<p/>
     *
     * <p>
     * Note that total {@link ProcessorSlot} count must not exceed {@link Constants#MAX_SLOT_CHAIN_SIZE},
     * otherwise null will return.
     * </p>
     *
     * @param resourceWrapper target resource
     * @return {@link ProcessorSlotChain} of the resource
     */
    // 1. Get the SlotChain for the resource; create a new one if the resource has none yet.
    // 2. The same resource shares one SlotChain across calls, even in different Contexts.
    // 3. chainMap may not exceed Constants.MAX_SLOT_CHAIN_SIZE entries; beyond that, null is returned.
    ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        // HashMap#get() locates the entry via hashCode() and equals();
        // ResourceWrapper overrides both to use only the resource name
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        if (chain == null) {
            // No SlotChain yet for this resource: initialize it in a thread-safe way.
            synchronized (LOCK) {
                // Double-check under the lock: another thread may have created it meanwhile
                chain = chainMap.get(resourceWrapper);
                if (chain == null) {
                    // Upstream comment: "Entry size limit". In effect this caps the number of resources.
                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                        return null;
                    }

                    // Create a new SlotChain and add it to chainMap
                    chain = SlotChainProvider.newSlotChain();
                    // Copy-on-write: build a new map and swap the reference instead of mutating the
                    // existing HashMap. chainMap is read without the lock (the first get() above), and a
                    // HashMap that is being modified (e.g. resized) is not safe to read concurrently;
                    // publishing a fully built map avoids that. This pattern is common in Sentinel's source.
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                }
            }
        }
        return chain;
    }
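lookProcessChain combines three ideas: a lock-free read, double-checked creation under a lock, and a copy-on-write swap with a size cap. The sketch below generalizes that pattern. `CopyOnWriteRegistry` is a hypothetical class, and the factory stands in for `SlotChainProvider.newSlotChain()`. The `volatile` field is this sketch's way of making sure lock-free readers see a fully built map after each swap.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical generic version of the lookProcessChain() pattern.
class CopyOnWriteRegistry<K, V> {
    private final int maxSize;          // plays the role of MAX_SLOT_CHAIN_SIZE
    private final Supplier<V> factory;  // plays the role of SlotChainProvider.newSlotChain()
    private final Object lock = new Object();
    private volatile Map<K, V> map = new HashMap<>();

    CopyOnWriteRegistry(int maxSize, Supplier<V> factory) {
        this.maxSize = maxSize;
        this.factory = factory;
    }

    // Returns null once maxSize entries exist, as lookProcessChain() does.
    V lookup(K key) {
        V value = map.get(key);                 // fast path: no lock
        if (value == null) {
            synchronized (lock) {
                Map<K, V> current = map;
                value = current.get(key);       // double check under the lock
                if (value == null) {
                    if (current.size() >= maxSize) {
                        return null;
                    }
                    value = factory.get();
                    Map<K, V> copy = new HashMap<>(current.size() + 1);
                    copy.putAll(current);
                    copy.put(key, value);
                    map = copy;                 // publish the new map in one write
                }
            }
        }
        return value;
    }

    int size() {
        return map.size();
    }
}
```

Writes copy the whole map, so they cost O(n). That is acceptable here because new resources are rare compared with lookups, which stay lock-free.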

     

    So the key piece is SlotChainProvider#newSlotChain(). For its logic, see the SlotChain part of this source analysis.

     

    Step 4: Create the Entry object

    Step [2.3] creates the Entry object from the Resource, the Context and the SlotChain.

     

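    Step 5: Execute the SlotChain's entry method

    Each Slot#entry method runs in the order in which the Slots were loaded into the SlotChain:

    1. If the SlotChain's entry method throws a BlockException, the entry is exited and the exception is rethrown to the caller.

    2. If the SlotChain's entry method completes normally, the entry object is returned.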

     

    For the logic of each individual Slot, see the Slot part of this source analysis; it is not repeated here.
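The slot chain is a chain of responsibility. Each slot does its own work and then hands control to the next one, so a BlockException thrown deep in the chain travels back up through every earlier slot. The sketch below uses hypothetical classes (`LinkedSlot`, `CountingSlot`, `RuleSlot`, `BlockedException`) and a simplified `entry(String)` signature. `CountingSlot` imitates the way StatisticSlot is commonly described: it lets the rest of the chain run first, then records a pass, or records a block and rethrows.

```java
import java.util.List;

// Hypothetical stand-in for BlockException.
class BlockedException extends Exception {
    BlockedException(String message) {
        super(message);
    }
}

// Hypothetical, simplified analogue of a linked processor slot.
abstract class LinkedSlot {
    private LinkedSlot next;

    void setNext(LinkedSlot next) {
        this.next = next;
    }

    abstract void entry(String resource) throws BlockedException;

    // Pass control to the next slot, if there is one.
    protected void fireEntry(String resource) throws BlockedException {
        if (next != null) {
            next.entry(resource);
        }
    }
}

// Statistic-style slot: runs the rest of the chain first, then records the outcome.
class CountingSlot extends LinkedSlot {
    int passed;
    int blocked;

    @Override
    void entry(String resource) throws BlockedException {
        try {
            fireEntry(resource);
            passed++;
        } catch (BlockedException e) {
            blocked++;
            throw e; // keep propagating so the caller sees the block
        }
    }
}

// Rule-style slot: blocks one configured resource, otherwise continues down the chain.
class RuleSlot extends LinkedSlot {
    private final String blockedResource;

    RuleSlot(String blockedResource) {
        this.blockedResource = blockedResource;
    }

    @Override
    void entry(String resource) throws BlockedException {
        if (resource.equals(blockedResource)) {
            throw new BlockedException("blocked: " + resource);
        }
        fireEntry(resource);
    }
}

final class SlotChains {
    private SlotChains() {
    }

    // Links the slots in list order and returns the head of the chain.
    static LinkedSlot link(List<LinkedSlot> slots) {
        for (int i = 0; i + 1 < slots.size(); i++) {
            slots.get(i).setNext(slots.get(i + 1));
        }
        return slots.get(0);
    }
}
```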

     

    Step 6: If the caller catches a BlockException, the request was blocked; otherwise the request proceeds normally.

    public class CtSph implements Sph {

        // Runs the #entry() logic
        private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized,
                                        Object... args) throws BlockException {
            .......

            try {
                // Step 5: run each Slot#entry() in order; see each Slot implementation's #entry() for details
                chain.entry(context, resourceWrapper, null, count, prioritized, args);
            } catch (BlockException e1) {
                // Blocked: exit the entry first, then rethrow so the caller can handle the block
                e.exit(count, args);
                throw e1;
            } catch (Throwable e1) {
                // This should not happen, unless there are errors existing in Sentinel internal.
                RecordLog.info("Sentinel unexpected exception", e1);
            }
            return e;
        }
    }

     

    That completes the execution order of Sentinel's core flow-control logic.

     

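    Pt3.5 Handling Blocked Requests

    When a Slot's rule is triggered, a subclass of BlockException is thrown (for example FlowException or DegradeException), and business code can catch and handle it. The SlotChain also records statistics according to the exception type, which yields the counts of blocked, degraded and successful requests.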

     

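    Handling happens in two places:

    Custom handling: once business code catches a BlockException, it knows the request was blocked and can respond accordingly, for example by waiting, dropping the request, failing fast, or recording its own statistics;

    Sentinel's built-in handling: when a BlockException is thrown, the SlotChain processes it as well. For example, LogSlot records the block event in its log, and StatisticSlot counts both blocked and normal requests. This processing comes with Sentinel's SlotChain, and its output covers the commonly used dimensions. You can also write a custom Slot to collect your own statistics.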
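On the business side, custom handling often looks like "run the guarded call; if it was blocked, count it and return a fallback; let real business exceptions through". The sketch below expresses that as a generic helper. `Fallbacks.callWithFallback` is hypothetical and is not a Sentinel API. The exception type that means "blocked" is a parameter; with Sentinel it would be `BlockException.class`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical helper (not a Sentinel API).
class Fallbacks {
    static final AtomicLong blockedCount = new AtomicLong(); // a custom statistic

    static <T> T callWithFallback(Callable<T> guarded,
                                  Class<? extends Exception> blockType,
                                  Supplier<T> fallback) throws Exception {
        try {
            return guarded.call();
        } catch (Exception e) {
            if (blockType.isInstance(e)) {
                blockedCount.incrementAndGet();
                return fallback.get(); // e.g. fail fast with a default value
            }
            throw e; // business exceptions are not swallowed
        }
    }
}
```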

     

    Pt3.6 Releasing Resources

    1. Once flow-control handling is done, Entry.exit() must be called explicitly in a finally block to release the resource.

    try {
        entry = SphU.entry(KEY);
        // business logic
    } catch (BlockException e1) {
        // handle the blocked request
    } catch (Exception e2) {
        // business exception
    } finally {
        if (entry != null) {
            // explicitly release the resource
            entry.exit();
        }
    }

     

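    Slot#exit runs in the same order as Slot#entry. Exit is mainly responsible for releasing resources and for the statistics recorded after a request completes.

    This was covered in the Slot source analysis, so it is not repeated here.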
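The finally block guarantees that whatever entry acquired is released even if the business code throws. The same guarantee can be expressed with AutoCloseable and try-with-resources. In the sketch below, `GuardedEntry` is a hypothetical class whose `close()` plays the role of `Entry.exit()`. Exit is made idempotent so that a double release cannot drive the in-flight counter negative.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical entry handle: close() plays the role of Entry.exit().
class GuardedEntry implements AutoCloseable {
    static final AtomicInteger inFlight = new AtomicInteger();
    private boolean exited;

    GuardedEntry() {
        inFlight.incrementAndGet(); // acquired on entry
    }

    // Idempotent: calling exit twice must not release twice.
    void exit() {
        if (!exited) {
            exited = true;
            inFlight.decrementAndGet();
        }
    }

    @Override
    public void close() {
        exit();
    }
}
```

With `try (GuardedEntry e = new GuardedEntry()) { ... }`, the counter returns to zero even when the body throws. That is the same effect the explicit `finally { entry.exit(); }` achieves above.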
