
Hystrix: Source Code Analysis and Practice

author:Flash Gene

1 Background


Every coin has two sides. Distributed systems free us from the performance limits and the single point of failure of one machine, but they also increase system complexity and the probability of failure. A single machine may run for a year without failing, but in a large cluster machine failures happen all the time, and problems in the network or in the application itself are often more likely than failures of the machine.

In addition, in a typical microservice call chain, if a downstream failure leaves an upstream request unanswered, that request hangs in the upstream service until it times out. When there are many such requests, they can exhaust the resources of the upstream service, which was not faulty to begin with, so that it fails too. The failure propagates layer by layer and can eventually bring down the entire system.

As a concrete example, suppose an application relies on 30 microservices, each with a decent availability of 99.99%. Without any resilience design around those calls, the availability of the application is only about 99.7% (0.9999^30), which means roughly two hours of unavailability every month.

Therefore, to improve the availability of our own service and prevent cascading failures, we must take downstream failures into account whenever we call other services over the network, and design for resilience accordingly.

Resilience can also be thought of as fault tolerance, i.e., the ability of a system to tolerate errors. There are some classic design patterns for it, including:
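
The arithmetic behind these figures can be checked with a few lines of Java. This is only an illustrative sketch: the class and method names are made up for this example, and it assumes a 30-day month and that the application is up only when all of its dependencies are up.

```java
// Compound availability of an application whose dependencies all have to be up.
class AvailabilityEstimate {

    // Availability when every one of `dependencies` services must be available.
    static double compound(double perServiceAvailability, int dependencies) {
        return Math.pow(perServiceAvailability, dependencies);
    }

    // Expected unavailable hours within a period of the given length.
    static double downtimeHours(double availability, double hoursInPeriod) {
        return (1.0 - availability) * hoursInPeriod;
    }

    public static void main(String[] args) {
        double availability = compound(0.9999, 30);
        double downtime = downtimeHours(availability, 30 * 24); // assumes a 30-day month
        System.out.printf("availability = %.4f, downtime = %.2f h/month%n", availability, downtime);
    }
}
```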

  • Bulkhead (Isolation). The term comes from the bulkheads in a ship, which divide the hull into watertight compartments. When the ship strikes a reef and takes on water, the flooding is contained and the whole ship does not sink. In software engineering, the pattern isolates important resources by purpose, user, and so on, to prevent errors from spreading through the system.
  • Circuit Breaker. Like a fuse in an electrical circuit, a circuit breaker wraps a downstream call and detects downstream failures. If the failure rate reaches a specified threshold, the breaker opens, and calls return an error immediately for a period of time. After that period, if it finds that the downstream service has returned to normal, it resumes normal calls. It follows the Fail Fast principle, so that upstream callers do not have to wait for a reply from a downstream service that is not working, and it also gives a struggling downstream service some breathing room.
  • Retry. For occasional, short-lived errors, retrying may be the better option, and it can also improve the availability of the upstream system to a certain extent. Note that not all errors and not all interfaces are suitable for retry, and the number of retries and the interval between them need to be considered.
  • Degradation. When the downstream fails and cannot provide service, the caller degrades to a response that does not depend on the faulty service, such as a default value or a cached value, to preserve the overall availability of the service.

Hystrix, the subject of this article, is a library designed by Netflix that applies these patterns to provide resilience against unreliable remote calls and third-party libraries. In a complex distributed environment where failure is inevitable, it can effectively prevent cascading failures and improve the availability of the system.

2 Hystrix

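Hystrix was open-sourced by Netflix at the end of 2012 as the result of their work on resilience design for distributed systems; see Introducing Hystrix for Resilience Engineering[1]. After being open-sourced, Hystrix was also added to Spring Cloud Netflix and became widely used.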

Hystrix has the following features:

  • Protect services and provide control in the event of delays and failed calls
  • Prevent cascading failures in complex distributed systems
  • Fail fast, recover fast
  • Graceful degradation
  • Real-time monitoring, alerting, configuration

In 2018, Netflix announced that Hystrix had entered maintenance mode: no new features would be added, and new projects were advised to use resilience4j[2], a lighter, Hystrix-inspired library built around functional-style invocation. Netflix itself has moved to concurrency-limits[3], which adapts its limits to the application's real-time performance (that project was last updated in 2019 and can be considered discontinued).

However, Hystrix has not lost its value as a mature, production-tested resilience library. Its ideas and design have been adopted by many other tools, and it is very stable in use. There are also many use cases of Hystrix in the existing services of Lingyue, and we have extended Hystrix to meet our needs.

Next, let's start with the concept and principles of Hystrix, and talk about the extensions we have made to Hystrix and our experience with it.

2.1 Execution Process

The execution process of Hystrix is described in detail in Hystrix: How it Works[4]. It is briefly repeated here as a basis for the sections that follow; readers who are already familiar with it can skip ahead.

When a request comes in and calls a downstream dependency protected by Hystrix, Hystrix executes it as follows:

  1. Construct a HystrixCommand or HystrixObservableCommand.
     Hystrix uses the Command pattern[5] to wrap requests to downstream dependencies. The Command implementation contains the logic for calling the downstream, and every call builds a new Command instance.
     Depending on whether the call produces a single result or multiple results, the user extends either HystrixCommand or HystrixObservableCommand.
  2. Execute the Command instance created in the previous step.
     Depending on the Command type (HystrixCommand / HystrixObservableCommand) and the execution style (synchronous / asynchronous / eager / lazy), choose one of the following methods to execute the Command:
       • execute(): HystrixCommand only. Executes synchronously and blocks, then returns the downstream response or throws an exception. It is a shortcut for queue().get().
       • queue(): HystrixCommand only. Executes asynchronously and returns a Future. It is a shortcut for toObservable().toBlocking().toFuture().
       • observe(): subscribes to the Observable that emits the response and returns a mirror of that Observable (eager execution).
       • toObservable(): returns the Observable that emits the response. The actual logic starts executing and returns the response only when the user subscribes (lazy execution).
     All four methods are in fact built on Observable.
     If you are not familiar with Observable, see the ReactiveX documentation or its Java implementation, RxJava, which is used heavily in the Hystrix implementation. In simple terms, it is a stream-based publish-subscribe model, and an Observable is a publishing source that you can subscribe to.
       K value = command.execute();
       Future<K> fValue = command.queue();
       Observable<K> ohValue = command.observe();      // hot observable
       Observable<K> ocValue = command.toObservable(); // cold observable
  3. Determine whether response caching is enabled.
     If a cached value is available, it is returned directly at this step.
  4. Determine whether the circuit breaker is open.
     If the circuit breaker is open, the Command is not executed any further and Hystrix goes straight to getting the Fallback.
     If the circuit breaker is closed, execution continues.
  5. Determine whether the thread pool / queue / semaphore is full.
     Depending on the isolation strategy used by the Command (more on this later), if the number of in-flight requests has reached the limit, execution is abandoned and Hystrix tries to get the Fallback.
  6. Make the actual call.
     This triggers the concrete call implementation: HystrixCommand.run() or HystrixObservableCommand.construct(). If the call exceeds the configured timeout, a TimeoutException is thrown, and the flow then proceeds to getting the Fallback, just as it does for any thrown exception other than HystrixBadRequestException.
     Hystrix can only attempt to interrupt the thread that is actually executing the call and is blocked, for example because of a timeout. However, since most Java HTTP clients do not respond to an InterruptedException, the thread may stay blocked until the connection times out, which can fill up the thread pool. It is therefore a good idea to handle InterruptedException in your code and to configure a reasonable timeout for the HTTP client.
     If the call completes normally (without timeouts or exceptions), Hystrix writes its log, records the monitoring information, and returns the response.
  7. Calculate the health of the circuit.
     Based on the newly obtained monitoring information, determine whether to open or close the circuit breaker.
  8. Get the Fallback.
     In each of the cases above where the call is not executed or fails, Hystrix tries to get a Fallback response by calling the user-implemented HystrixCommand.getFallback() or HystrixObservableCommand.resumeWithFallback(). As the name suggests, a Fallback is a degradation measure, so users should do their best to make sure this step cannot fail. If getting the Fallback itself involves a network call, it needs to be wrapped in another HystrixCommand or HystrixObservableCommand.
     If the user does not implement the Fallback method, or if the Fallback itself throws an exception, Hystrix returns an Observable that immediately emits an onError notification. The four execution methods behave as follows in that case:
       • execute(): throws an exception.
       • queue(): returns a Future, but an exception is thrown when get() is called.
       • observe(): returns an Observable that terminates immediately when subscribed to and calls the subscriber's onError method.
       • toObservable(): same as above.
  9. Return the successful response.
     The response to a successful call is returned in the form of an Observable. Depending on how the call was made, the Observable may be transformed:
       • execute(): obtains a Future in the same way as queue() and calls get() to obtain the single response emitted by the underlying Observable.
       • queue(): converts the Observable to a BlockingObservable, which is then converted to a Future and returned.
       • observe(): subscribes to the Observable so that execution starts immediately, and returns a mirror Observable that replays the emissions once the user subscribes.
       • toObservable(): returns the Observable unchanged; execution starts only after the user subscribes.
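
The relationship between execute(), queue(), the timeout, and the fallback can be illustrated without Hystrix itself. The class below is a deliberately simplified stand-in and not Hystrix's API: SketchCommand, its shared pool, and the timeout handling are assumptions made for this example, and response caching, the circuit breaker, metrics, and Observable are all left out.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical, minimal command: run() is the real call, getFallback() the degraded answer.
abstract class SketchCommand<T> {

    // Shared worker pool with daemon threads so the JVM can exit freely.
    private static final ExecutorService POOL = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "sketch-command");
        t.setDaemon(true);
        return t;
    });

    private final long timeoutMillis;

    SketchCommand(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    protected abstract T run() throws Exception;

    protected abstract T getFallback();

    // Asynchronous execution: the call runs on a pool thread.
    Future<T> queue() {
        Callable<T> task = this::run;
        return POOL.submit(task);
    }

    // Synchronous execution: block on queue(), fall back on timeout or failure.
    T execute() {
        Future<T> future = queue();
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // best effort: interrupts the worker thread
            return getFallback();
        } catch (ExecutionException e) {
            return getFallback();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return getFallback();
        }
    }

    public static void main(String[] args) {
        SketchCommand<String> slow = new SketchCommand<String>(50) {
            @Override protected String run() throws Exception {
                Thread.sleep(2000); // simulates a slow downstream
                return "primary";
            }
            @Override protected String getFallback() {
                return "fallback";
            }
        };
        System.out.println(slow.execute());
    }
}
```

Note that cancelling the Future only interrupts the worker thread. As described above, a blocking client that ignores interruption would keep the thread occupied until its own timeout fires.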

2.2 Main Components

HystrixCommand and HystrixObservableCommand are the two main classes that Hystrix exposes. They use the Command pattern to wrap potentially risky calls (typically calls to downstream services over the network) with error and latency tolerance, performance data collection, circuit breaking, and isolation.

In addition to the basic invocation logic, they contain the corresponding thread pools, circuit breakers, monitoring data collection, and other components. Following the entire execution process in one pass would be verbose and hard to follow. Instead, we divide Hystrix into several modules by function, describe each in turn, and finally look at the overall flow of HystrixCommand and HystrixObservableCommand, which will then be much clearer.

Caching and Collapser are less important than the other four modules and will not be elaborated on in this article; interested readers can explore them on their own.

2.2.1 Dynamic Configuration

Hystrix is driven by configuration. Every stage has many configuration items, and many of them, such as timeouts and thread counts, need to be tuned according to how the service behaves in production. A static configuration that cannot be changed at runtime makes such tuning difficult. Therefore, one of Hystrix's major features is support for dynamically modifying configuration, both in its interfaces and in how it is used.

By default, it uses Netflix's own Archaius as the dynamic configuration source, but users can switch to their own dynamic configuration source implementation.

At the outermost layer, Hystrix splits configuration into several classes by purpose, such as HystrixCommandProperties and HystrixThreadPoolProperties, for Command-related and thread-pool-related configuration respectively. Each configuration item has a getter that returns an instance of the generic interface HystrixProperty<T>. This design gives configuration providers enough flexibility to implement their own lookup logic. The default implementation achieves dynamic configuration with chained priorities through several layers of composition.

Taking HystrixCommandProperties as an example, its structure is shown in the figure:

When a configuration item is read, an instance of the HystrixProperty interface is returned. The concrete implementation class is ChainHystrixProperty, which is backed by ChainProperty. A ChainProperty links several HystrixDynamicProperty instances together as a linked list; they correspond to keys for the same configuration item with different priorities. For example, hystrix.command.xxx.xxx has a higher priority than hystrix.command.default.xxx.xxx, and the latter is used only if the former is not found. Each HystrixDynamicProperty is obtained from HystrixDynamicProperties, which is the dynamic configuration source mentioned above. The source can be configured in HystrixPlugins; by default, HystrixDynamicPropertiesArchaius, an Archaius-based implementation, is used.

Archaius itself is a library that supports dynamic configuration changes, and it also integrates with Spring. For Spring Boot applications, modifying the Spring configuration modifies the Archaius configuration, so most settings can be changed online in real time.
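
The priority chain can be pictured with a small stand-alone sketch. It does not use Hystrix's classes: the map stands in for the dynamic configuration source, ChainedPropertySketch and resolve are names invented for this example, and OrderQuery is a hypothetical command key. The point is only that the lookup runs on every read, so a change in the source takes effect immediately, and that the command-specific key wins over the default key.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative stand-in for a chained, dynamically resolved property.
class ChainedPropertySketch {

    // Stands in for the dynamic configuration source (Archaius by default in Hystrix).
    static final Map<String, String> SOURCE = new ConcurrentHashMap<>();

    // Walks the keys from highest to lowest priority on every read.
    static int resolve(int builtInDefault, String... keysByPriority) {
        for (String key : keysByPriority) {
            String raw = SOURCE.get(key);
            if (raw != null) {
                return Integer.parseInt(raw);
            }
        }
        return builtInDefault;
    }

    public static void main(String[] args) {
        String specific = "hystrix.command.OrderQuery.execution.isolation.thread.timeoutInMilliseconds";
        String fallback = "hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds";

        System.out.println(resolve(1000, specific, fallback)); // nothing configured yet
        SOURCE.put(fallback, "800");
        System.out.println(resolve(1000, specific, fallback)); // default key applies
        SOURCE.put(specific, "300");
        System.out.println(resolve(1000, specific, fallback)); // command-specific key wins
    }
}
```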

However, not all configurations can be dynamically modified, as can be found in the configuration section of the official repository wiki.

2.2.2 Circuit Breaker

Besides isolation, circuit breaking is the other important feature of Hystrix. By default, the circuit breaker trips when, within a time window, the number of requests reaches a minimum volume and the percentage of failed requests (including timeouts) reaches a threshold. New requests then fail immediately without calling the failing downstream service. After a period of time, Hystrix lets one request through to check whether the downstream has recovered. If it succeeds, the breaker closes and subsequent requests are let through; otherwise the breaking process repeats.

Circuit breaking not only keeps the upstream service from wasting resources on calls to a failing downstream, but also gives the downstream some breathing room, which helps it recover sooner.

In the code, the circuit breaker implements the HystrixCircuitBreaker interface:

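public interface HystrixCircuitBreaker {

    // Whether the request is allowed to execute
    boolean allowRequest();

    // Whether the circuit breaker is currently open
    boolean isOpen();

    // In the half-open state: the probe request succeeded
    void markSuccess();

    // In the half-open state: the probe request failed
    void markNonSuccess();

    // Called when a Command starts executing
    boolean attemptExecution();
}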
           

2.2.2.1 Default Implementation

In the Hystrix code, the default circuit breaker implementation is HystrixCircuitBreakerImpl. The behavior described above is the behavior of this default implementation. Users with specific needs can implement their own circuit breaker: a Command interacts with a circuit breaker instance only through the exposed methods such as allowRequest and isOpen, so any logic can be implemented behind them. In our practice, we found that the default implementation judges downstream health from a single request, which can lead to the awkward situation where that one request happens to succeed, a large number of requests are then released, and they time out. We therefore implemented a circuit breaker with stepped recovery to replace the default implementation for high-traffic Commands. Let's start with the default implementation.

The default implementation has three states: CLOSED, OPEN and HALF_OPEN. CLOSED means the breaker is closed and requests pass normally. OPEN means the breaker is open and requests are rejected for a certain period of time. HALF_OPEN means that, after the breaker has been open for a while, a single request is let through to the downstream and the next state is decided by its result. Circuit breakers are called concurrently, so state transitions must be concurrency-safe. The breaker keeps its current state in an AtomicReference and modifies it with CAS whenever a state change is required.

So how does the circuit breaker move between states? The thread pool section later introduces the statistics class HystrixThreadPoolMetrics, which is based on sliding windows and buckets. Command has a similar statistics class, HystrixCommandMetrics. Both are implementations of HystrixMetrics and work in much the same way. The circuit breaker subscribes to HystrixCommandMetrics and, each time the sliding window rolls, decides from the request count and error rate of the latest window whether to change its state from closed to open.

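// Callback through which the circuit breaker subscribes to HystrixCommandMetrics
public void onNext(HealthCounts hc) {
    // Check whether the minimum request volume for a decision has been reached.
    // If not, skip: with few requests the error percentage is not a reliable signal.
    if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {

    } else {
        if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
          // The error rate is below the threshold, nothing to do

        } else {
            // The error rate has reached the threshold: switch the state to OPEN with CAS
            // and record the time, which is later used to evaluate the sleep window
            if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                circuitOpened.set(System.currentTimeMillis());
            }
        }
    }
}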
           

Suppose the circuit breaker has opened because of a high error rate. It keeps rejecting incoming requests for the user-configured sleep window (circuitBreakerSleepWindowInMilliseconds). Once the window has passed, the next incoming request changes the state to HALF_OPEN. The logic is in the attemptExecution method:

if (isAfterSleepWindow()) {
    //only the first request after sleep window should execute
    //if the executing command succeeds, the status will transition to CLOSED
    //if the executing command fails, the status will transition to OPEN
    //if the executing command gets unsubscribed, the status will transition to OPEN
    if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
        return true;
    } else {
        return false;
    }
} else {
    return false;
}
           

After the request is executed, the circuit breaker is closed or reopened by calling the markSuccess or markNonSuccess callback methods.

The overall state diagram is as follows:

At this point, the default behavior and implementation of the circuit breaker are relatively clear.
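
To tie the three transitions together, here is a compact stand-alone sketch of the same state machine. It is not the HystrixCircuitBreakerImpl source: the class name, the constructor parameters, and the explicit `now` arguments (passed in so the behavior is deterministic) are assumptions made for this example, and the metrics stream is replaced by a plain method call.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Simplified CLOSED / OPEN / HALF_OPEN state machine driven by CAS.
class BreakerSketch {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    private final AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);
    private final AtomicLong circuitOpened = new AtomicLong(-1);
    private final int requestVolumeThreshold;
    private final int errorThresholdPercentage;
    private final long sleepWindowMillis;

    BreakerSketch(int requestVolumeThreshold, int errorThresholdPercentage, long sleepWindowMillis) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.sleepWindowMillis = sleepWindowMillis;
    }

    // Called with the health counts of the latest window: CLOSED -> OPEN.
    void onHealthSnapshot(long totalRequests, int errorPercentage, long now) {
        if (totalRequests < requestVolumeThreshold) return;     // too few requests to judge
        if (errorPercentage < errorThresholdPercentage) return; // healthy enough
        if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
            circuitOpened.set(now);
        }
    }

    // Called before each execution: OPEN -> HALF_OPEN for exactly one probe request.
    boolean attemptExecution(long now) {
        if (circuitOpened.get() == -1) return true; // closed
        if (now > circuitOpened.get() + sleepWindowMillis) {
            return status.compareAndSet(Status.OPEN, Status.HALF_OPEN);
        }
        return false;
    }

    // Probe succeeded: HALF_OPEN -> CLOSED.
    void markSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
            circuitOpened.set(-1);
        }
    }

    // Probe failed: HALF_OPEN -> OPEN, and the sleep window starts again.
    void markNonSuccess(long now) {
        if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
            circuitOpened.set(now);
        }
    }

    Status status() {
        return status.get();
    }

    public static void main(String[] args) {
        BreakerSketch breaker = new BreakerSketch(20, 50, 5000);
        breaker.onHealthSnapshot(100, 80, 1_000);
        System.out.println(breaker.status() + " allows at t=2000: " + breaker.attemptExecution(2_000));
        System.out.println("probe allowed at t=6001: " + breaker.attemptExecution(6_001));
        breaker.markSuccess();
        System.out.println("after successful probe: " + breaker.status());
    }
}
```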

2.2.2.2 Improvement

In our practice, we found that the default circuit breaker implementation has two drawbacks:

  • In the half-open state, letting a single request through is not an accurate way to judge downstream health and decide whether to close the breaker. The breaker opened because a certain percentage of a certain number of requests failed. In that situation the chance that one individual request succeeds is not small, so the breaker may close and then trip again almost immediately, flapping between states. Picture the downstream as a clogged pipe that can pass only one drop of water per second, and the upstream as a large pipe. When the breaker trips, the upstream pipe shuts off and the downstream pipe can be cleared. But before the clearing is finished, the upstream releases a single drop, judges the downstream healthy, and opens the floodgates again, so the clearing can never complete.
  • There is no adequate notification mechanism for circuit breaker state changes. The EventNotifier in HystrixPlugins can notify us when the breaker trips, but we also want to be notified when it recovers, and to feed breaker state changes into our alerting system.

For the first problem, we implemented HystrixSteppingRecoverCircuitBreaker, a circuit breaker with stepped recovery. It extends the default behavior: after the single probe request succeeds, the breaker enters a stepped-recovery state and lets a gradually increasing percentage of requests through to the downstream. Whenever the success rate of the released requests meets the required rate, the release percentage is raised to the next step, until 100% is restored; if it falls short, the breaker opens again. Because the decision is based on success rate, it needs a certain call volume and is suited to high-traffic interfaces. Part of the code is as follows:

public class HystrixSteppingRecoverCircuitBreaker implements HystrixCircuitBreaker {

  enum Status {
    CLOSED,
    OPEN,
    HALF_OPEN_SINGLE,
    HALF_OPEN_STEPPING; // added STEPPING state
  }
  
  // ...
  
  @Override
  public boolean allowRequest() {
    if (properties.circuitBreakerForceOpen().get()) {
      return false;
    }
    if (properties.circuitBreakerForceClosed().get()) {
      return true;
    }
    if (circuitOpened.get() == -1) {
      return true;
    }
    if (status.get() == Status.HALF_OPEN_STEPPING) {
      // In the STEPPING state, let through the fraction of requests given by the current step
      return ThreadLocalRandom.current().nextFloat() < stepper.currentStep();
    }
    if (isAfterSleepWindow() && status.compareAndSet(Status.OPEN, Status.HALF_OPEN_SINGLE)) {
      write(Status.OPEN, Status.HALF_OPEN_SINGLE);
      return true;
    }
    return false;
  }

  @Override
  public void markSuccess() {
    // In the HALF_OPEN_SINGLE state, once the single probe request succeeds, enter stepped recovery
    toSteppingIfSingle();
  }

  // ...
  
  // State transition method
  private void onNext(HealthCounts healthCounts) {
    if (status.get() == Status.HALF_OPEN_STEPPING && isSteppingRequestVolumeSufficient(healthCounts)) {
      if (isSuccessRateSufficient(healthCounts)) {
        if (stepper.nextStep() == 1) {
          toCloseIfStepping();
        }
        return;
      }
      toOpenIfStepping();
    }

    if (isRequestVolumeSufficient(healthCounts)
        && !isSuccessRateSufficient(healthCounts)
        && status.compareAndSet(Status.CLOSED, Status.OPEN)) {
      circuitOpened.set(System.currentTimeMillis());
      write(Status.CLOSED, Status.OPEN);
    }
  }

  private void toSteppingIfSingle() {
    if (status.compareAndSet(Status.HALF_OPEN_SINGLE, Status.HALF_OPEN_STEPPING)) {
      resubscribe();
      write(Status.HALF_OPEN_SINGLE, Status.HALF_OPEN_STEPPING);
    }
  }

  private void toCloseIfStepping() {
    if (status.compareAndSet(Status.HALF_OPEN_STEPPING, Status.CLOSED)) {
      resubscribe();
      stepper.resetStep();
      circuitOpened.set(-1L);
      write(Status.HALF_OPEN_STEPPING, Status.CLOSED);
    }
  }

  private void toOpenIfStepping() {
    if (status.compareAndSet(Status.HALF_OPEN_STEPPING, Status.OPEN)) {
      stepper.resetStep();
      circuitOpened.set(System.currentTimeMillis());
      write(Status.HALF_OPEN_STEPPING, Status.OPEN);
    }
  }

  // Stepper implementation: manages the fraction of requests let through
  static class Stepper {

    private AtomicInteger currentStepPos = new AtomicInteger(0);
    private List<Double> steps;
    
    // ...

    private double currentStep() {
      int index = currentStepPos.get();
      return isStepEnd(index) ? 1 : steps.get(index);
    }

    private double nextStep() {
      int index = currentStepPos.incrementAndGet();
      if (isStepEnd(index)) {
        return 1;
      }
      return steps.get(index);
    }

    private boolean isStepEnd(int index) {
      return index >= steps.size() || steps.get(index) == 1;
    }

    private void resetStep() {
      currentStepPos.set(0);
    }
  }
}
           

For the second problem, we created an Observable stream that carries circuit breaker state-change events; an event is written to it whenever the breaker changes state. On top of this stream we built a callback mechanism that lets users register callbacks for a Command, subscribe to that Command's events, and build multi-layer subscriptions through composition.
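
The shape of such a notification mechanism can be sketched as follows. To keep the example free of dependencies it swaps the Observable stream for a plain list of listeners, so it only illustrates the idea of writing an event on every transition and letting users subscribe; StateChangeStreamSketch, StateChange, and the command key are hypothetical names, not part of Hystrix or of our extension.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Listener-based stand-in for a stream of circuit breaker state changes.
class StateChangeStreamSketch {
    enum Status { CLOSED, OPEN, HALF_OPEN_SINGLE, HALF_OPEN_STEPPING }

    // Immutable event describing one transition of one command's breaker.
    static final class StateChange {
        final String commandKey;
        final Status from;
        final Status to;

        StateChange(String commandKey, Status from, Status to) {
            this.commandKey = commandKey;
            this.from = from;
            this.to = to;
        }
    }

    private final List<Consumer<StateChange>> subscribers = new CopyOnWriteArrayList<>();

    // Registers a callback and returns a handle that removes it again.
    Runnable subscribe(Consumer<StateChange> callback) {
        subscribers.add(callback);
        return () -> subscribers.remove(callback);
    }

    // Called by the breaker after a successful CAS on its state.
    void write(String commandKey, Status from, Status to) {
        StateChange event = new StateChange(commandKey, from, to);
        for (Consumer<StateChange> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }

    public static void main(String[] args) {
        StateChangeStreamSketch stream = new StateChangeStreamSketch();
        stream.subscribe(e -> System.out.println(e.commandKey + ": " + e.from + " -> " + e.to));
        stream.write("OrderQuery", Status.CLOSED, Status.OPEN);
        stream.write("OrderQuery", Status.HALF_OPEN_STEPPING, Status.CLOSED);
    }
}
```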

2.2.3 Isolation

2.2.3.1 Isolation Level

As mentioned earlier, Hystrix uses the Bulkhead pattern to provide fault tolerance. Put simply, it isolates the resources used by calls to services that the system depends on but that are unrelated to each other, so that when one downstream service fails it cannot occupy the resources of the whole service.

Hystrix provides two isolation levels for different scenarios: Thread and Semaphore.

Thread isolation

Of the two, Thread isolation is the most widely used at Netflix and the recommended one. It is easy to understand: the call is executed on a separate thread, and related calls share the threads of one thread pool.

The benefits of this are as follows:

  • For the calling thread: when a call times out, it can walk away at any time and execute the fallback logic, instead of blocking until the connection times out and dragging down the response time of the service.
  • For isolation: when a downstream service starts timing out, only its own thread pool fills up. Unrelated services that use other thread pools, and the service itself, are not affected. Once the downstream is healthy again, the thread pool becomes available and returns to its normal state.
  • For monitoring: a thread pool exposes a variety of monitoring data, such as the number of threads in use, the number of queued requests, and the number of executed tasks, so we can notice and react to performance changes in the client or the downstream service right away.
  • For the project: this amounts to introducing a small concurrency module that makes it easy to build asynchronous systems on top of a synchronous client (which is what the Netflix API does).

How should the thread pool be sized? In most cases, the default of 10 threads is enough. To tune it further, the official documentation gives a simple and effective formula:

requests per second at peak when healthy × 99th percentile latency in seconds + some breathing room

In other words: peak QPS × P99 response time (in seconds) + a suitable number of extra buffer threads.

As a simple example, for an interface with a peak of 30 calls per second and a P99 response time of 0.2s, the pool needs 30 * 0.2 + 4 = 10 threads. In practice, however, each case still needs to be analyzed individually, and the formula does not suit interfaces whose response times vary widely.
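
The sizing formula is easy to turn into a helper. The sketch below is only an illustration: the class and method names are invented for this example, and rounding the product up to a whole number of threads is an assumption, since the formula itself does not say how to round.

```java
// Thread pool sizing: peak QPS x P99 latency (seconds) + breathing room.
class PoolSizing {

    static int threadPoolSize(double peakQps, double p99Seconds, int breathingRoom) {
        // Threads busy at peak; the small epsilon guards against floating-point
        // noise pushing an exact product (such as 6.0) over to the next integer.
        int busyAtPeak = (int) Math.ceil(peakQps * p99Seconds - 1e-9);
        return busyAtPeak + breathingRoom;
    }

    public static void main(String[] args) {
        // The example from the text: 30 calls per second, P99 of 0.2s, 4 spare threads.
        System.out.println(threadPoolSize(30, 0.2, 4));
    }
}
```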

Of course, thread isolation is not a silver bullet either. From the time a business thread submits a specific call to the thread pool until the execution is completed, it needs to pay the overhead of task queuing, thread pool scheduling, and context switching. Netflix has taken this into account and tested it accordingly. For an interface that is requested 60 times per second, the overhead of using thread isolation at P50, P90, and P99 is 0ms, 3ms, and 9ms, respectively.

Given the advantages of thread isolation, this overhead is often acceptable for most interfaces.

Semaphore isolation

However, if your interface's response time is so short that it cannot afford the overhead of thread isolation, and you can trust the interface to return quickly, you can use the Semaphore isolation level. The trust is needed because, unlike thread isolation, semaphore isolation cannot return as soon as a timeout occurs; the caller has to wait until the client's blocking call ends. Hystrix supports semaphores for command execution as well as for fallbacks. Set execution.isolation.strategy to SEMAPHORE to switch from the default THREAD isolation to semaphore isolation. Based on the interface's response time and call rate, the allowed number of concurrent executions can be calculated in much the same way as the thread count.

2.2.3.2 Implementation

When initialized, a Command passes its ThreadPoolKey (by default its groupKey) to the HystrixThreadPool.Factory factory class. Each ThreadPoolKey corresponds to one thread pool. The factory first checks its ConcurrentHashMap cache; if a pool already exists for the key it is returned directly, otherwise a new one is created.

static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
    
    String key = threadPoolKey.name();

    // If it has already been created, return the cached thread pool
    HystrixThreadPool previouslyCached = threadPools.get(key);
    if (previouslyCached != null) {
        return previouslyCached;
    }

    // Not created yet: enter the synchronized block to create it. The block mainly prevents the pool from being created twice
    synchronized (HystrixThreadPool.class) {
        if (!threadPools.containsKey(key)) {
            threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
        }
    }
    return threadPools.get(key);
}

           

HystrixThreadPool is a simple interface with a method that returns the ExecutorService, a method that returns the Scheduler used by RxJava, and some methods for recording metrics. The concrete implementation is HystrixThreadPoolDefault. It is simple because it delegates the creation of the thread pool to the HystrixConcurrencyStrategy in HystrixPlugins. Its main responsibilities are exposing the thread pool and scheduler, adjusting the thread pool when the configuration changes, and recording the thread pool's monitoring information.

/* package */static class HystrixThreadPoolDefault implements HystrixThreadPool {
    private static final Logger logger = LoggerFactory.getLogger(HystrixThreadPoolDefault.class);

    private final HystrixThreadPoolProperties properties;
    private final BlockingQueue<Runnable> queue;
    private final ThreadPoolExecutor threadPool;
    private final HystrixThreadPoolMetrics metrics;
    private final int queueSize;

    public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
        this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
        HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
        this.queueSize = properties.maxQueueSize().get();

        this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
                // Create the thread pool through concurrencyStrategy, a plugin that users can extend
                concurrencyStrategy.getThreadPool(threadPoolKey, properties),
                properties);
        this.threadPool = this.metrics.getThreadPool();
        this.queue = this.threadPool.getQueue();

        /* strategy: HystrixMetricsPublisherThreadPool */
        HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
    }
  // ...

           

Here's a quick rundown of HystrixConcurrencyStrategy. It is the plugin that controls concurrency-related policies. A custom subclass can be configured in HystrixPlugins; the default implementation is HystrixConcurrencyStrategyDefault. The methods that can be overridden in HystrixConcurrencyStrategy are as follows:

// Controls how the thread pool is created from the configuration
ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue);
ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties);
// Controls how the blocking queue is created
BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize);
// Controls whether the Callable is wrapped, for example to carry context information
<T> Callable<T> wrapCallable(Callable<T> callable);
// Controls how the request context is obtained
<T> HystrixRequestVariable<T> getRequestVariable(final HystrixRequestVariableLifecycle<T> rv);
           

We will also extend it to implement the ability to pass trace information between the business request thread and the Hystrix execution thread.
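
The idea behind using wrapCallable for trace propagation can be sketched with the JDK alone. This is a minimal illustration, not the implementation used in our project: TraceContext, TraceAwareCallable and TracePropagationDemo are hypothetical names. In a real setup the wrapping would presumably happen inside a HystrixConcurrencyStrategy subclass that overrides wrapCallable. The wrapper captures the trace id on the submitting thread and restores it on the pool thread for the duration of the call.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical per-thread holder for the current trace id (not a Hystrix class).
final class TraceContext {
    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    static String get() { return TRACE_ID.get(); }
    static void set(String traceId) { TRACE_ID.set(traceId); }
    static void clear() { TRACE_ID.remove(); }
}

// Plays the role of the Callable returned from wrapCallable.
final class TraceAwareCallable<T> implements Callable<T> {
    private final Callable<T> delegate;
    private final String capturedTraceId;

    TraceAwareCallable(Callable<T> delegate) {
        this.delegate = delegate;
        // The constructor runs on the submitting (business request) thread.
        this.capturedTraceId = TraceContext.get();
    }

    @Override
    public T call() throws Exception {
        // call() runs on the pool thread: install the captured id, then restore.
        String previous = TraceContext.get();
        TraceContext.set(capturedTraceId);
        try {
            return delegate.call();
        } finally {
            if (previous == null) {
                TraceContext.clear();
            } else {
                TraceContext.set(previous);
            }
        }
    }
}

final class TracePropagationDemo {
    // Returns the trace id that the worker thread observes while running the task.
    static String traceSeenByWorker(String traceId) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        TraceContext.set(traceId);
        try {
            Callable<String> task = new TraceAwareCallable<>(TraceContext::get);
            return pool.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            TraceContext.clear();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(traceSeenByWorker("trace-42"));
    }
}
```

Restoring the previous value in the finally block matters because pool threads are reused: without it, a trace id from one request could leak into the next task that runs on the same thread.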

HystrixThreadPoolMetrics collects thread pool monitoring data using a configurable sliding window divided into buckets, and exposes it to the user-configured HystrixMetricsPublisher. We skip it here: thread pool metrics are only one kind of metrics, and the other kinds are collected and exported in a similar way, so they are discussed together later.

The corresponding interface for semaphore is TryableSemaphore, and the interface definition is very simple:

interface TryableSemaphore {
    // Try to acquire a permit; returns true on success
    public abstract boolean tryAcquire();
    // Release a previously acquired permit
    public abstract void release();
    // Number of permits currently in use
    public abstract int getNumberOfPermitsUsed();
}
           

It has two implementation classes: TryableSemaphoreActual, used when a Command uses semaphore isolation, and TryableSemaphoreNoOp, which imposes no limit and is used when a Command uses thread isolation. Here's a quick look at TryableSemaphoreActual. It implements a counter-based semaphore whose limit can be adjusted dynamically. Because it never needs to block and must support changing the concurrency limit at runtime, it does not use the Semaphore from java.util.concurrent but builds its own on top of AtomicInteger. The code is simple: a dynamically changeable HystrixProperty serves as the upper limit; each acquire attempt atomically increments the counter, and if the result exceeds the limit, the counter is decremented again and false is returned.

class TryableSemaphoreActual implements TryableSemaphore {
    protected final HystrixProperty<Integer> numberOfPermits;
    private final AtomicInteger count = new AtomicInteger(0);

    @Override
    public boolean tryAcquire() {
        int currentCount = count.incrementAndGet();
        if (currentCount > numberOfPermits.get()) {
            count.decrementAndGet();
            return false;
        } else {
            return true;
        }
    }
    // ...
}
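
To make the dynamic adjustment concrete, here is a self-contained sketch of the same counter technique using only the JDK. It is an illustration under assumptions, not Hystrix code: DynamicLimitSemaphore and its demo class are hypothetical names, and an IntSupplier stands in for HystrixProperty<Integer>. Because the limit is re-read on every acquire, a configuration change takes effect immediately.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

// Non-blocking counting semaphore whose limit is re-read on every acquire.
final class DynamicLimitSemaphore {
    private final IntSupplier limit; // stand-in for HystrixProperty<Integer>
    private final AtomicInteger count = new AtomicInteger(0);

    DynamicLimitSemaphore(IntSupplier limit) {
        this.limit = limit;
    }

    boolean tryAcquire() {
        // Optimistically take a permit, then roll back if the limit is exceeded.
        if (count.incrementAndGet() > limit.getAsInt()) {
            count.decrementAndGet();
            return false;
        }
        return true;
    }

    void release() {
        count.decrementAndGet();
    }

    int getNumberOfPermitsUsed() {
        return count.get();
    }
}

final class DynamicLimitSemaphoreDemo {
    public static void main(String[] args) {
        AtomicInteger configuredLimit = new AtomicInteger(1);
        DynamicLimitSemaphore semaphore = new DynamicLimitSemaphore(configuredLimit::get);

        System.out.println(semaphore.tryAcquire()); // first permit fits within limit 1
        System.out.println(semaphore.tryAcquire()); // second attempt exceeds limit 1
        configuredLimit.set(2);                     // raise the limit at runtime
        System.out.println(semaphore.tryAcquire()); // now fits within limit 2
    }
}
```

A caller is expected to pair every successful tryAcquire with a release in a finally block, otherwise permits leak and the semaphore eventually rejects everything.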
           

2.2.4 Runtime Metrics

While HystrixCommand and HystrixObservableCommand run, they generate a lot of runtime data, such as latency, results, queue time, and so on. Individually or aggregated, this data helps users understand how the system is performing and make adjustments. Here's a diagram of how a Command records runtime data while executing:

These resulting metrics are stored in memory for a certain amount of time, making them easy to query and export.

At the top level, metrics are split into three classes, which are used to update and read monitoring data: HystrixCommandMetrics, HystrixThreadPoolMetrics, and HystrixCollapserMetrics.

Their underlying implementations are similar: each bucket stores the data for one unit of time, and the buckets within a time window together provide statistics over a sliding window. Each time a unit of time elapses, the oldest bucket is dropped and a new bucket is opened to count the data of the next unit of time. The implementation is built on RxJava Observable streams and their window operators, which is cumbersome to read; since the logic is already fairly clear, I will not walk through it here. A structure diagram is given here, and interested readers can study the source code themselves.
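
The bucket idea can be sketched independently of RxJava. The following is a simplified illustration that swaps in a plain ring buffer for Hystrix's stream-based implementation; RollingCounter is a hypothetical name, timestamps are passed in explicitly (assumed non-negative) so the behavior is deterministic, and the window is assumed to divide evenly into buckets.

```java
import java.util.Arrays;

// Rolling counter: a ring of fixed-width buckets that covers a sliding time window.
final class RollingCounter {
    private final long bucketSizeMs;
    private final long[] counts;      // event count per slot
    private final long[] bucketStart; // start time of the bucket held in each slot

    RollingCounter(long windowMs, int numBuckets) {
        this.bucketSizeMs = windowMs / numBuckets;
        this.counts = new long[numBuckets];
        this.bucketStart = new long[numBuckets];
        Arrays.fill(bucketStart, Long.MIN_VALUE); // marks slots as never used
    }

    // Record one event that happened at nowMs.
    synchronized void increment(long nowMs) {
        long start = nowMs - (nowMs % bucketSizeMs);
        int slot = (int) ((nowMs / bucketSizeMs) % counts.length);
        if (bucketStart[slot] != start) {
            // The slot still holds an expired bucket: reuse it for the new time unit.
            bucketStart[slot] = start;
            counts[slot] = 0;
        }
        counts[slot]++;
    }

    // Sum of all events in the window that ends with the bucket containing nowMs.
    synchronized long sum(long nowMs) {
        long currentStart = nowMs - (nowMs % bucketSizeMs);
        long oldestStart = currentStart - (counts.length - 1) * bucketSizeMs;
        long total = 0;
        for (int i = 0; i < counts.length; i++) {
            if (bucketStart[i] >= oldestStart && bucketStart[i] <= currentStart) {
                total += counts[i];
            }
        }
        return total;
    }
}
```

With a 10-second window and 10 buckets, events recorded in the first second are still counted at t = 9999 ms but fall out of the window at t = 10000 ms, when the bucket that held them becomes the oldest one plus one.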

As complicated as it may seem, users don't need to worry about these internals; they only need to implement the HystrixMetricsPublisher provided by Hystrix to export the runtime data as they see fit. HystrixMetricsPublisher is an abstract class with three methods:

public abstract class HystrixMetricsPublisher {
  // Returns the HystrixMetricsPublisherCommand used to report CommandMetrics
  HystrixMetricsPublisherCommand getMetricsPublisherForCommand(HystrixCommandKey commandKey, HystrixCommandGroupKey commandGroupKey, HystrixCommandMetrics metrics, HystrixCircuitBreaker circuitBreaker, HystrixCommandProperties properties);
  // Returns the HystrixMetricsPublisherThreadPool used to report ThreadPoolMetrics
  HystrixMetricsPublisherThreadPool getMetricsPublisherForThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolMetrics metrics, HystrixThreadPoolProperties properties);
  // Returns the HystrixMetricsPublisherCollapser used to report CollapserMetrics
  HystrixMetricsPublisherCollapser getMetricsPublisherForCollapser(HystrixCollapserKey collapserKey, HystrixCollapserMetrics metrics, HystrixCollapserProperties properties);
}
           

Each method returns the implementation used to report one kind of metrics. However, the definitions of these interfaces can be confusing at first, because each declares only a single initialize method, for example HystrixMetricsPublisherThreadPool:

public interface HystrixMetricsPublisherThreadPool {
    void initialize();
}
           

In fact, Hystrix exposes a variety of metrics streams, and the user-provided implementation classes are expected to subscribe to these streams and write the data to an external sink. The APIs for obtaining the streams are as follows:

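Class | Method | Return Type
HystrixCommandStartStream | observe() | Observable<HystrixCommandExecutionStarted>
HystrixCommandCompletionStream | observe() | Observable<HystrixCommandCompletion>
HystrixThreadPoolStartStream | observe() | Observable<HystrixCommandExecutionStarted>
HystrixThreadPoolCompletionStream | observe() | Observable<HystrixCommandCompletion>
HystrixCollapserEventStream | observe() | Observable<HystrixCollapserEvent>
HystrixRequestEventsStream | observe() | Observable<HystrixRequestEvents>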

On top of these basic streams, Hystrix also provides some common aggregate streams, directly providing the aggregated monitoring data that most users need, such as RollingCommandEventCounterStream.

All you need to do is construct an implementation class such as HystrixMetricsPublisherCommand in your HystrixMetricsPublisher, create the streams it needs, and set up the external sink. Then, in the initialize method, subscribe to the streams and start consuming data.

In practice, we implemented HystrixMetricsPublisher to write metrics to InfluxDB so that they can be viewed as Grafana charts.

Take the HystrixMetricsPublisherCommand that exports Command metrics as an example:

public class HystrixYqgMetricsPublisherCommand implements HystrixMetricsPublisherCommand {

  private final HystrixCommandKey commandKey;
  private final HystrixCommandGroupKey commandGroupKey;
  private final Observable<CachedHistograms> bucketedHistogram;
  private final MetricsReporter metricsReporter;

  public HystrixYqgMetricsPublisherCommand(HystrixCommandKey commandKey, HystrixCommandGroupKey commandGroupKey, HystrixCircuitBreaker circuitBreaker, HystrixCommandProperties properties, MetricsReporter metricsReporter) {
    this.commandKey = commandKey;
    this.commandGroupKey = commandGroupKey;
    this.metricsReporter = metricsReporter;

    final int metricWindow = properties.metricsRollingStatisticalWindowInMilliseconds().get();
    final int numCounterBuckets = properties.metricsRollingStatisticalWindowBuckets().get();
    final int bucketSizeInMs = metricWindow / numCounterBuckets;
    final HystrixCommandCompletionStream stream = HystrixCommandCompletionStream.getInstance(commandKey);

    this.bucketedHistogram = getBucketedHistogram(stream, bucketSizeInMs);
  }

  private Observable<CachedHistograms> getBucketedHistogram(final HystrixCommandCompletionStream stream, final int bucketSizeInMs) {
    return stream
        .observe()
        .observeOn(Schedulers.computation())
        .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
        .flatMap(bucket -> bucket.reduce(new Histograms(), this::addValuesToBucket))
        .map(CachedHistograms::backedBy);
  }

  @Override
  public void initialize() {
    this.bucketedHistogram.subscribe(cachedHistograms ->
        // The metrics reporter is responsible for writing the data to InfluxDB
        metricsReporter.writeCommandMetrics(commandGroupKey.name(), commandKey.name(), cachedHistograms));
  }

  private Histograms addValuesToBucket(Histograms initialDistribution, HystrixCommandCompletion event) {
    for (HystrixEventType eventType: HystrixEventType.values()) {
      switch (eventType) {
        case EXCEPTION_THROWN: break;
        default:
          initialDistribution.counters[eventType.ordinal()] += event.getEventCounts().getCount(eventType);
          break;
      }
    }

    if (event.didCommandExecute() && event.getTotalLatency() > -1 && event.getExecutionLatency() > -1) {
      initialDistribution.totalLatency.recordValue(event.getTotalLatency());
      initialDistribution.executeLatency.recordValue(event.getExecutionLatency());
      initialDistribution.queueLatency.recordValue(event.getTotalLatency() - event.getExecutionLatency());
    }

    return initialDistribution;
  }
}
           

View a graph of the volume of different Command calls in the service on Grafana:

Alternatively, you can use hystrix-metrics-event-stream[6] directly to feed the official dashboard.

Additional details can be found in the official metrics wiki[7].

At this point, we have covered most of the main components of Hystrix. Combined with the execution flow described above, you should now have a working understanding of how Hystrix operates internally.

2.3 Usage

Common ways of using Hystrix include using it directly, combining it with Feign[8] from the same technology stack, and integrating it in Spring Boot.

2.3.1 Direct Use

Users can extend HystrixCommand or HystrixObservableCommand and configure it themselves. This is the most basic approach, but also the most flexible.

For example:

public class CommandHelloWorld extends HystrixCommand<String> {

    private final String name;

    public CommandHelloWorld(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }

    @Override
    protected String run() {
        // a real example would do work like a network call here
        return "Hello " + name + "!";
    }
}

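// Synchronous: blocks until the result is available
String s = new CommandHelloWorld("Bob").execute();
// Asynchronous: returns a Future for the result
Future<String> fs = new CommandHelloWorld("Bob").queue();
// Reactive: returns an Observable that emits the result
Observable<String> os = new CommandHelloWorld("Bob").observe();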
           

2.3.2 In Conjunction with Feign

Feign is a declarative HTTP client from Netflix. Users only write interfaces and annotations, and Feign generates the corresponding HTTP call code at runtime; different underlying clients, load balancers, and so on can be plugged in.

With the HystrixFeign[9] module, each call can be wrapped in a Hystrix command. The following example uses HystrixFeign to build a Hystrix-protected client and configures the Command Key and a fallback:

GitHub github = HystrixFeign.builder()
        .setterFactory(commandKeyIsRequestLine)
        .target(GitHub.class, "https://api.github.com", fallback);
           

However, when using Feign, you need to pay attention to error handling. Business errors that do not indicate a fault in the downstream service (for example, invalid arguments) should be wrapped in HystrixBadRequestException, so that Hystrix does not count them as failures and trip the circuit breaker.

2.3.3 Integration in Spring Boot

By adding the spring-cloud-starter-hystrix dependency to your project, you can run a bean method inside a Hystrix command simply by annotating it with @HystrixCommand, for example:

@Service
public class GreetingService {
    @HystrixCommand(fallbackMethod = "defaultGreeting")
    public String getGreeting(String username) {
        return new RestTemplate()
          .getForObject("http://localhost:9090/greeting/{username}", 
          String.class, username);
    }
 
    private String defaultGreeting(String username) {
        return "Hello User!";
    }
}
           

3 Future Developments

As mentioned earlier, Hystrix is now in maintenance mode, and new users can move to a similar library, resilience4j. Netflix's concurrency-limits[10], a library that adaptively throttles based on service latency, has not been updated in recent years and has likely been abandoned. Even so, flow control that is transparent to users, adapts automatically, and eliminates cumbersome configuration remains the direction we are working toward.

There are also sidecar-based flow control solutions, such as Istio[11], that can add resilience to inter-service calls in a microservice architecture. Although they may not be as feature-rich as Hystrix, their benefit is simplicity and transparency.

4 Summary

Starting from resilient service calls, this article introduced the execution flow of the widely used Hystrix library, the functions and implementations of its main components, and its common usage patterns, together with how we adapted Hystrix in our company's internal practice. We hope it helps you understand Hystrix.

Links

[1]Introducing Hystrix for Resilience Engineering: https://netflixtechblog.com/introducing-hystrix-for-resilience-engineering-13531c1ab362

[2]resilience4j: https://github.com/resilience4j/resilience4j

[3]concurrency-limits: https://github.com/Netflix/concurrency-limits

[4]Hystrix: How it Works: https://github.com/Netflix/Hystrix/wiki/How-it-Works

[5]Command Pattern: https://en.wikipedia.org/wiki/Command_pattern

[6]hystrix-metrics-event-stream: https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-metrics-event-stream

[7]metrics wiki: https://github.com/Netflix/Hystrix/wiki/Metrics-and-Monitoring

[8]Feign: https://github.com/OpenFeign/feign

[9]HystrixFeign: https://github.com/OpenFeign/feign/blob/master/hystrix

[10]concurrency limit: https://github.com/Netflix/concurrency-limits

[11]Istio: https://istio.io/latest/zh/docs/concepts/traffic-management/

Author: Zhan Wenhui

Source-WeChat public account: Foreign money bank technical team

Source: https://mp.weixin.qq.com/s/xUxHM0LcJ5ISH7vbWf5TOQ