天天看点

Sentinel扩展-流控告警案例

Sentinel扩展-流控告警案例

在实际开发过程中 ,如果生产环境使用了Sentinel,那就需要对接口的流控参数不断优化,在兼容系统性能的同时,增加接口的流量处理能力。本文主要对Sentinel的源码做出扩展,在接口触发流控时,通过钉钉告警的方式通知相应人员,以便于及时调整接口参数。

原理:

在Sentinel源码中主要使用了责任链设计模式,责任链中最重要的统计流量的功能便是在

StatisticSlot

中实现的,看下它的源码:

@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            //关注点1: 执行后续校验逻辑,如果触发流控熔断则会抛出异常
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            //关注点2: 对通过的QPS及线程数进行统计
            node.increaseThreadNum();
            node.addPassRequest(count);

            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

            // 扩展点1:成功回调
            for (ProcessorSlotEntryCallback<DefaultNode> handler : 
                 StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException ex) {
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : 
                 StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException e) {
            //关注点3:触发流控
            // Blocked, set block exception to current entry.
            context.getCurEntry().setBlockError(e);

            //统计流控指标
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }

            // 扩展点2:流控回调,所以如果增加流控告警功能,需要对此处的handler进行扩展
            for (ProcessorSlotEntryCallback<DefaultNode> handler : 
                 StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;
        } catch (Throwable e) {
            // Unexpected internal error, set error to current entry.
            context.getCurEntry().setError(e);

            throw e;
        }
    }
}    
           

关注点1: 执行后续校验逻辑,如果触发流控熔断则会抛出异常

关注点2: 对通过的QPS及线程数进行统计

关注点3:触发流控会抛出BlockException异常

扩展点1:成功回调

handler.onPass(context, resourceWrapper, node, count, args)

扩展点2:流控回调

handler.onBlocked(e, context, resourceWrapper, node, count, args)

所以如果增加流控告警功能,需要对此处的handler进行扩展,因此需要实现ProcessorSlotEntryCallback接口

public interface ProcessorSlotEntryCallback<T> {

    void onPass(Context context, ResourceWrapper resourceWrapper, T param, int count, Object... args) throws Exception;

    void onBlocked(BlockException ex, Context context, ResourceWrapper resourceWrapper, T param, int count, Object... args);
}
           

那ProcessorSlotEntryCallback 接口是如果初始化的呢?我们可以借助Sentinel体系中的SPI机制来初始化

在Sentinel源码中有一个初始化接口

InitFunc

,在第一次调用的时候会调用所有InitFunc实现类的 init 方法

public interface InitFunc {

    void init() throws Exception;
}

           
public final class InitExecutor {

    private static AtomicBoolean initialized = new AtomicBoolean(false);

    /**
     * If one {@link InitFunc} throws an exception, the init process
     * will immediately be interrupted and the application will exit.
     *
     * The initialization will be executed only once.
     */
    public static void doInit() {
        if (!initialized.compareAndSet(false, true)) {
            return;
        }
        try {
            //关注点1: SPI机制,会加载META-INF/services 下配置的InitFunc实现类
            List<InitFunc> initFuncs = SpiLoader.of(InitFunc.class).loadInstanceListSorted();
            List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
            for (InitFunc initFunc : initFuncs) {
                RecordLog.info("[InitExecutor] Found init func: {}", 
                               initFunc.getClass().getCanonicalName());
                insertSorted(initList, initFunc);
            }
            for (OrderWrapper w : initList) {
                //关注点2 : 执行init方法
                w.func.init();
                RecordLog.info("[InitExecutor] Executing {} with order {}",
                    w.func.getClass().getCanonicalName(), w.order);
            }
        } catch (Exception ex) {
            RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);
            ex.printStackTrace();
        } catch (Error error) {
            RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);
            error.printStackTrace();
        }
    }
}    
           

关注点1: SPI机制,会加载META-INF/services 下配置的InitFunc实现类

Sentinel扩展-流控告警案例

关注点2 : 执行init方法

所以可以在init方法中,完成ProcessorSlotEntryCallback接口实现类的初始化

案例:

在正式开始流控告警的代码前,还需要写一个钉钉推送的功能,这里给大家提供了一个工具类:

@Slf4j
@Component
public class RobotPushUtils {

    private static TemplateEngine templateEngine;

    @Resource
    TemplateEngine thymeleaf;

    @PostConstruct
    public void init() {
        templateEngine = thymeleaf;
    }
    
