Sentinel扩展-流控告警案例
在实际开发过程中 ,如果生产环境使用了Sentinel,那就需要对接口的流控参数不断优化,在兼容系统性能的同时,增加接口的流量处理能力。本文主要对Sentinel的源码做出扩展,在接口触发流控时,通过钉钉告警的方式通知相应人员,以便于及时调整接口参数。
原理:
在Sentinel源码中主要使用了责任链设计模式,责任链中最重要的统计流量的功能便是在
StatisticSlot
中实现的,看下它的源码:
@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
//关注点1: 执行后续校验逻辑,如果触发流控熔断则会抛出异常
fireEntry(context, resourceWrapper, node, count, prioritized, args);
//关注点2: 对通过的QPS及线程数进行统计
node.increaseThreadNum();
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// 扩展点1:成功回调
for (ProcessorSlotEntryCallback<DefaultNode> handler :
StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler :
StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
//关注点3:触发流控
// Blocked, set block exception to current entry.
context.getCurEntry().setBlockError(e);
//统计流控指标
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// 扩展点2:流控回调,所以如果增加流控告警功能,需要对此处的handler进行扩展
for (ProcessorSlotEntryCallback<DefaultNode> handler :
StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);
throw e;
}
}
}
关注点1: 执行后续校验逻辑,如果触发流控熔断则会抛出异常
关注点2: 对通过的QPS及线程数进行统计
关注点3:触发流控会抛出BlockException异常
扩展点1:成功回调
handler.onPass(context, resourceWrapper, node, count, args)
扩展点2:流控回调
handler.onBlocked(e, context, resourceWrapper, node, count, args)
所以如果增加流控告警功能,需要对此处的handler进行扩展,因此需要实现ProcessorSlotEntryCallback接口
public interface ProcessorSlotEntryCallback<T> {
void onPass(Context context, ResourceWrapper resourceWrapper, T param, int count, Object... args) throws Exception;
void onBlocked(BlockException ex, Context context, ResourceWrapper resourceWrapper, T param, int count, Object... args);
}
那ProcessorSlotEntryCallback 接口是如果初始化的呢?我们可以借助Sentinel体系中的SPI机制来初始化
在Sentinel源码中有一个初始化接口
InitFunc
,在第一次调用的时候会调用所有InitFunc实现类的 init 方法
public interface InitFunc {
void init() throws Exception;
}
public final class InitExecutor {
private static AtomicBoolean initialized = new AtomicBoolean(false);
/**
* If one {@link InitFunc} throws an exception, the init process
* will immediately be interrupted and the application will exit.
*
* The initialization will be executed only once.
*/
public static void doInit() {
if (!initialized.compareAndSet(false, true)) {
return;
}
try {
//关注点1: SPI机制,会加载META-INF/services 下配置的InitFunc实现类
List<InitFunc> initFuncs = SpiLoader.of(InitFunc.class).loadInstanceListSorted();
List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
for (InitFunc initFunc : initFuncs) {
RecordLog.info("[InitExecutor] Found init func: {}",
initFunc.getClass().getCanonicalName());
insertSorted(initList, initFunc);
}
for (OrderWrapper w : initList) {
//关注点2 : 执行init方法
w.func.init();
RecordLog.info("[InitExecutor] Executing {} with order {}",
w.func.getClass().getCanonicalName(), w.order);
}
} catch (Exception ex) {
RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);
ex.printStackTrace();
} catch (Error error) {
RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);
error.printStackTrace();
}
}
}
关注点1: SPI机制,会加载META-INF/services 下配置的InitFunc实现类
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnLyYWM3YGZ5gjMykzY3YmNhRzMyQjMmJGZ3UWZjZmZzMzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
关注点2 : 执行init方法
所以可以在init方法中,完成ProcessorSlotEntryCallback接口实现类的初始化
案例:
在正式开始流控告警的代码前,还需要写一个钉钉推送的功能,这里给大家提供了一个工具类:
@Slf4j
@Component
public class RobotPushUtils {
private static TemplateEngine templateEngine;
@Resource
TemplateEngine thymeleaf;
@PostConstruct
public void init() {
templateEngine = thymeleaf;
}
/**
* 钉钉推送
*
* @param req 请求
*/
public static void push(RobotPushReq req) {
try {
RobotPushReq.MsgType.of(req.getMsgType());
if (Objects.isNull(req.getTemplatePath())) {
throw new Exception("内容为空");
}
//发送消息
DingTalkClient client = new DefaultDingTalkClient(req.getDingRobertUrl());
OapiRobotSendRequest sendReq = initSendReq(req);
OapiRobotSendResponse response = client.execute(sendReq);
String body = response.getBody();
log.info("钉钉推送成功,req -> {}, res -> {}", JSON.toJSONString(sendReq), body);
} catch (Exception e) {
log.error("钉钉推送异常,error -> {}", e.getMessage(), e);
}
}
/**
* 封装消息内容
*
* @param req 请求
* @return 消息内容
*/
private static OapiRobotSendRequest initSendReq(RobotPushReq req) throws Exception {
Context context = new Context();
if (Objects.nonNull(req.getTemplateData())) {
req.getTemplateData().entrySet().forEach(entry ->
context.setVariable(entry.getKey(), entry.getValue()));
}
String content = templateEngine.process(req.getTemplatePath(), context);
if (Objects.isNull(content)) {
throw new Exception("内容为空");
}
// @人
OapiRobotSendRequest.At at = new OapiRobotSendRequest.At();
at.setIsAtAll(req.isAtAll());
at.setAtMobiles(req.getAtMobiles());
// 封装推送请求
OapiRobotSendRequest request =new OapiRobotSendRequest();
request.setMsgtype(req.getMsgType());
request.setAt(at);
if (RobotPushReq.MsgType.TEXT.code.equals(req.getMsgType())) {
// text格式
OapiRobotSendRequest.Text text = new OapiRobotSendRequest.Text();
text.setContent(content);
request.setText(text);
} else if (RobotPushReq.MsgType.MARKDOWN.code.equals(req.getMsgType())) {
// markdown格式
OapiRobotSendRequest.Markdown markdown = new OapiRobotSendRequest.Markdown();
markdown.setTitle(req.getTitle());
markdown.setText(content);
request.setMarkdown(markdown);
} else {
throw new Exception("不支持的消息格式");
}
return request;
}
}
此工具类是以thymeleaf的方式推送消息,所以需要自己写推送模板,还有就是需要引入依赖:
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>alibaba-dingtalk-service-sdk</artifactId>
<version>1.0.1</version>
</dependency>
进入本文重点,根据上面所说需要一个ProcessorSlotEntryCallback实现类,触发流控时回调onBlocked方法
@Slf4j
@Component
public class WarnCallback implements ProcessorSlotEntryCallback<DefaultNode>, ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void onPass(Context context, ResourceWrapper resourceWrapper, DefaultNode param, int count, Object... args) throws Exception {
//暂不处理
}
@Override
public void onBlocked(BlockException ex, Context context, ResourceWrapper resourceWrapper, DefaultNode param, int count, Object... args) {
DingdingRobotUtil dingdingRobotUtil = applicationContext.getBean(DingdingRobotUtil.class);
String resourceName = resourceWrapper.getName();
String msg = StringUtils.EMPTY;
if (ex instanceof FlowException) {
msg = "该资源流控指标已达阈值,接口触发限流";
} else if (ex instanceof DegradeException) {
msg = "该资源熔断指标已达阈值,接口触发熔断";
} else if (ex instanceof AuthorityException) {
msg = "该资源权限校验未通过,接口触发限流";
} else if (ex instanceof SystemBlockException) {
msg = "系统流控指标已达阈值,接口触发限流";
} else if (ex instanceof ParamFlowException) {
msg = "该资源热点参数流控指标已达阈值,接口触发限流";
}
if (StringUtils.isNotEmpty(msg) && StringUtils.isNotEmpty(resourceName)) {
dingdingRobotUtil.setLimitMsg(resourceName, msg);
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
WarnCallback.setContext(applicationContext);
}
public static void setContext(ApplicationContext applicationContext) {
WarnCallback.applicationContext = applicationContext;
}
}
提供一个初始化类,初始化WarnCallback
public class WarnCallbackInit implements InitFunc {
@Override
public void init() throws Exception {
StatisticSlotCallbackRegistry.addEntryCallback(WarnCallback.class.getCanonicalName(),
new WarnCallback());
}
}
在META-INF/services中配置初始化类
以上就是此次案例中的全部代码,当触发流控的时候会在钉钉群里推送告警消息,另外钉钉群还需要配置机器人,大家自行百度一下