上文《Hystrix浅入浅出:(一)背景与功能初探》已经提到过,使用Hystrix我们需要将自动熔断的业务逻辑通过Command模式来包装,于是,我们只需要继承HystrixCommand抽象类,实现run、getFallback等方法,你就拥有了一个具有基本熔断功能的类。从使用来看,所有的核心逻辑都由AbstractCommand(即HystrixCommand的父类,HystrixCommand只是对AbstractCommand进行了简单包装)抽象类串起来,从功能上来说,AbstractCommand必须将如下功能联系起来:
策略配置:Hystrix有两种降级模型,即信号量(同步)模型和线程池(异步)模型,这两种模型所有可定制的部分都体现在了HystrixCommandProperties和HystrixThreadPoolProperties两个类中。然而还是那句老话,Hystrix只提供了配置修改的入口,没有将配置界面化,如果想在页面上动态调整配置,还需要自己实现。
数据统计:Hystrix以命令模式的方式来控制业务逻辑以及熔断逻辑的调用时机,所以说数据统计对它来说不算难事,但如何高效、精准的在内存中统计数据,还需要一定的技巧。
断路器:断路器可以说是Hystrix内部最重要的状态机,是它决定着每个Command的执行过程。
监控露出:能通过某种可配置方式将统计数据展现在仪表盘上。
一. Hystrix内部流程
本文将主要阐述【断路器】和【数据统计】两大组件的设计和实现。在介绍两大组件之前,我们先简单了解下Hystrix工作时的内部流程,官方的图有些复杂(https://github.com/Netflix/Hystrix/wiki/How-it-Works),过于细节,这里画个简单的(只显示了关键环节):
上图简单罗列的一个请求(即我们包装的Command)在Hystrix内部被执行的关键过程。
【创建Command对象】这一过程也包含了策略、资源的初始化,参看AbstractCommand的构造函数:
protected AbstractCommand(...) {
// 初始化group,group主要是用来对不同的command key进行统一管理,比如统一监控、告警等
this.commandGroup = initGroupKey(...);
// 初始化command key,用来标识降级逻辑,可以理解成command的id
this.commandKey = initCommandKey(...);
// 初始化自定义的降级策略
this.properties = initCommandProperties(...);
// 初始化线程池key,相同的线程池key将公用线程池
this.threadPoolKey = initThreadPoolKey(...);
// 初始化监控器
this.metrics = initMetrics(...);
// 初始化断路器
this.circuitBreaker = initCircuitBreaker(...);
// 初始化线程池
this.threadPool = initThreadPool(...);
// Hystrix通过SPI实现了插件机制,允许用户对事件通知、处理和策略进行自定义
this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
this.executionHook = initExecutionHook(executionHook);
this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);
/* fallback semaphore override if applicable */
this.fallbackSemaphoreOverride = fallbackSemaphore;
/* execution semaphore override if applicable */
this.executionSemaphoreOverride = executionSemaphore;
}
上一篇《Hystrix浅入浅出:(一)背景与功能初探》说过,因为Command对象是有状态的(比如每次请求参数可能不同),所以每次请求都需要新创建Command,这么多初始化工作,如果并发量过高,会不会带来过大的系统开销?其实构造函数中的很多初始化工作只会集中在创建第一个Command时来做,后续创建的Command对象主要是从静态Map中取对应的实例来赋值,比如监控器、断路器和线程池的初始化,因为相同的Command的command key和线程池key都是一致的,在HystrixCommandMetrics、HystrixCircuitBreaker.Factory、HystrixThreadPool中会分别有如下静态属性:
private static final ConcurrentHashMap<String, HystrixCommandMetrics> metrics = new ConcurrentHashMap<String, HystrixCommandMetrics>();
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
可见所有Command对象都可以在这里找到自己对应的资源实例。
二. Hystrix的断路器设计
断路器是Hystrix最核心的状态机,只有了解它的变更条件,我们才能准确掌握Hystrix的内部行为。上面的内部流程图中【断路器状态判断】这个环节直接决定着这次请求(或者说这个Command对象)是尝试去执行正常业务逻辑(即run())还是走降级后的逻辑(即getFallback()),断路器HystrixCircuitBreaker有三个状态,
CLOSED关闭状态:允许流量通过。
OPEN打开状态:不允许流量通过,即处于降级状态,走降级逻辑。
HALF_OPEN半开状态:允许某些流量通过,并关注这些流量的结果,如果出现超时、异常等情况,将进入OPEN状态,如果成功,那么将进入CLOSED状态。
为了能做到状态能按照指定的顺序来流转,并且是线程安全的,断路器的实现类HystrixCircuitBreakerImpl使用了AtomicReference:
enum Status {
CLOSED, OPEN, HALF_OPEN;
}
// 断路器初始状态肯定是关闭状态
private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
断路器在状态变化时,使用了AtomicReference#compareAndSet来确保当条件满足时,只有一笔请求能成功改变状态。各状态流转顺序如下:
那么,什么条件下断路器会改变状态?
1. CLOSED -> OPEN :
时间窗口内(默认10秒)请求量大于请求量阈值(即circuitBreakerRequestVolumeThreshold,默认值是20),并且该时间窗口内错误率大于错误率阈值(即circuitBreakerErrorThresholdPercentage,默认值为50,表示50%),那么断路器的状态将由默认的CLOSED状态变为OPEN状态。看代码可能更直接:
// 检查是否超过了我们设置的断路器请求量阈值
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// 如果没有超过统计窗口的请求量阈值,则不改变断路器状态,
// 如果它是CLOSED状态,那么仍然是CLOSED.
// 如果它是HALF-OPEN状态,我们需要等待请求被成功执行,
// 如果它是OPEN状态, 我们需要等待睡眠窗口过去。
} else {
if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
//如果没有超过统计窗口的错误率阈值,则不改变断路器状态,,
// 如果它是CLOSED状态,那么仍然是CLOSED.
// 如果它是HALF-OPEN状态,我们需要等待请求被成功执行,
// 如果它是OPEN状态, 我们需要等待【睡眠窗口】过去。
} else {
// 如果错误率太高,那么将变为OPEN状态
if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
// 因为断路器处于打开状态会有一个时间范围,所以这里记录了变成OPEN的时间
circuitOpened.set(System.currentTimeMillis());
}
}
}
这里的错误率是个整数,即errorPercentage= (int) ((double) errorCount / totalCount * 100);,至于睡眠窗口,下面会提到。
2. OPEN ->HALF_OPEN:
前面说过,当进入OPEN状态后,会进入一段睡眠窗口,即只会OPEN一段时间,所以这个睡眠窗口过去,就会“自动”从OPEN状态变成HALF_OPEN状态,这种设计是为了能做到弹性恢复,这种状态的变更,并不是由调度线程来做,而是由请求来触发,每次请求都会进行如下检查:
@Override
public boolean attemptExecution() {
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
if (properties.circuitBreakerForceClosed().get()) {
return true;
}
// circuitOpened值等于1说明断路器状态为CLOSED
if (circuitOpened.get() == -1) {
return true;
} else {
if (isAfterSleepWindow()) {
// 睡眠窗口过去后只有第一个请求能被执行
// 如果执行成功,那么状态将会变成CLOSED
// 如果执行失败,状态仍变成OPEN
if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
return true;
} else {
return false;
}
} else {
return false;
}
}
}
// 睡眠窗口是否过去
private boolean isAfterSleepWindow() {
// 还记得上面CLOSED->OPEN时记录的时间吗?
final long circuitOpenTime = circuitOpened.get();
final long currentTime = System.currentTimeMillis();
final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
return currentTime > circuitOpenTime + sleepWindowTime;
}
3. HALF_OPEN ->CLOSED :
变为半开状态后,会放第一笔请求去执行,并跟踪它的执行结果,如果是成功,那么将由HALF_OPEN状态变成CLOSED状态:
@Override
public void markSuccess() {
if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
//This thread wins the race to close the circuit - it resets the stream to start it over from 0
metrics.resetStream();
Subscription previousSubscription = activeSubscription.get();
if (previousSubscription != null) {
previousSubscription.unsubscribe();
}
Subscription newSubscription = subscribeToStream();
activeSubscription.set(newSubscription);
// 已经进入了CLOSED阶段,所以将OPEN的修改时间设置成-1
circuitOpened.set(-1L);
}
}
4. HALF_OPEN ->OPEN :
变为半开状态时,如果第一笔被放去执行的请求执行失败(资源获取失败、异常、超时等),就会由HALP_OPEN状态再变为OPEN状态:
@Override
public void markNonSuccess() {
if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
// This thread wins the race to re-open the circuit - it resets the start time for the sleep window
circuitOpened.set(System.currentTimeMillis());
}
}
三. 滑动窗口(滚动窗口)
上面提到的断路器需要的时间窗口请求量和错误率这两个统计数据,都是指固定时间长度内的统计数据,断路器的目标,就是根据这些统计数据来预判并决定系统下一步的行为,Hystrix通过滑动窗口来对数据进行“平滑”统计,默认情况下,一个滑动窗口包含10个桶(Bucket),每个桶时间宽度是1秒,负责1秒的数据统计。滑动窗口包含的总时间以及其中的桶数量都是可以配置的,来张官方的截图认识下滑动窗口:
上图的每个小矩形代表一个桶,可以看到,每个桶都记录着1秒内的四个指标数据:成功量、失败量、超时量和拒绝量,这里的拒绝量指的就是上面流程图中【信号量/线程池资源检查】中被拒绝的流量。10个桶合起来是一个完整的滑动窗口,所以计算一个滑动窗口的总数据需要将10个桶的数据加起来。
我们现在来具体看看滑动窗口和桶的设计,如果将滑动窗口设计成对一个长度为10的整形数组的操作,第一个想到的应该是AtomicLongArray,AtomicLongArray中每个位置的数据都能线程安全的操作,提供了譬如incrementAndGet、getAndSet、compareAndSet等常用方法。但由于一个桶需要维护四个指标,如果用四个AtomicLongArray来实现,做法不够高级,于是我们想到了AtomicReferenceArray<Bucket>,Bucket对象内部可以用AtomicLong来维护着这四个指标。滑动窗口和桶的设计特别讲究技巧,需要尽可能做到性能、数据准确性两方面的极致,我们来看Hystrix是如何做到的。
桶的数据统计简单来说可以分为两类,一类是简单自增计数器,比如请求量、错误量等,另一类是并发最大值,比如一段时间内的最大并发量(或者说线程池的最大任务数),下面是桶类Bucket的定义:
class Bucket {
// 标识是哪一秒的桶数据
final long windowStart;
// 如果是简单自增统计数据,那么将使用adderForCounterType
final LongAdder[] adderForCounterType;
// 如果是最大并发类的统计数据,那么将使用updaterForCounterType
final LongMaxUpdater[] updaterForCounterType;
Bucket(long startTime) {
this.windowStart = startTime;
// 预分配内存,提高效率,不同事件对应不同的数组index
adderForCounterType = new LongAdder[HystrixRollingNumberEvent.values().length];
for (HystrixRollingNumberEvent type : HystrixRollingNumberEvent.values()) {
if (type.isCounter()) {
adderForCounterType[type.ordinal()] = new LongAdder();
}
}
// 预分配内存,提高效率,不同事件对应不同的数组index
updaterForCounterType = new LongMaxUpdater[HystrixRollingNumberEvent.values().length];
for (HystrixRollingNumberEvent type : HystrixRollingNumberEvent.values()) {
if (type.isMaxUpdater()) {
updaterForCounterType[type.ordinal()] = new LongMaxUpdater();
// initialize to 0 otherwise it is Long.MIN_VALUE
updaterForCounterType[type.ordinal()].update(0);
}
}
}
...略...
}
我们可以看到,并没有用所谓的AtomicLong,为了方便的管理各种事件(参见com.netflix.hystrix.HystrixEventType)的数据统计,Hystrix对不同的事件使用不同的数组index(即枚举的顺序),这样对于某个桶(即某一秒)的指定类型的数据,总能从数组中找到对应的LongAdder(用于统计前面说的简单自增)或LongMaxUpdater(用于统计前面说的最大并发值)对象来进行自增或更新操作。对于性能有要求的中间件或库类都避不开要CPUCache优化的问题,比如cache line,以及cache line带来的false sharing问题。Bucket的内部并没有使用AtomicLong,而是使用了JDK8新提供的LongAdder,在高并发的单调自增场景,LongAdder提供了比AtomicLong更好的性能,至于LongAdder的设计思想,本文不展开,感兴趣的朋友可以去拜读Doug Lea大神的代码(有意思的是Hystrix没有直接使用JDK中的LongAdder,而是copy过来改了改)。LongMaxUpdater也是类似的,它和LongAddr一样都派生于Striped64,这里不再展开。
滑动窗口由多个桶组成,业界一般的做法是将数组做成环,Hystrix中也类似,多个桶是放在AtomicReferenceArray<Bucket>来维护的,为了将其做成环,需要保存头尾的引用,于是有了ListState类:
class ListState {
/*
* 这里的data之所以用AtomicReferenceArray而不是普通数组,是因为data需要
* 在不同的ListState对象中跨线程来引用,需要可见性和并发性的保证。
*/
private final AtomicReferenceArray<Bucket> data;
private final int size;
private final int tail;
private final int head;
private ListState(AtomicReferenceArray<Bucket> data, int head, int tail) {
this.head = head;
this.tail = tail;
if (head == 0 && tail == 0) {
size = 0;
} else {
this.size = (tail + dataLength - head) % dataLength;
}
this.data = data;
}
...略...
}
我们可以发现,真正的数据是data,而ListState只是一个时间段的数据快照而已,所以tail和head都是final,这样做的好处是我们不需要去为head、tail的原子操作而苦恼,转而变成对ListState的持有操作,所以滑动窗口看起来如下:
我们可以看到,由于默认一个滑动窗口包含10个桶,所以AtomicReferenceArray<Bucket>的size得达到10+1=11才能“滑动/滚动”起来,在确定的某一秒内,只有一个桶被更新,其他的桶数据都没有变化。既然通过ListState可以拿到所有的数据,那么我们只需要持有最新的ListState对象即可,为了能做到可见性和原子操作,于是有了环形桶类BucketCircularArray:
class BucketCircularArray implements Iterable<Bucket> {
// 持有最新的ListState
private final AtomicReference<ListState> state;
...略...
}
我们注意到BucketCircularArray实现了迭代器接口,这是因为我们输出给断路器的数据需要计算滑动窗口中的所有桶,于是你可以看到真正的滑动窗口类HystrixRollingNumber有如下属性和方法:
public class HystrixRollingNumber {
// 环形桶数组
final BucketCircularArray buckets;
// 获取该事件类型当前滑动窗口的统计值
public long getRollingSum(HystrixRollingNumberEvent type) {
Bucket lastBucket = getCurrentBucket();
if (lastBucket == null)
return 0;
long sum = 0;
// BucketCircularArray实现了迭代器接口环形桶数组
for (Bucket b : buckets) {
sum += b.getAdder(type).sum();
}
return sum;
}
...略...
}
断路器就是通过监控来从HystrixRollingNumber的getRollingSum方法来获取统计值的。
到这里断路器和滑动窗口的核心部分已经分析完了,当然里面还有不少细节没有提到,感兴趣的朋友可以去看一下源码。Hystrix中通过RxJava来实现了事件的发布和订阅,所以如果想深入了解Hystrix,需要熟悉RxJava,而RxJava在服务端的应用没有像客户端那么广,一个原因是场景的限制,还一个原因是大多数开发者认为RxJava设计的过于复杂,加上响应式编程模型,有一定的入门门槛。