目錄
Feignclient注冊
FeignClient初始化
指定url
不指定url
ribbon負載
總結
上篇文章我講了openfeign的逾時和重試。首先我想發2個勘誤:
1.下面的2個配置對單個接口逾時并沒有起作用,作為eureka用戶端使用時,起作用的其實是預設逾時時間,作為普通http用戶端時,起作用的其實也是預設逾時時間。
hystrix.command.FeignAsHttpCient#feignReadTimeout().execution.isolation.thread.timeoutInMilliseconds=13000
hystrix.command.FeignAsEurekaClient#feignReadTimeout().execution.isolation.thread.timeoutInMilliseconds=23000
2.openfeign作為普通用戶端,其實是可以重試的。
看了本文的源碼解讀,就可以搞明白上面的2個問題了。
服務啟動時,feignclient需要注冊為spring的bean,具體實作代碼在FeignClientsRegistrar,這個類實作了ImportBeanDefinitionRegistrar,spring初始化容器的時候會掃描實作這個接口的方法,進行bean注冊。
接口定義的方法是registerBeanDefinitions,FeignClientsRegistrar的實作如下:
public void registerBeanDefinitions(AnnotationMetadata metadata,
BeanDefinitionRegistry registry) {
registerDefaultConfiguration(metadata, registry);
registerFeignClients(metadata, registry);
}
private void registerDefaultConfiguration(AnnotationMetadata metadata,
BeanDefinitionRegistry registry) {
Map<String, Object> defaultAttrs = metadata
.getAnnotationAttributes(EnableFeignClients.class.getName(), true);
if (defaultAttrs != null && defaultAttrs.containsKey("defaultConfiguration")) {
String name;
if (metadata.hasEnclosingClass()) {
name = "default." + metadata.getEnclosingClassName();
}
else {
name = "default." + metadata.getClassName();//name="default.boot.Application"
}
registerClientConfiguration(registry, name,
defaultAttrs.get("defaultConfiguration"));
}
}
下面這個方法看過spring代碼的就熟悉了,一個bean的注冊:
private void registerClientConfiguration(BeanDefinitionRegistry registry, Object name,
Object configuration) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder
.genericBeanDefinition(FeignClientSpecification.class);
builder.addConstructorArgValue(name);
builder.addConstructorArgValue(configuration);
registry.registerBeanDefinition(
//這裡name="default.boot.Application.FeignClientSpecification",
//bean="org.springframework.cloud.openfeign.FeignClientSpecification"
name + "." + FeignClientSpecification.class.getSimpleName(),
builder.getBeanDefinition());
}
下面的代碼是注冊Feign用戶端
public void registerFeignClients(AnnotationMetadata metadata,
BeanDefinitionRegistry registry) {
ClassPathScanningCandidateComponentProvider scanner = getScanner();
scanner.setResourceLoader(this.resourceLoader);
//配置要掃描的basePackage,這裡是"boot"(@SpringBootApplication(scanBasePackages = {"boot"}))
for (String basePackage : basePackages) {
Set<BeanDefinition> candidateComponents = scanner
.findCandidateComponents(basePackage);
for (BeanDefinition candidateComponent : candidateComponents) {
if (candidateComponent instanceof AnnotatedBeanDefinition) {
// verify annotated class is an interface
AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
Assert.isTrue(annotationMetadata.isInterface(),
"@FeignClient can only be specified on an interface");
//找出注解是FeignClient的attributes,注冊到spring容器
Map<String, Object> attributes = annotationMetadata
.getAnnotationAttributes(
FeignClient.class.getCanonicalName());
String name = getClientName(attributes);//這裡的name是springboot-mybatis
registerClientConfiguration(registry, name,
attributes.get("configuration"));
//這個方法就不講了,封裝BeanDefinition,注冊到spring容器
registerFeignClient(registry, annotationMetadata, attributes);
}
}
}
}
Feign用戶端的初始化在FeignClientFactoryBean類,這個類實作了FactoryBean接口,在getObject,這裡的uml類圖如下:
getObject方法的代碼如下:
<T> T getTarget() {
FeignContext context = applicationContext.getBean(FeignContext.class);
Feign.Builder builder = feign(context);
//如果feignClient沒有指定url,就走這個分支,這裡會通過ribbon走負載均衡
if (!StringUtils.hasText(this.url)) {
if (!this.name.startsWith("http")) {
url = "http://" + this.name;
}
else {
url = this.name;
}
//http://springboot-mybatis
url += cleanPath();
return (T) loadBalance(builder, context, new HardCodedTarget<>(this.type,
this.name, url));
}
//feignClient指定了url,走到這兒
if (StringUtils.hasText(this.url) && !this.url.startsWith("http")) {
this.url = "http://" + this.url;
}
//url = "http://localhost:8083"
String url = this.url + cleanPath();
//LoadBalancerFeignClient
Client client = getOptional(context, Client.class);
if (client != null) {
if (client instanceof LoadBalancerFeignClient) {
// not load balancing because we have a url,
// but ribbon is on the classpath, so unwrap
//OKHttpClient
client = ((LoadBalancerFeignClient)client).getDelegate();
}
builder.client(client);
}
//這裡是HystrixTargeter,不知道為什麼總是不用DefaultTargeter
Targeter targeter = get(context, Targeter.class);
return (T) targeter.target(this, builder, context, new HardCodedTarget<>(
this.type, this.name, url));
}
我們先來看一下FeignClient不指定url的情況,代碼如下:
protected <T> T loadBalance(Feign.Builder builder, FeignContext context,
HardCodedTarget<T> target) {
//這裡的client是LoadBalancerFeignClient
Client client = getOptional(context, Client.class);
if (client != null) {
builder.client(client);
//這裡的targeter是HystrixTargeter
Targeter targeter = get(context, Targeter.class);
return targeter.target(this, builder, context, target);
}
throw new IllegalStateException(
"No Feign Client for loadBalancing defined. Did you forget to include spring-cloud-starter-netflix-ribbon?");
}
再看看HystrixTargeter中的target
public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign, FeignContext context,
Target.HardCodedTarget<T> target) {
if (!(feign instanceof feign.hystrix.HystrixFeign.Builder)) {
return feign.target(target);
}
feign.hystrix.HystrixFeign.Builder builder = (feign.hystrix.HystrixFeign.Builder) feign;
SetterFactory setterFactory = getOptional(factory.getName(), context,
SetterFactory.class);
if (setterFactory != null) {
builder.setterFactory(setterFactory);
}
Class<?> fallback = factory.getFallback();
if (fallback != void.class) {
return targetWithFallback(factory.getName(), context, target, builder, fallback);
}
Class<?> fallbackFactory = factory.getFallbackFactory();
if (fallbackFactory != void.class) {
return targetWithFallbackFactory(factory.getName(), context, target, builder, fallbackFactory);
}
//這裡傳回的是一個HardCodedTarget的代理,HardCodedTarget(type=FeignAsEurekaClient, name=springboot-mybatis, url=http://springboot-mybatis)
//FeignAsEurekaClient就是我demo中的feign用戶端類,可以看出,這裡是為FeignAsEurekaClient做了一個代理
return feign.target(target);
}
上面targe傳回的對象debug内容如下:
proxy = {$Proxy168@11372} "HardCodedTarget(type=FeignAsEurekaClient, name=springboot-mybatis, url=http://springboot-mybatis)"
h = {HystrixInvocationHandler@11366} "HardCodedTarget(type=FeignAsEurekaClient, name=springboot-mybatis, url=http://springboot-mybatis)"
target = {Target$HardCodedTarget@11142} "HardCodedTarget(type=FeignAsEurekaClient, name=springboot-mybatis, url=http://springboot-mybatis)"
type = {Class@9295} "interface boot.feign.FeignAsEurekaClient"
name = "springboot-mybatis"
url = "http://springboot-mybatis"
dispatch = {LinkedHashMap@11346} size = 5
{Method@11392} "public abstract java.lang.String boot.feign.FeignAsEurekaClient.getEmployeebyName(java.lang.String)" -> {SynchronousMethodHandler@11431}
{Method@11393} "public abstract java.lang.String boot.feign.FeignAsEurekaClient.saveEmployeebyName(boot.feign.Employee)" -> {SynchronousMethodHandler@11432}
{Method@11394} "public abstract java.lang.String boot.feign.FeignAsEurekaClient.feignReadTimeout()" -> {SynchronousMethodHandler@11433}
{Method@11395} "public abstract java.lang.String boot.feign.FeignAsEurekaClient.uploadFile(org.springframework.web.multipart.MultipartFile)" -> {SynchronousMethodHandler@11434}
{Method@11396} "public abstract feign.Response boot.feign.FeignAsEurekaClient.downloadFile(java.lang.String)" -> {SynchronousMethodHandler@11435}
fallbackFactory = null
fallbackMethodMap = {LinkedHashMap@11382} size = 5
{Method@11392} "public abstract java.lang.String boot.feign.FeignAsEurekaClient.getEmployeebyName(java.lang.String)" -> {Method@11392} "public abstract java.lang.String boot.feign.FeignAsEurekaClient.getEmployeebyName(java.lang.String)"
{Method@11393} "public abstract java.lang.String boot.feign.FeignAsEurekaClient.saveEmployeebyName(boot.feign.Employee)" -> {Method@11393} "public abstract java.lang.String boot.feign.FeignAsEurekaClient.saveEmployeebyName(boot.feign.Employee)"
{Method@11394} "public abstract java.lang.String boot.feign.FeignAsEurekaClient.feignReadTimeout()" -> {Method@11394} "public abstract java.lang.String boot.feign.FeignAsEurekaClient.feignReadTimeout()"
{Method@11395} "public abstract java.lang.String boot.feign.FeignAsEurekaClient.uploadFile(org.springframework.web.multipart.MultipartFile)" -> {Method@11395} "public abstract java.lang.String boot.feign.FeignAsEurekaClient.uploadFile(org.springframework.web.multipart.MultipartFile)"
{Method@11396} "public abstract feign.Response boot.feign.FeignAsEurekaClient.downloadFile(java.lang.String)" -> {Method@11396} "public abstract feign.Response boot.feign.FeignAsEurekaClient.downloadFile(java.lang.String)"
setterMethodMap = {LinkedHashMap@11383} size = 5
{Method@11392} "public abstract java.lang.String boot.feign.FeignAsEurekaClient.getEmployeebyName(java.lang.String)" -> {HystrixCommand$Setter@11414}
{Method@11393} "public abstract java.lang.String boot.feign.FeignAsEurekaClient.saveEmployeebyName(boot.feign.Employee)" -> {HystrixCommand$Setter@11415}
{Method@11394} "public abstract java.lang.String boot.feign.FeignAsEurekaClient.feignReadTimeout()" -> {HystrixCommand$Setter@11416}
{Method@11395} "public abstract java.lang.String boot.feign.FeignAsEurekaClient.uploadFile(org.springframework.web.multipart.MultipartFile)" -> {HystrixCommand$Setter@11417}
{Method@11396} "public abstract feign.Response boot.feign.FeignAsEurekaClient.downloadFile(java.lang.String)" -> {HystrixCommand$Setter@11418}
我們再來看一下FeignClient指定url的情況,這種情況跟不指定url類似,隻是代理的類中有url值。debug發現使用的代理也是HardCodedTarget,代碼如下:
proxy = {$Proxy161@11205} "HardCodedTarget(type=FeignAsHttpCient, name=feign, url=http://localhost:8083)"
h = {ReflectiveFeign$FeignInvocationHandler@11201} "HardCodedTarget(type=FeignAsHttpCient, name=feign, url=http://localhost:8083)"
target = {Target$HardCodedTarget@11192} "HardCodedTarget(type=FeignAsHttpCient, name=feign, url=http://localhost:8083)"
type = {Class@9298} "interface boot.feign.FeignAsHttpCient"
name = "feign"
url = "http://localhost:8083"
dispatch = {LinkedHashMap@11194} size = 1
{Method@11221} "public abstract java.lang.String boot.feign.FeignAsHttpCient.feignReadTimeout()" -> {SynchronousMethodHandler@11222}
從上面的代碼分析中,我們看出,這2種方式的主要不同是,如果不指定url,則給Feign傳入的是LoadBalancerFeignClient,它是一個裝飾器,裡面的delegate指定了實際的client,這裡是OkHttpClient。而如果指定了url,給Feign傳入的就是實際的httpclient,這裡是OKHttpClient。
上面使用了代理,這裡的UML類圖如下:
通過這張圖,我們可以看到代理是怎麼最終走到OkHttpClient的。如果使用了熔斷,則使用HystrixInvocationHandler,否則使用FeignInvocationHandler,他們的invoke方法最終都調用了SynchronousMethodHandler的invoke,這裡最終調用了底層的OkHttpClient。
上面的類圖看出,SynchronousMethodHandler這個類的invoke方法是上面的代理中反射觸發的方法,我們來看一下:
public Object invoke(Object[] argv) throws Throwable {
//RequestTemplate封裝RequestTemplate
RequestTemplate template = buildTemplateFromArgs.create(argv);
Retryer retryer = this.retryer.clone();
while (true) {
try {
return executeAndDecode(template);
} catch (RetryableException e) {
//這裡可以看出,無論是不是指定url,都會走重試的邏輯,預設重試是不生效的
try {
retryer.continueOrPropagate(e);
} catch (RetryableException th) {
Throwable cause = th.getCause();
if (propagationPolicy == UNWRAP && cause != null) {
throw cause;
} else {
throw th;
}
}
if (logLevel != Logger.Level.NONE) {
logger.logRetry(metadata.configKey(), logLevel);
}
continue;
}
}
}
Object executeAndDecode(RequestTemplate template) throws Throwable {
Request request = targetRequest(template);
Response response;
long start = System.nanoTime();
try {
//這裡調用OkHttpClient,這個并不是原生的那個OkHttpClient,而是Feign封裝的,看下面的講解
response = client.execute(request, options);
} catch (IOException e) {
if (logLevel != Logger.Level.NONE) {
logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
}
throw errorExecuting(request, e);
}
long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
boolean shouldClose = true;
try {
//省略部分代碼
//處理響應
if (response.status() >= 200 && response.status() < 300) {
if (void.class == metadata.returnType()) {
return null;
} else {
Object result = decode(response);
shouldClose = closeAfterDecode;
return result;
}
} else if (decode404 && response.status() == 404 && void.class != metadata.returnType()) {
Object result = decode(response);
shouldClose = closeAfterDecode;
return result;
} else {
throw errorDecoder.decode(metadata.configKey(), response);
}
} catch (IOException e) {
if (logLevel != Logger.Level.NONE) {
logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime);
}
throw errorReading(request, response, e);
} finally {
if (shouldClose) {
ensureClosed(response.body());
}
}
}
前面我們講過,如果指定了url,就不走ribbon的LoadBalance了,而是直接用httpclient去發送請求。其實說"直接",也不完全是直接,因為feign封裝了一個自己的OkHttpClient,并且有自己的Request,Response。
OkHttpClient這個裝飾器類首先包含了一個okhttp3.OkHttpClient的用戶端,發送請求的時候,首先把feign.Request轉換成okhttp的Request,而接收響應的時候,會把okhttp的Response轉換成feign.Response,代碼如下:
feign.Request轉換成okhttp的Request
static Request toOkHttpRequest(feign.Request input) {
Request.Builder requestBuilder = new Request.Builder();
requestBuilder.url(input.url());//封裝url
MediaType mediaType = null;
boolean hasAcceptHeader = false;
//封裝headers
for (String field : input.headers().keySet()) {
if (field.equalsIgnoreCase("Accept")) {
hasAcceptHeader = true;
}
for (String value : input.headers().get(field)) {
requestBuilder.addHeader(field, value);
if (field.equalsIgnoreCase("Content-Type")) {
mediaType = MediaType.parse(value);
if (input.charset() != null) {
mediaType.charset(input.charset());
}
}
}
}
// Some servers choke on the default accept string.
if (!hasAcceptHeader) {
requestBuilder.addHeader("Accept", "*/*");
}
byte[] inputBody = input.body();
boolean isMethodWithBody =
HttpMethod.POST == input.httpMethod() || HttpMethod.PUT == input.httpMethod()
|| HttpMethod.PATCH == input.httpMethod();
if (isMethodWithBody) {
requestBuilder.removeHeader("Content-Type");
if (inputBody == null) {
// write an empty BODY to conform with okhttp 2.4.0+
// http://johnfeng.github.io/blog/2015/06/30/okhttp-updates-post-wouldnt-be-allowed-to-have-null-body/
inputBody = new byte[0];
}
}
//封裝body
RequestBody body = inputBody != null ? RequestBody.create(mediaType, inputBody) : null;
requestBuilder.method(input.httpMethod().name(), body);
return requestBuilder.build();
}
把okhttp的Response轉換成feign.Response
private static feign.Response toFeignResponse(Response response, feign.Request request)
throws IOException {
return feign.Response.builder()
.status(response.code())
.reason(response.message())
.request(request)
.headers(toMap(response.headers()))
.body(toBody(response.body()))
.build();
}
發送請求的方法
public feign.Response execute(feign.Request input, feign.Request.Options options)
throws IOException {
okhttp3.OkHttpClient requestScoped;
//這裡delegate的connectTimeoutMillis預設是2000,delegate的readTimeoutMillis預設是100000
//從代碼可以看到,如果配置了options的逾時時間跟不一樣,會被替換掉
/**
*比如下面的時間設定就會替換掉預設時間
*feign.client.config.default.connectTimeout=3000
*feign.client.config.default.readTimeout=13000
*
*網上說的對單個接口設定逾時時間,下面這個逾時時間是不生效的,從源碼中我們也能看到了
*hystrix.command.FeignAsHttpCient#feignReadTimeout().execution.isolation.thread.timeoutInMilliseconds=13000
*
*也可以看出,要想自定義逾時,最好的方法就是給Request定制Options
*
*/
if (delegate.connectTimeoutMillis() != options.connectTimeoutMillis()
|| delegate.readTimeoutMillis() != options.readTimeoutMillis()) {
requestScoped = delegate.newBuilder()
.connectTimeout(options.connectTimeoutMillis(), TimeUnit.MILLISECONDS)
.readTimeout(options.readTimeoutMillis(), TimeUnit.MILLISECONDS)
.followRedirects(options.isFollowRedirects())
.build();
} else {
requestScoped = delegate;
}
Request request = toOkHttpRequest(input);
Response response = requestScoped.newCall(request).execute();
return toFeignResponse(response, input).toBuilder().request(input).build();
}
到這裡,我們就講完了指定url的FeignClient請求流程,相信你對逾時和重試也有了一定的認識。
上一節的UML類圖我們可以看出,無論是否指定url,最終都是要從SynchronousMethodHandler類的executeAndDecode方法調用HttpClient。不指定url的情況下,使用的client是LoadBalancerFeignClient。我們看一下他的execute方法:
public Response execute(Request request, Request.Options options) throws IOException {
try {
//asUri="http://springboot-mybatis/feign/feignReadTimeout"
URI asUri = URI.create(request.url());
String clientName = asUri.getHost();//springboot-mybatis
URI uriWithoutHost = cleanUrl(request.url(), clientName);//http:///feign/feignReadTimeout
//下面封裝了OkHttpClient,預設連接配接逾時是2s,讀逾時是10s
FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
this.delegate, request, uriWithoutHost);
//這裡如果配置了feign相關的配置,就是我們配置的,否則就是預設的DEFAULT_OPTIONS
IClientConfig requestConfig = getClientConfig(options, clientName);
return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
requestConfig).toResponse();
}
catch (ClientException e) {
IOException io = findIOException(e);
if (io != null) {
throw io;
}
throw new RuntimeException(e);
}
}
上面的executeWithLoadBalancer調用了AbstractLoadBalancerAwareClient的executeWithLoadBalancer方法,代碼如下:
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
//這裡的finalUri是http://192.168.0.118:8083/feign/feignReadTimeout
URI finalUri = reconstructURIWithServer(server, request.getUri());//這個就是一個拼接url的方法,不細講了
//下面的requestForServer是FeignLoadBalancer,看上面的UML類圖,是AbstractLoadBalancerAwareClient的子類
S requestForServer = (S) request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof ClientException) {
throw (ClientException) t;
} else {
throw new ClientException(e);
}
}
}
上面的execute方法執行的是FeignLoadBalancer裡面的execute方法,代碼如下:
public RibbonResponse execute(RibbonRequest request, IClientConfig configOverride)
throws IOException {
Request.Options options;
if (configOverride != null) {
/**
* 下面就是我們配置的逾時時間,在這裡被替換到了Request的Options中,XXX是default或者是服務名
* feign.client.config.XXX.connectTimeout=3000
* feign.client.config.XXX.readTimeout=7000
*/
RibbonProperties override = RibbonProperties.from(configOverride);
options = new Request.Options(
override.connectTimeout(this.connectTimeout),
override.readTimeout(this.readTimeout));
}
else {
options = new Request.Options(this.connectTimeout, this.readTimeout);
}
//這個request裡面的client就是OkHttpClient
Response response = request.client().execute(request.toRequest(), options);
return new RibbonResponse(request.getUri(), response);
}
後面的邏輯就是feign.okhttp.OkHttpClient的execute方法了,跟上節介紹的一樣,這裡不再贅述了。
可以看出,不指定url的情況,會使用ribbon做負載均衡,并對feign的Request和Response進行了一層封裝,封裝類是RibbonRequest和RibbonResponse。
順帶講一下ribbon的負載吧。上面的講解中提到了AbstractLoadBalancerAwareClient的executeWithLoadBalancer方法,我們再貼一次代碼:
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
//這裡的finalUri是http://192.168.0.118:8083/feign/feignReadTimeout
URI finalUri = reconstructURIWithServer(server, request.getUri());//這個就是一個拼接url的方法,不細講了
//下面的requestForServer是FeignLoadBalancer,看上面的UML類圖,是AbstractLoadBalancerAwareClient的子類
S requestForServer = (S) request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof ClientException) {
throw (ClientException) t;
} else {
throw new ClientException(e);
}
}
}
我們看一下上面的command.submit方法,這個方法調用了LoadBalancerCommand的submit方法,代碼如下:
public Observable<T> submit(final ServerOperation<T> operation) {
final ExecutionInfoContext context = new ExecutionInfoContext();
/**
* 下面的配置就是目前server請求失敗後再重試一次,如果還失敗,就請求下一個server,如過還了3個server都失敗,就傳回錯誤了
*# 對目前server的重試次數,預設是0
*ribbon.maxAutoRetries=1
*# 切換執行個體的重試次數,預設是0
*ribbon.maxAutoRetriesNextServer=3
*# 對所有操作請求都進行重試,這裡建議不要設定成true,否則會對所有操作請求都進行重試
*ribbon.okToRetryOnAllOperations=true
*# 根據Http響應碼進行重試
*ribbon.retryableStatusCodes=500,404,502
**/
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
// Use the load balancer
Observable<T> o =
(server == null ? selectServer() : Observable.just(server))
.concatMap(new Func1<Server, Observable<T>>() {
//省略部分代碼
});
//如果沒有擷取到,那就重試
if (maxRetrysNext > 0 && server == null)
o = o.retry(retryPolicy(maxRetrysNext, false));
return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
//省略部分代碼
});
}
我們看一下selectServer方法:
private Observable<Server> selectServer() {
return Observable.create(new OnSubscribe<Server>() {
@Override
public void call(Subscriber<? super Server> next) {
try {
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
next.onNext(server);
next.onCompleted();
} catch (Exception e) {
next.onError(e);
}
}
});
}
繼續跟蹤,我們來看getServerFromLoadBalancer方法:
public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
String host = null;
int port = -1;
if (original != null) {
host = original.getHost();
}
if (original != null) {
Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);
port = schemeAndPort.second();
}
// Various Supported Cases
// The loadbalancer to use and the instances it has is based on how it was registered
// In each of these cases, the client might come in using Full Url or Partial URL
ILoadBalancer lb = getLoadBalancer();
if (host == null) {
// Partial URI or no URI Case
// well we have to just get the right instances from lb - or we fall back
if (lb != null){
Server svc = lb.chooseServer(loadBalancerKey);
//省略代碼
host = svc.getHost();
if (host == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Invalid Server for :" + svc);
}
logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
return svc;
} else {//省略代碼
}
} else {//省略代碼
}
// end of creating final URL
if (host == null){
throw new ClientException(ClientException.ErrorType.GENERAL,"Request contains no HOST to talk to");
}
// just verify that at this point we have a full URL
return new Server(host, port);
}
簡單看一下上面這個ILoadBalancer,這裡是一個ZoneAwareLoadBalancer,裡面儲存的服務的server清單和狀态:
lb = {ZoneAwareLoadBalancer@14492} "DynamicServerListLoadBalancer:{NFLoadBalancer:name=springboot-mybatis,current list of Servers=[192.168.0.118:8083],Load balancer stats=Zone stats: ]}ServerList:org.springframework.cloud.netflix.ribbon.eureka.DomainExtractingServerList@43db1f5d"
balancers = {ConcurrentHashMap@17695} size = 1
"defaultzone" -> {BaseLoadBalancer@17904} "{NFLoadBalancer:name=springboot-mybatis_defaultzone,current list of Servers=[192.168.0.118:8083],Load balancer stats=Zone stats: {]\n]}"
通過這個負載均衡器,feign就可以擷取到一個server位址,然後把請求發送出去。
openfeign作為eureka用戶端和普通http用戶端,有所不同。作為eureka用戶端時,不用指定url,使用ribbon封裝了請求和響應,并且通過ribbon作為負載均衡。
openfeign作為eureka用戶端和普通http用戶端,都是可以重試的。因為都是通過SynchronousMethodHandler這個類invoke來觸發的,失敗了都會捕獲RetryableException。但是要知道,預設配置是不支援重試的。
openfeign作為eureka用戶端和普通http用戶端,對單個接口設定逾時時間,都是不生效的,實際上還是使用了預設的逾時時間。
上期文章: