
Sentinel Source Code Analysis, Part 3: The SlotChain

The instrumentation entry point was mentioned earlier, so let's start from the entry method:

com.alibaba.csp.sentinel.SphU#entry(java.lang.String)

public static Entry entry(String name) throws BlockException {
    return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
}
           

com.alibaba.csp.sentinel.CtSph#entry(java.lang.String, com.alibaba.csp.sentinel.EntryType, int, java.lang.Object...)

@Override
public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
    StringResourceWrapper resource = new StringResourceWrapper(name, type);
    return entry(resource, count, args);
}
public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
    return entryWithPriority(resourceWrapper, count, false, args);
}
           

First, a StringResourceWrapper resource object is built from the name, and then entryWithPriority is called:

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
    throws BlockException {
    Context context = ContextUtil.getContext();
    if (context instanceof NullContext) {
        // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
        // so here init the entry only. No rule checking will be done.
        return new CtEntry(resourceWrapper, null, context);
    }

    if (context == null) {
        // Using default context.
        context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
    }

    // Global switch is close, no rule checking will do.
    if (!Constants.ON) {
        return new CtEntry(resourceWrapper, null, context);
    }

    // Look up the SlotChain that belongs to this resource
    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

    /*
     * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
     * so no rule checking will be done.
     */
    if (chain == null) {
        return new CtEntry(resourceWrapper, null, context);
    }

    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        // Invoke entry on the slot chain
        chain.entry(context, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException e1) {
        // A BlockException was thrown: exit the entry, then rethrow
        e.exit(count, args);
        throw e1;
    } catch (Throwable e1) {
        // This should not happen, unless there are errors existing in Sentinel internal.
        RecordLog.info("Sentinel unexpected exception", e1);
    }
    return e;
}
           

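The first half validates the Context. The method then looks up the SlotChain for the resource (building it on first use) and finally starts the chain invocation through chain.entry. If a BlockException is thrown, exit() is called on the entry before the exception is rethrown.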
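The exception handling at the end of entryWithPriority is easy to misread, so here is a minimal sketch of just that control flow. Every name in it (SketchBlockException, SketchChain, SketchEntry, EntryFlowSketch) is a hypothetical stand-in, not a Sentinel class, and the logging is reduced to a print statement.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Sentinel's BlockException.
class SketchBlockException extends Exception {
    SketchBlockException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for a slot chain: something that may throw on entry.
interface SketchChain {
    void entry(String resource) throws Throwable;
}

// Hypothetical stand-in for CtEntry; it only records whether exit() was called.
class SketchEntry {
    final String resource;
    boolean exited = false;

    SketchEntry(String resource) {
        this.resource = resource;
    }

    void exit() {
        exited = true;
    }
}

class EntryFlowSketch {
    // Every entry created so far, kept only so the demo can inspect them afterwards.
    static final List<SketchEntry> CREATED = new ArrayList<>();

    static SketchEntry enter(String resource, SketchChain chain) throws SketchBlockException {
        SketchEntry e = new SketchEntry(resource);
        CREATED.add(e);
        try {
            chain.entry(resource);
        } catch (SketchBlockException blocked) {
            // A rule rejected the call: release the entry, then let the caller see the block.
            e.exit();
            throw blocked;
        } catch (Throwable unexpected) {
            // Internal errors are reported and swallowed, so the caller still gets an entry.
            System.err.println("unexpected exception: " + unexpected);
        }
        return e;
    }
}
```

The point of the sketch is the asymmetry between the two catch blocks: a block is propagated to the caller after cleanup, while any other failure inside the chain does not prevent the protected call from running.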

This article focuses on how the SlotChain is constructed and how it is executed, that is, on these two lines:

ProcessorSlot chain = lookProcessChain(resourceWrapper);

chain.entry(context, resourceWrapper, null, count, prioritized, args);

Constructing the SlotChain

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    if (chain == null) {
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // Entry size limit.
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }

                // The chain itself is built here
                chain = SlotChainProvider.newSlotChain();
                Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                    chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                chainMap = newMap;
            }
        }
    }
    return chain;
}
           

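chainMap is a cache: the chain is first looked up in the map and is created only if it is not there yet. Creation happens under a lock with a second check, and the new chain is published by copying the map rather than mutating it. The key is of type ResourceWrapper, so let's look at its equals and hashCode methods: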
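The caching scheme in lookProcessChain (double-checked locking plus a copy-on-write map) can be isolated in a small sketch. The names ChainCacheSketch and MAX_CHAINS are hypothetical, a plain String replaces ResourceWrapper as the key, and a bare Object stands in for the chain. The sketch declares the map field volatile so that the unlocked read sees the latest published map; treat that as an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;

class ChainCacheSketch {
    // Hypothetical limit for the demo; the listing above uses Constants.MAX_SLOT_CHAIN_SIZE.
    static final int MAX_CHAINS = 2;

    private static final Object LOCK = new Object();

    // volatile so that readers outside the lock see the most recently published map.
    private static volatile Map<String, Object> chainMap = new HashMap<>();

    static Object lookup(String resourceName) {
        Object chain = chainMap.get(resourceName);      // fast path, no lock
        if (chain == null) {
            synchronized (LOCK) {
                chain = chainMap.get(resourceName);     // re-check under the lock
                if (chain == null) {
                    if (chainMap.size() >= MAX_CHAINS) {
                        return null;                    // over the limit: no chain, no rule checking
                    }
                    chain = new Object();               // stands in for building a new slot chain
                    // Copy-on-write: never mutate a map that other threads may be reading.
                    Map<String, Object> newMap = new HashMap<>(chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceName, chain);
                    chainMap = newMap;
                }
            }
        }
        return chain;
    }
}
```

Because the map that readers hold is never modified after publication, the fast path needs no lock; the cost is a full copy on every insertion, which is acceptable when the number of resources is small and bounded.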

@Override
public int hashCode() {
    return getName().hashCode();
}

/**
 * Only {@link #getName()} is considered.
 */
@Override
public boolean equals(Object obj) {
    if (obj instanceof ResourceWrapper) {
        ResourceWrapper rw = (ResourceWrapper)obj;
        return rw.getName().equals(getName());
    }
    return false;
}
           

In other words, two resources are considered the same if their names are equal, so each resource name maps to exactly one SlotChain.
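A consequence of name-only equality is that wrappers differing in any other field still collide as map keys. The following sketch illustrates this with a hypothetical ResourceKeySketch class whose type field is a plain String; it is not Sentinel's ResourceWrapper.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class ResourceKeySketch {
    final String name;
    final String type; // for example "IN" or "OUT"; deliberately ignored by equals/hashCode

    ResourceKeySketch(String name, String type) {
        this.name = Objects.requireNonNull(name);
        this.type = type;
    }

    @Override
    public int hashCode() {
        return name.hashCode();
    }

    // Only the name is compared, mirroring the listing above.
    @Override
    public boolean equals(Object obj) {
        return obj instanceof ResourceKeySketch
            && ((ResourceKeySketch) obj).name.equals(name);
    }

    // Demo helper: how many distinct map keys do the given wrappers produce?
    static int distinctKeys(ResourceKeySketch... keys) {
        Map<ResourceKeySketch, Boolean> map = new HashMap<>();
        for (ResourceKeySketch key : keys) {
            map.put(key, Boolean.TRUE);
        }
        return map.size();
    }
}
```

Passing ("getUser", "IN"), ("getUser", "OUT") and ("getOrder", "IN") to distinctKeys yields two keys, not three, which is why a cache keyed this way holds one chain per resource name.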

Back to the construction of the SlotChain:

SlotChainProvider.newSlotChain();
public static ProcessorSlotChain newSlotChain() {
    if (slotChainBuilder != null) {
        return slotChainBuilder.build();
    }

    // Resolve the slot chain builder SPI.
    // The builder is obtained via SPI, falling back to DefaultSlotChainBuilder.
    slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);

    if (slotChainBuilder == null) {
        // Should not go through here.
        RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
        slotChainBuilder = new DefaultSlotChainBuilder();
    } else {
        RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "
            + slotChainBuilder.getClass().getCanonicalName());
    }
    return slotChainBuilder.build();
}
           

This is the builder pattern. SlotChainBuilder is the SPI interface for builders, and DefaultSlotChainBuilder is used by default.
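The "first SPI implementation, else the default" idea can be sketched with the JDK's own java.util.ServiceLoader. This is a swapped-in technique for illustration, not Sentinel's SpiLoader, and the types SketchChainBuilder, DefaultSketchChainBuilder and BuilderProviderSketch are hypothetical. With no provider registered under META-INF/services, the sketch falls back to the default.

```java
import java.util.Iterator;
import java.util.ServiceLoader;

// Hypothetical SPI interface; a String stands in for the built chain.
interface SketchChainBuilder {
    String build();
}

class DefaultSketchChainBuilder implements SketchChainBuilder {
    @Override
    public String build() {
        return "default-chain";
    }
}

class BuilderProviderSketch {
    // Resolved once and reused by every later call.
    private static volatile SketchChainBuilder builder;

    static SketchChainBuilder resolve() {
        if (builder != null) {
            return builder;
        }
        // Take the first registered provider, if any; otherwise use the default.
        Iterator<SketchChainBuilder> providers =
            ServiceLoader.load(SketchChainBuilder.class).iterator();
        builder = providers.hasNext() ? providers.next() : new DefaultSketchChainBuilder();
        return builder;
    }

    static String newChain() {
        return resolve().build();
    }
}
```

The builder is resolved a single time, but build() runs on every call, so each resource can still receive its own chain instance.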

Here is the build method of DefaultSlotChainBuilder:

@Override
public ProcessorSlotChain build() {
    // First create a DefaultProcessorSlotChain, which serves as the head of the list
    // The chain is a singly linked list: first -> next -> end
    ProcessorSlotChain chain = new DefaultProcessorSlotChain();

    // Note: the instances of ProcessorSlot should be different, since they are not stateless.
    // Load all ProcessorSlot implementations via SPI, sorted by their @SpiOrder annotation
    List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
    for (ProcessorSlot slot : sortedSlotList) {
        // Skip any slot that is not an AbstractLinkedProcessorSlot; it cannot be linked into the chain
        if (!(slot instanceof AbstractLinkedProcessorSlot)) {
            RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
            continue;
        }


        chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
    }

    return chain;
}
           

First a DefaultProcessorSlotChain is initialized. It maintains a singly linked list of Slots.

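Then all implementations of the ProcessorSlot SPI interface are loaded. They arrive already sorted, in the order given by their @SpiOrder annotations. The addLast method appends each Slot object to the end of the list.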
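Sorting instances by an annotation value can be sketched as follows. The @SketchOrder annotation, the slot classes and their numeric values are all made up for the example; the sketch assumes that smaller values sort first and that unannotated classes go last, which may not match every detail of @SpiOrder.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical ordering annotation; it must be retained at runtime to be read reflectively.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SketchOrder {
    int value();
}

@SketchOrder(-10000)
class NodeSlotSketch { }

@SketchOrder(-3000)
class StatSlotSketch { }

@SketchOrder(-2000)
class FlowSlotSketch { }

// No annotation: the sketch treats this as lowest precedence.
class UnorderedSlotSketch { }

class OrderSortSketch {
    static int orderOf(Object slot) {
        SketchOrder order = slot.getClass().getAnnotation(SketchOrder.class);
        return order == null ? Integer.MAX_VALUE : order.value();
    }

    // Returns the simple class names of the slots, sorted by ascending order value.
    static List<String> sortedNames(List<Object> slots) {
        List<Object> copy = new ArrayList<>(slots);
        copy.sort(Comparator.comparingInt(OrderSortSketch::orderOf));
        List<String> names = new ArrayList<>();
        for (Object slot : copy) {
            names.add(slot.getClass().getSimpleName());
        }
        return names;
    }
}
```

Since the chain is assembled by appending in sorted order, the order values alone decide which slot runs before which.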

@Override
public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
    end.setNext(protocolProcessor);
    end = protocolProcessor;
}
public void setNext(AbstractLinkedProcessorSlot<?> next) {
    this.next = next;
}
           

At this point the SlotChain has been fully constructed.
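The list manipulation in addLast is compact enough to reproduce in isolation. The sketch below assumes, as the first -> next -> end comment suggests, that the chain starts with a placeholder head node and that end initially points at that same node. SketchNode and LinkedChainSketch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class SketchNode {
    final String name;
    SketchNode next;

    SketchNode(String name) {
        this.name = name;
    }
}

class LinkedChainSketch {
    // Placeholder head node, so addLast never has to special-case an empty list.
    private final SketchNode first = new SketchNode("first");
    private SketchNode end = first;

    void addLast(SketchNode node) {
        end.next = node;  // link the current tail to the new node
        end = node;       // the new node becomes the tail
    }

    // Walks the list from the head and collects the node names in order.
    List<String> names() {
        List<String> result = new ArrayList<>();
        for (SketchNode n = first; n != null; n = n.next) {
            result.add(n.name);
        }
        return result;
    }
}
```

Adding nodes named A, B and C produces the traversal first, A, B, C; appending is constant-time because the tail pointer is kept up to date.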


Executing the SlotChain

Execution of the SlotChain starts at this line: chain.entry(context, resourceWrapper, null, count, prioritized, args);

com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain#entry

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
    throws Throwable {
    first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
}
           

This simply calls transformEntry on first:

void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
    throws Throwable {
    T t = (T)o;
    entry(context, resourceWrapper, t, count, prioritized, args);
}
           

The entry method of first:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
    throws Throwable {
    super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
}
           

The fireEntry method of the parent class:

@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
    throws Throwable {
    if (next != null) {
        next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
    }
}
           

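As you can see, fireEntry simply calls transformEntry on next, and next's transformEntry in turn calls its own entry method. Each Slot implements entry itself; as long as it calls fireEntry inside entry, the entry of the slot after it is triggered as well.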

This continues until next is null.
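The propagation described above can be condensed into a small runnable sketch. It leaves out the transformEntry cast step and uses hypothetical classes (SketchSlot, PassThroughSlot, LimitSlot, SlotChainRunSketch); an IllegalStateException stands in for BlockException. It also shows what happens when a slot does not call fireEntry: the rest of the chain never runs.

```java
import java.util.ArrayList;
import java.util.List;

abstract class SketchSlot {
    private SketchSlot next;

    void setNext(SketchSlot next) {
        this.next = next;
    }

    // Each slot does its own work here and decides whether to pass the call on.
    abstract void entry(List<String> trace, int count);

    // Hands the call to the next slot; at the end of the chain this is a no-op.
    void fireEntry(List<String> trace, int count) {
        if (next != null) {
            next.entry(trace, count);
        }
    }
}

class PassThroughSlot extends SketchSlot {
    private final String name;

    PassThroughSlot(String name) {
        this.name = name;
    }

    @Override
    void entry(List<String> trace, int count) {
        trace.add(name);
        fireEntry(trace, count);
    }
}

class LimitSlot extends SketchSlot {
    private final int limit;

    LimitSlot(int limit) {
        this.limit = limit;
    }

    @Override
    void entry(List<String> trace, int count) {
        trace.add("limit");
        if (count > limit) {
            // Throwing instead of calling fireEntry stops the chain at this slot.
            throw new IllegalStateException("blocked: count " + count + " exceeds " + limit);
        }
        fireEntry(trace, count);
    }
}

class SlotChainRunSketch {
    // Builds the chain a -> limit(1) -> b and returns the slots visited for the given count.
    static List<String> run(int count) {
        SketchSlot a = new PassThroughSlot("a");
        SketchSlot limit = new LimitSlot(1);
        SketchSlot b = new PassThroughSlot("b");
        a.setNext(limit);
        limit.setNext(b);

        List<String> trace = new ArrayList<>();
        try {
            a.entry(trace, count);
        } catch (IllegalStateException blocked) {
            trace.add("blocked");
        }
        return trace;
    }
}
```

With a count of 1 the trace is a, limit, b; with a count of 2 it is a, limit, blocked, because the limiting slot throws before handing the call to b.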

A simple diagram of this process:

[Figure: the SlotChain invocation flow]

The exit() method works much like entry, so it is not analyzed here.

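That completes the analysis of the SlotChain call chain. To summarize:

  • The chain maintains a singly linked list.
  • Calling fireEntry triggers the entry call on next.
  • The whole design is an instance of the chain of responsibility pattern.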