    /**
     * 钉钉推送
     *
     * @param req 请求
     */
    public static void push(RobotPushReq req) {
        try {
            RobotPushReq.MsgType.of(req.getMsgType());

            if (Objects.isNull(req.getTemplatePath())) {
                throw new Exception("内容为空");
            }

            //发送消息
            DingTalkClient client = new DefaultDingTalkClient(req.getDingRobertUrl());
            OapiRobotSendRequest sendReq = initSendReq(req);
            OapiRobotSendResponse response = client.execute(sendReq);
            String body = response.getBody();

            log.info("钉钉推送成功,req -> {}, res -> {}", JSON.toJSONString(sendReq), body);
        } catch (Exception e) {
            log.error("钉钉推送异常,error -> {}", e.getMessage(), e);
        }
    }

    /**
     * 封装消息内容
     *
     * @param req 请求
     * @return 消息内容
     */
    private static OapiRobotSendRequest initSendReq(RobotPushReq req) throws Exception {
        Context context = new Context();
        if (Objects.nonNull(req.getTemplateData())) {
            req.getTemplateData().entrySet().forEach(entry ->
                    context.setVariable(entry.getKey(), entry.getValue()));
        }
        String content = templateEngine.process(req.getTemplatePath(), context);
        if (Objects.isNull(content)) {
            throw new Exception("内容为空");
        }

        // @人
        OapiRobotSendRequest.At at = new OapiRobotSendRequest.At();
        at.setIsAtAll(req.isAtAll());
        at.setAtMobiles(req.getAtMobiles());

        // 封装推送请求
        OapiRobotSendRequest request =new OapiRobotSendRequest();
        request.setMsgtype(req.getMsgType());
        request.setAt(at);

        if (RobotPushReq.MsgType.TEXT.code.equals(req.getMsgType())) {
            // text格式
            OapiRobotSendRequest.Text text = new OapiRobotSendRequest.Text();
            text.setContent(content);
            request.setText(text);
        } else if (RobotPushReq.MsgType.MARKDOWN.code.equals(req.getMsgType())) {
            // markdown格式
            OapiRobotSendRequest.Markdown markdown = new OapiRobotSendRequest.Markdown();
            markdown.setTitle(req.getTitle());
            markdown.setText(content);
            request.setMarkdown(markdown);
        } else {
            throw new Exception("不支持的消息格式");
        }
        return request;
    }

}

           

此工具类是以thymeleaf的方式推送消息,所以需要自己写推送模板,还有就是需要引入依赖:

<dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>alibaba-dingtalk-service-sdk</artifactId>
            <version>1.0.1</version>
        </dependency>
           

进入本文重点,根据上面所说需要一个ProcessorSlotEntryCallback实现类,触发流控时回调onBlocked方法

@Slf4j
@Component
public class WarnCallback implements ProcessorSlotEntryCallback<DefaultNode>, ApplicationContextAware {

    private static ApplicationContext applicationContext;

    @Override
    public void onPass(Context context, ResourceWrapper resourceWrapper, DefaultNode param, int count, Object... args) throws Exception {
        //暂不处理
    }

    @Override
    public void onBlocked(BlockException ex, Context context, ResourceWrapper resourceWrapper, DefaultNode param, int count, Object... args) {

        DingdingRobotUtil dingdingRobotUtil = applicationContext.getBean(DingdingRobotUtil.class);

        String resourceName = resourceWrapper.getName();
        String msg = StringUtils.EMPTY;

        if (ex instanceof FlowException) {
            msg = "该资源流控指标已达阈值,接口触发限流";
        } else if (ex instanceof DegradeException) {
            msg = "该资源熔断指标已达阈值,接口触发熔断";
        } else if (ex instanceof AuthorityException) {
            msg = "该资源权限校验未通过,接口触发限流";
        } else if (ex instanceof SystemBlockException) {
            msg = "系统流控指标已达阈值,接口触发限流";
        } else if (ex instanceof ParamFlowException) {
            msg = "该资源热点参数流控指标已达阈值,接口触发限流";
        }

        if (StringUtils.isNotEmpty(msg) && StringUtils.isNotEmpty(resourceName)) {
            dingdingRobotUtil.setLimitMsg(resourceName, msg);
        }
    }


    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        WarnCallback.setContext(applicationContext);
    }

    public static void setContext(ApplicationContext applicationContext) {
        WarnCallback.applicationContext = applicationContext;
    }
}
           

提供一个初始化类,初始化WarnCallback

public class WarnCallbackInit implements InitFunc {

    @Override
    public void init() throws Exception {
        StatisticSlotCallbackRegistry.addEntryCallback(WarnCallback.class.getCanonicalName(),
                new WarnCallback());
    }

}
           

在META-INF/services中配置初始化类

Sentinel扩展-流控告警案例
Sentinel扩展-流控告警案例

以上就是此次案例中的全部代码,当触发流控的时候会在钉钉群里推送告警消息,另外钉钉群还需要配置机器人,大家自行百度一下

继续阅读