Netflix的Feign作為Http Client的話,我們就可以做到像Dubbo的聲明式服務一樣,服務的調用者直接調用接口方法調用遠端服務,而不需要通過正常的Http Client構造請求再解析傳回資料。
目錄
1. 背景
2.該如何入手?
2.1 什麼是Feign
3. spring-cloud-openfeign源碼
3.1 相關配置
3.2 EnableFeignClients和FeignAutoConfiguration
3.2 FeignClientFactoryBean
4. feign-ribbon及feign-hystrix源碼
4.1 相關配置
4.3 Feign及HystrixFeign
4.4 SynchronousMethodHandler
4.5 與Hystrix的內建
4.6 與ribbon的內建
4.7 與eureka的內建
5. 功能上(異步支援)
6. 快速開始(QuickStart)
6.1 第一步,自定義contract
6.2 第二步,執行個體化contract及調用
7. 尚需完善(不足)
1. 背景
為啥要寫這個呢?
最近一直在關注springcloud,也陸續有一些項目引入了這個技術棧,本項目将對大家最為熟悉的元件feign進行拓展,主要是異步上的支援,畢竟百度搜這個,網上的解決方案比較單一。
2.該如何入手?
考慮到公司中很多系統都是老系統(基于springmvc3.2.x,非springboot項目),不能直接接入spring-cloud-starter-feign。需要先了解spring-cloud-starter-feign的源碼,然後再了解feign-hystrix源碼。
2.1 什麼是Feign
spring-cloud-openfeign 在 Github 描述了其特性:
Declarative REST Client: Feign creates a dynamic implementation of an interface decorated with JAX-RS or Spring MVC annotations
Feign 支援兩種不同的注解(feign的注解和springmvc的注解)來描述接口,簡化了 Java HTTP Client 的調用過程,隐藏了實作細節。
用法
Feign 的精華是一種設計思想,它設計了一種全新的HTTP調用方式,屏蔽了具體的調用細節,與Spring MVC 注解的結合更是極大提高了效率(沒有重複造輪子,又設計一套新注解。Hystrix 支援 fallback(降級)的概念,在熔斷器打開或發生異常時可以執行預設的代碼。如果要對某個
@FeignClient
啟用 fallback,隻需要設定 fallback 屬性即可。
@FeignClient(name = "USER", fallbackFactory = UserServiceFallback.class)
public interface UserService {
@GetMapping("/users/{id}")
User getUser(@PathVariable("id") String id);
}
注:如果你是spring-boot項目這樣就可以了,非常簡單。
3. spring-cloud-openfeign源碼
針對spring-boot項目,本節依賴的版本為:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-feign</artifactId>
<version>1.4.4.RELEASE</version>
</dependency>
3.1 相關配置
主要有:FeignClientProperties、FeignClientEncodingProperties、FeignHttpClientProperties,更多
feign.client.config.defalut.connectTimeout=5000
#局部配置
feign.client.config.user.connectTimeout=5000
feign.hystrix.enabled=true
3.2 EnableFeignClients和FeignAutoConfiguration
第一步:隐式模式(使用者不需要做什麼,但你要知道),spirng boot會自動加載Feign的配置類FeignAutoConfiguration(spring-cloud-netflix-core-1.4.4.RELEASE.jar/META-INF/spring.factories),為Feign提供運作所需要的環境(各種相關對象)
第二步:應用系統啟動類中添加@EnableFeignClients,它的作用是自動掃描注冊标記為 @FeignClient 的使用者定義的接口,動态建立實作類(準确的應該叫代理類)并注入到Ioc容器中。
在調用接口時,會根據接口上的注解資訊來建立RequestTemplate,結合實際調用時的參數來建立Request,最後完成調用。
@Import({FeignClientsRegistrar.class})
public @interface EnableFeignClients {
String[] value() default {};
String[] basePackages() default {};
Class<?>[] basePackageClasses() default {};
Class<?>[] defaultConfiguration() default {};
Class<?>[] clients() default {};
}
//用于處理 FeignClient 的全局配置和被 @FeignClient 标記的接口
class FeignClientsRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, BeanClassLoaderAware {
public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
// 處理預設配置類(EnableFeignClients.defaultConfiguration屬性)
this.registerDefaultConfiguration(metadata, registry);
// 注冊被 @FeignClient 标記的接口
this.registerFeignClients(metadata, registry);
}
private void registerDefaultConfiguration(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
Map<String, Object> defaultAttrs = metadata.getAnnotationAttributes(EnableFeignClients.class.getName(), true);
if (defaultAttrs != null && defaultAttrs.containsKey("defaultConfiguration")) {
String name;
if (metadata.hasEnclosingClass()) {
name = "default." + metadata.getEnclosingClassName();
} else {
name = "default." + metadata.getClassName();
}
this.registerClientConfiguration(registry, name, defaultAttrs.get("defaultConfiguration"));
}
}
public void registerFeignClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
// classpath scan工具
ClassPathScanningCandidateComponentProvider scanner = this.getScanner();
scanner.setResourceLoader(this.resourceLoader);
Map<String, Object> attrs = metadata.getAnnotationAttributes(EnableFeignClients.class.getName());
// 利用FeignClient作為過濾條件
AnnotationTypeFilter annotationTypeFilter = new AnnotationTypeFilter(FeignClient.class);
Class<?>[] clients = attrs == null ? null : (Class[])((Class[])attrs.get("clients"));
...
// 注冊
this.registerFeignClient(registry, annotationMetadata, attributes);
}
private void registerFeignClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
String className = annotationMetadata.getClassName();
// 拿到FeignClientFactoryBean的BeanDefinitionBuilder
BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(FeignClientFactoryBean.class);
this.validate(attributes);
definition.addPropertyValue("url", this.getUrl(attributes));
definition.addPropertyValue("path", this.getPath(attributes));
String name = this.getName(attributes);
definition.addPropertyValue("name", name);
definition.addPropertyValue("type", className);
definition.addPropertyValue("decode404", attributes.get("decode404"));
definition.addPropertyValue("fallback", attributes.get("fallback"));
definition.setAutowireMode(2);
String alias = name + "FeignClient";
AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
beanDefinition.setPrimary(true);
String qualifier = this.getQualifier(attributes);
if (StringUtils.hasText(qualifier)) {
alias = qualifier;
}
BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className, new String[]{alias});
BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
}
}
//自動配置類
@Configuration
@ConditionalOnClass(Feign.class)
@EnableConfigurationProperties({FeignClientProperties.class, FeignHttpClientProperties.class})
public class FeignAutoConfiguration {
@Autowired(required = false)
private List<FeignClientSpecification> configurations = new ArrayList<>();
@Bean
public HasFeatures feignFeature() {
return HasFeatures.namedFeature("Feign", Feign.class);
}
@Bean
public FeignContext feignContext() {
//加載FeignClientsConfiguration配置類
FeignContext context = new FeignContext();
context.setConfigurations(this.configurations);
return context;
}
@Configuration
@ConditionalOnClass(name = "feign.hystrix.HystrixFeign")
protected static class HystrixFeignTargeterConfiguration {
@Bean
@ConditionalOnMissingBean
public Targeter feignTargeter() {
return new HystrixTargeter();
}
}
@Configuration
@ConditionalOnMissingClass("feign.hystrix.HystrixFeign")
protected static class DefaultFeignTargeterConfiguration {
@Bean
@ConditionalOnMissingBean
public Targeter feignTargeter() {
return new DefaultTargeter();
}
}
// the following configuration is for alternate feign clients if
// ribbon is not on the class path.
// see corresponding configurations in FeignRibbonClientAutoConfiguration
// for load balanced ribbon clients.
@Configuration
@ConditionalOnClass(ApacheHttpClient.class)
@ConditionalOnMissingClass("com.netflix.loadbalancer.ILoadBalancer")
@ConditionalOnMissingBean(CloseableHttpClient.class)
@ConditionalOnProperty(value = "feign.httpclient.enabled", matchIfMissing = true)
protected static class HttpClientFeignConfiguration {
private final Timer connectionManagerTimer = new Timer(
"FeignApacheHttpClientConfiguration.connectionManagerTimer", true);
@Autowired(required = false)
private RegistryBuilder registryBuilder;
private CloseableHttpClient httpClient;
@Bean
@ConditionalOnMissingBean(HttpClientConnectionManager.class)
public HttpClientConnectionManager connectionManager(
ApacheHttpClientConnectionManagerFactory connectionManagerFactory,
FeignHttpClientProperties httpClientProperties) {
final HttpClientConnectionManager connectionManager = connectionManagerFactory
.newConnectionManager(httpClientProperties.isDisableSslValidation(), httpClientProperties.getMaxConnections(),
httpClientProperties.getMaxConnectionsPerRoute(),
httpClientProperties.getTimeToLive(),
httpClientProperties.getTimeToLiveUnit(), registryBuilder);
this.connectionManagerTimer.schedule(new TimerTask() {
@Override
public void run() {
connectionManager.closeExpiredConnections();
}
}, 30000, httpClientProperties.getConnectionTimerRepeat());
return connectionManager;
}
@Bean
public CloseableHttpClient httpClient(ApacheHttpClientFactory httpClientFactory,
HttpClientConnectionManager httpClientConnectionManager,
FeignHttpClientProperties httpClientProperties) {
RequestConfig defaultRequestConfig = RequestConfig.custom()
.setConnectTimeout(httpClientProperties.getConnectionTimeout())
.setRedirectsEnabled(httpClientProperties.isFollowRedirects())
.build();
this.httpClient = httpClientFactory.createBuilder().
setConnectionManager(httpClientConnectionManager).
setDefaultRequestConfig(defaultRequestConfig).build();
return this.httpClient;
}
@Bean
@ConditionalOnMissingBean(Client.class)
public Client feignClient(HttpClient httpClient) {
return new ApacheHttpClient(httpClient);
}
@PreDestroy
public void destroy() throws Exception {
connectionManagerTimer.cancel();
if(httpClient != null) {
httpClient.close();
}
}
}
@Configuration
@ConditionalOnClass(OkHttpClient.class)
@ConditionalOnMissingClass("com.netflix.loadbalancer.ILoadBalancer")
@ConditionalOnMissingBean(okhttp3.OkHttpClient.class)
@ConditionalOnProperty(value = "feign.okhttp.enabled")
protected static class OkHttpFeignConfiguration {
private okhttp3.OkHttpClient okHttpClient;
@Bean
@ConditionalOnMissingBean(ConnectionPool.class)
public ConnectionPool httpClientConnectionPool(FeignHttpClientProperties httpClientProperties,
OkHttpClientConnectionPoolFactory connectionPoolFactory) {
Integer maxTotalConnections = httpClientProperties.getMaxConnections();
Long timeToLive = httpClientProperties.getTimeToLive();
TimeUnit ttlUnit = httpClientProperties.getTimeToLiveUnit();
return connectionPoolFactory.create(maxTotalConnections, timeToLive, ttlUnit);
}
@Bean
public okhttp3.OkHttpClient client(OkHttpClientFactory httpClientFactory,
ConnectionPool connectionPool, FeignHttpClientProperties httpClientProperties) {
Boolean followRedirects = httpClientProperties.isFollowRedirects();
Integer connectTimeout = httpClientProperties.getConnectionTimeout();
Boolean disableSslValidation = httpClientProperties.isDisableSslValidation();
this.okHttpClient = httpClientFactory.createBuilder(disableSslValidation).
connectTimeout(connectTimeout, TimeUnit.MILLISECONDS).
followRedirects(followRedirects).
connectionPool(connectionPool).build();
return this.okHttpClient;
}
@PreDestroy
public void destroy() {
if(okHttpClient != null) {
okHttpClient.dispatcher().executorService().shutdown();
okHttpClient.connectionPool().evictAll();
}
}
@Bean
@ConditionalOnMissingBean(Client.class)
public Client feignClient() {
return new OkHttpClient(this.okHttpClient);
}
}
}
@Configuration
public class FeignClientsConfiguration {
@Bean
@ConditionalOnMissingBean
public Decoder feignDecoder() {
return new ResponseEntityDecoder(new SpringDecoder(this.messageConverters));
}
@Bean
@ConditionalOnMissingBean
public Encoder feignEncoder() {
return new SpringEncoder(this.messageConverters);
}
@Bean
@ConditionalOnMissingBean
public Contract feignContract(ConversionService feignConversionService) {
return new SpringMvcContract(this.parameterProcessors, feignConversionService);
}
@Configuration
@ConditionalOnClass({ HystrixCommand.class, HystrixFeign.class })
protected static class HystrixFeignConfiguration {
@Bean
@Scope("prototype")
@ConditionalOnMissingBean
@ConditionalOnProperty(name = "feign.hystrix.enabled", matchIfMissing = false)
public Feign.Builder feignHystrixBuilder() {
return HystrixFeign.builder();
}
}
@Bean
@ConditionalOnMissingBean
public Retryer feignRetryer() {
return Retryer.NEVER_RETRY;
}
@Bean
@Scope("prototype")
@ConditionalOnMissingBean
public Feign.Builder feignBuilder(Retryer retryer) {
return Feign.builder().retryer(retryer);
}
@Bean
@ConditionalOnMissingBean(FeignLoggerFactory.class)
public FeignLoggerFactory feignLoggerFactory() {
return new DefaultFeignLoggerFactory(logger);
}
}
3.2 FeignClientFactoryBean
FeignClientFactoryBean是核心,基于每個FeignClient實作了用戶端Contract,而feign.target方法就是執行個體化用戶端Contract。這裡介紹幾組關鍵的類:
Targeter提供了對target接口(Feign.Builder.target的封裝)
- DefaultTargeter調用的feign.target(未做任何處理)
- HystrixTargeter調用的HystrixFeign.Builder.target(內建了Hystrix)
Client接口提供了execute
- Client.Default是對Client的實作(基于jdk的get/post)
- HttpClientFeignConfiguration.feignClient()是封裝了LoadBalancerFeignClient和apache HttpClient
- OkHttpFeignConfiguration.feignClient()是封裝了LoadBalancerFeignClient和okhttp
- LoadBalancerFeignClient提供了負載均衡,它是在FeignRibbonClientAutoConfiguration中通過@Import
其實讀spring cloud-feign源碼的技巧就是深入研究FeignClientFactoryBean的依賴,基本上花些時間都可以看懂!
class FeignClientFactoryBean implements FactoryBean<Object>, InitializingBean, ApplicationContextAware {
//getObject() 傳回的是一個SynchronousMethodHandler對象
public Object getObject() throws Exception {
FeignContext context = (FeignContext)this.applicationContext.getBean(FeignContext.class);
Builder builder = this.feign(context);
String url;
// 如果FeignClient沒有指定URL(配置的是service)
if (!StringUtils.hasText(this.url)) {
if (!this.name.startsWith("http")) {
url = "http://" + this.name;
} else {
url = this.name;
}
url = url + this.cleanPath();
// 結合ribbon使得用戶端具備負載均衡的能力,預設擷取的是LoadBalancerFeignClient
return this.loadBalance(builder, context, new HardCodedTarget(this.type, this.name, url));
} else {
if (StringUtils.hasText(this.url) && !this.url.startsWith("http")) {
this.url = "http://" + this.url;
}
url = this.url + this.cleanPath();
Client client = (Client)this.getOptional(context, Client.class);
if (client != null) {
if (client instanceof LoadBalancerFeignClient) {
client = ((LoadBalancerFeignClient)client).getDelegate();
}
builder.client(client);
}
return this.targeter.target(this, builder, context, new HardCodedTarget(this.type, this.name, url));
}
}
protected <T> T loadBalance(Builder builder, FeignContext context, HardCodedTarget<T> target) {
//得到的是 LoadBalancerFeignClient
Client client = (Client)this.getOptional(context, Client.class);
if (client != null) {
builder.client(client);
// HystrixTargeter
return this.targeter.target(this, builder, context, target);
} else {
throw new IllegalStateException("No Feign Client for loadBalancing defined. Did you forget to include spring-cloud-starter-ribbon?");
}
}
}
//Targeter其實就是Feign.Builder的包裝
class HystrixTargeter implements Targeter {
public <T> T target(FeignClientFactoryBean factory, Builder feign, FeignContext context, HardCodedTarget<T> target) {
if (factory.getFallback() != Void.TYPE && feign instanceof feign.hystrix.HystrixFeign.Builder) {
Object fallbackInstance = context.getInstance(factory.getName(), factory.getFallback());
if (fallbackInstance == null) {
throw new IllegalStateException(String.format("No fallback instance of type %s found for feign client %s", factory.getFallback(), factory.getName()));
} else if (!target.type().isAssignableFrom(factory.getFallback())) {
throw new IllegalStateException(String.format("Incompatible fallback instance. Fallback of type %s is not assignable to %s for feign client %s", factory.getFallback(), target.type(), factory.getName()));
} else {
feign.hystrix.HystrixFeign.Builder builder = (feign.hystrix.HystrixFeign.Builder)feign;
return builder.target(target, fallbackInstance);
}
} else {
return feign.target(target);
}
}
}
4. feign-ribbon及feign-hystrix源碼
針對非spring-boot項目,本節依賴的版本為:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-feign</artifactId>
<version>1.4.4.RELEASE</version>
</dependency>
<dependency>
<groupId>com.netflix.eureka</groupId>
<artifactId>eureka-client</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>com.netflix.ribbon</groupId>
<artifactId>ribbon-eureka</artifactId>
<version>2.2.2</version>
<exclusions>
<exclusion>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-ribbon</artifactId>
<version>9.5.1</version>
</dependency>
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-hystrix</artifactId>
<version>9.5.1</version>
</dependency>
4.1 相關配置
項目的resource目錄下定義eureka-client.properties(在eureka用戶端執行個體化時通過PropertiesInstanceConfig會讀取)
//初始化Eureka Client
private synchronized void initEurekaClient() {
DockerInstanceConfig instanceConfig = new DockerInstanceConfig();
if (this.applicationInfoManager == null) {
InstanceInfo instanceInfo = (new EurekaConfigBasedInstanceInfoProvider(instanceConfig)).get();
this.applicationInfoManager = new ApplicationInfoManager(instanceConfig, instanceInfo);
}
this.applicationInfoManager.setInstanceStatus(InstanceInfo.InstanceStatus.STARTING);
if (this.eurekaClient == null) {
this.eurekaClient = new DiscoveryClient(applicationInfoManager, new DefaultEurekaClientConfig());
}
DiscoveryManager.getInstance().setDiscoveryClient((DiscoveryClient) this.eurekaClient);
this.applicationInfoManager.setInstanceStatus(InstanceInfo.InstanceStatus.UP);
}
//手動建立用戶端contract
private T createObjectNative() {
return HystrixFeign.builder()
.client(RibbonClient.builder().delegate(new DefaultSupport()).build())
.encoder(new JacksonEncoder(JacksonInstanceManager.getInstance()))
.decoder(new JacksonDecoder(JacksonInstanceManager.getInstance()))
.requestInterceptor(request -> {
.options(new Request.Options(60000, 60000))
.retryer(Retryer.NEVER_RETRY)
.target(contract, "http://" + serverName);
}
eureka.serviceUrl.default=http://xxxx.cn/eureka eureka.region=default eureka.name=kxtx-test eureka.vipAddress= eureka.port=8081 eureka.preferSameZone=true eureka.shouldUseDns=false eureka.us-east-1.availabilityZones=default hystrix.command.default.execution.timeout.enabled=false kxtx-gps.ribbon.NFLoadBalancerClassName=com.netflix.loadbalancer.DynamicServerListLoadBalancer kxtx-gps.ribbon.NIWSServerListClassName=com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList kxtx-gps.ribbon.DeploymentContextBasedVipAddresses=kxtx-gps
4. 2 主要類及其層次結構
4.3 Feign及HystrixFeign
Feign建立FeignInvocationHandler,HystrixFeign會建立HystrixInvocationHandler,重點要關注ReflectiveFeign。
//用戶端contract
public abstract class Feign {
public static class Builder {
//調用target,此時用戶端Contract已完成建構
public <T> T target(Class<T> apiType, String url) {
return this.target(new HardCodedTarget(apiType, url));
}
public <T> T target(Target<T> target) {
return this.build().newInstance(target);
}
//HystrixFeign.Builder執行邏輯
public Feign build() {
Factory synchronousMethodHandlerFactory = new Factory(this.client, this.retryer, this.requestInterceptors, this.logger, this.logLevel, this.decode404);
ParseHandlersByName handlersByName = new ParseHandlersByName(this.contract, this.options, this.encoder, this.decoder, this.errorDecoder, synchronousMethodHandlerFactory);
return new ReflectiveFeign(handlersByName, this.invocationHandlerFactory);
}
}
}
//內建了Hystrix的用戶端contract
public final class HystrixFeign {
//Feign的入口很關鍵
public static HystrixFeign.Builder builder() {
return new HystrixFeign.Builder();
}
public static final class Builder extends feign.Feign.Builder {
//最終build執行邏輯
Feign build(final FallbackFactory<?> nullableFallbackFactory) {
//與Hystrix的內建,下面專門講
super.invocationHandlerFactory(new InvocationHandlerFactory() {
public InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch) {
return new HystrixInvocationHandler(target, dispatch, Builder.this.setterFactory, nullableFallbackFactory);
}
});//調用父類
super.contract(new HystrixDelegatingContract(this.contract));
return super.build();//調用父類
}
}
}
//用戶端contract的真正實作類
public class ReflectiveFeign extends Feign {
private final InvocationHandlerFactory factory;//很重要的(下面會講)
//通過Feign.Builder.target完成建構
public <T> T newInstance(Target<T> target) {
//基于contract建立一系列SynchronousMethodHandler
Map<String, MethodHandler> nameToHandler = this.targetToHandlersByName.apply(target);
Map<Method, MethodHandler> methodToHandler = new LinkedHashMap();
List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList();
Method[] var5 = target.type().getMethods();
int var6 = var5.length;
//存至dispatch
for(int var7 = 0; var7 < var6; ++var7) {
Method method = var5[var7];
if (method.getDeclaringClass() != Object.class) {
if (Util.isDefault(method)) {
DefaultMethodHandler handler = new DefaultMethodHandler(method);
defaultMethodHandlers.add(handler);
methodToHandler.put(method, handler);
} else {
//從nameToHandler.get擷取SynchronousMethodHandler
methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
}
}
}
//建立HystrixInvocationHandler(下面會講)
InvocationHandler handler = this.factory.create(target, methodToHandler);
//再次生成代理類
T proxy = Proxy.newProxyInstance(target.type().getClassLoader(), new Class[]{target.type()}, handler);
Iterator var12 = defaultMethodHandlers.iterator();
while(var12.hasNext()) {
DefaultMethodHandler defaultMethodHandler = (DefaultMethodHandler)var12.next();
defaultMethodHandler.bindTo(proxy);
}
return proxy;
}
}
//SynchronousMethodHandler的上級代理類(包裹了SynchronousMethodHandler)
public interface InvocationHandlerFactory {
InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch);
/**
* Like {@link InvocationHandler#invoke(Object, java.lang.reflect.Method, Object[])}, except for a
* single method.
*/
interface MethodHandler {
Object invoke(Object[] argv) throws Throwable;
}
//ReflectiveFeign.factory預設生成的代理是FeignInvocationHandler
static final class Default implements InvocationHandlerFactory {
@Override
public InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch) {
return new ReflectiveFeign.FeignInvocationHandler(target, dispatch);
}
}
}
4.4 SynchronousMethodHandler
SynchronousMethodHandle也是一個代理類,最底層的(最終執行)。
//真正執行http請求的類
final class SynchronousMethodHandler implements MethodHandler {
@Override
public Object invoke(Object[] argv) throws Throwable {
RequestTemplate template = buildTemplateFromArgs.create(argv);
Retryer retryer = this.retryer.clone();
while (true) {
try {
// 執行請求
return executeAndDecode(template);
} catch (RetryableException e) {
retryer.continueOrPropagate(e);
if (logLevel != Logger.Level.NONE) {
logger.logRetry(metadata.configKey(), logLevel);
}
continue;
}
}
}
//通過ParseHandlersByName.apply生成該類
static class Factory {
public MethodHandler create(Target<?> target, MethodMetadata md, feign.RequestTemplate.Factory buildTemplateFromArgs, Options options, Decoder decoder, ErrorDecoder errorDecoder) {
return new SynchronousMethodHandler(target, this.client, this.retryer, this.requestInterceptors, this.logger, this.logLevel, md, buildTemplateFromArgs, options, decoder, errorDecoder, this.decode404);
}
}
}
4.5 與Hystrix的內建
其實ReflectiveFeign.factory就是HystrixInvocationHandler,在HystrixFeign.Builder.build中被建構,HystrixInvocationHandler其實就是将使用者的任務(SynchronousMethodHandler)嵌入HystrixCommand中。
public class ReflectiveFeign extends Feign {
//其實是HystrixInvocationHandler
private final InvocationHandlerFactory factory;
}
//ReflectiveFeign.factory的另一個代理類的實作
final class HystrixInvocationHandler implements InvocationHandler {
//源類
private final Target<?> target;
//methodToHandler變量,其實就是method和SynchronousMethodHandler映射關系
private final Map<Method, MethodHandler> dispatch;
//執行調用
public Object invoke(Object proxy, final Method method, final Object[] args) throws Throwable {
if (!"equals".equals(method.getName())) {
if ("hashCode".equals(method.getName())) {
return this.hashCode();
} else if ("toString".equals(method.getName())) {
return this.toString();
} else {
//HystrixCommand就是Hystrix-core裡面的,後續文章會講
HystrixCommand<Object> hystrixCommand = new HystrixCommand<Object>((Setter)this.setterMethodMap.get(method)) {
protected Object run() throws Exception {
try {
return ((MethodHandler)HystrixInvocationHandler.this.dispatch.get(method)).invoke(args);
} catch (Exception var2) {
throw var2;
} catch (Throwable var3) {
throw (Error)var3;
}
}
protected Object getFallback() {
if (HystrixInvocationHandler.this.fallbackFactory == null) {
return super.getFallback();
} else {
try {
Object fallback = HystrixInvocationHandler.this.fallbackFactory.create(this.getExecutionException());
Object result = ((Method)HystrixInvocationHandler.this.fallbackMethodMap.get(method)).invoke(fallback, args);
if (HystrixInvocationHandler.this.isReturnsHystrixCommand(method)) {
return ((HystrixCommand)result).execute();
} else if (HystrixInvocationHandler.this.isReturnsObservable(method)) {
return ((Observable)result).toBlocking().first();
} else if (HystrixInvocationHandler.this.isReturnsSingle(method)) {
return ((Single)result).toObservable().toBlocking().first();
} else if (HystrixInvocationHandler.this.isReturnsCompletable(method)) {
((Completable)result).await();
return null;
} else {
return result;
}
} catch (IllegalAccessException var3) {
throw new AssertionError(var3);
} catch (InvocationTargetException var4) {
throw new AssertionError(var4.getCause());
}
}
}
};
if (this.isReturnsHystrixCommand(method)) {
return hystrixCommand;
} else if (this.isReturnsObservable(method)) {
return hystrixCommand.toObservable();
} else if (this.isReturnsSingle(method)) {
return hystrixCommand.toObservable().toSingle();
} else {
return this.isReturnsCompletable(method) ? hystrixCommand.toObservable().toCompletable() : hystrixCommand.execute();
}
}
} else {
try {
Object otherHandler = args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
return this.equals(otherHandler);
} catch (IllegalArgumentException var5) {
return false;
}
}
}
}
4.6 與ribbon的內建
這時你可能會問Feign是怎樣實作負載均衡的呢?
這個不難了解,肯定是內建ribbon實作的,其實建構用戶端Contract時就有這樣的API:HystrixFeign.Builder.client(Client client) ,而client就實作了負載均衡。client的建立是通過RibbonClient.builder().delegate(new Client.Default(null, null)).build()完成的。
public interface Client {
Response execute(Request request, Options options) throws IOException;
//真正執行get/post的Client
public static class Default implements Client {
@Override
public Response execute(Request request, Options options) throws IOException {
HttpURLConnection connection = convertAndSend(request, options);
return convertResponse(connection).toBuilder().request(request).build();
}
HttpURLConnection convertAndSend(Request request, Options options) throws IOException {
final HttpURLConnection
connection =
(HttpURLConnection) new URL(request.url()).openConnection();
if (connection instanceof HttpsURLConnection) {
HttpsURLConnection sslCon = (HttpsURLConnection) connection;
if (sslContextFactory != null) {
sslCon.setSSLSocketFactory(sslContextFactory);
}
if (hostnameVerifier != null) {
sslCon.setHostnameVerifier(hostnameVerifier);
}
}
connection.setConnectTimeout(options.connectTimeoutMillis());
connection.setReadTimeout(options.readTimeoutMillis());
connection.setAllowUserInteraction(false);
connection.setInstanceFollowRedirects(true);
connection.setRequestMethod(request.method());
Collection<String> contentEncodingValues = request.headers().get(CONTENT_ENCODING);
boolean
gzipEncodedRequest =
contentEncodingValues != null && contentEncodingValues.contains(ENCODING_GZIP);
boolean
deflateEncodedRequest =
contentEncodingValues != null && contentEncodingValues.contains(ENCODING_DEFLATE);
boolean hasAcceptHeader = false;
Integer contentLength = null;
for (String field : request.headers().keySet()) {
if (field.equalsIgnoreCase("Accept")) {
hasAcceptHeader = true;
}
for (String value : request.headers().get(field)) {
if (field.equals(CONTENT_LENGTH)) {
if (!gzipEncodedRequest && !deflateEncodedRequest) {
contentLength = Integer.valueOf(value);
connection.addRequestProperty(field, value);
}
} else {
connection.addRequestProperty(field, value);
}
}
}
// Some servers choke on the default accept string.
if (!hasAcceptHeader) {
connection.addRequestProperty("Accept", "*/*");
}
if (request.body() != null) {
if (contentLength != null) {
connection.setFixedLengthStreamingMode(contentLength);
} else {
connection.setChunkedStreamingMode(8196);
}
connection.setDoOutput(true);
OutputStream out = connection.getOutputStream();
if (gzipEncodedRequest) {
out = new GZIPOutputStream(out);
} else if (deflateEncodedRequest) {
out = new DeflaterOutputStream(out);
}
try {
out.write(request.body());
} finally {
try {
out.close();
} catch (IOException suppressed) { // NOPMD
}
}
}
return connection;
}
}
//具備負載均衡的Client
public class RibbonClient implements Client {
//delegate就是Client.Default
private final Client delegate;
private final LBClientFactory lbClientFactory;
//具備負載均衡的get/post
public Response execute(Request request, Options options) throws IOException {
try {
URI asUri = URI.create(request.url());
String clientName = asUri.getHost();
URI uriWithoutHost = cleanUrl(request.url(), clientName);
RibbonRequest ribbonRequest = new RibbonRequest(this.delegate, request, uriWithoutHost);
//執行請求AbstractLoadBalancerAwareClient.executeWithLoadBalancer
return ((RibbonResponse)this.lbClient(clientName).executeWithLoadBalancer(ribbonRequest, new RibbonClient.FeignOptionsClientConfig(options))).toResponse();
} catch (ClientException var7) {
propagateFirstIOException(var7);
throw new RuntimeException(var7);
}
}
//RibbonClient的構造者模式
public static final class Builder {
//指定Client
public RibbonClient.Builder delegate(Client delegate) {
this.delegate = delegate;
return this;
}
public RibbonClient build() {
//指定了負載均衡模式:LBClientFactory.Default
return new RibbonClient((Client)(this.delegate != null ? this.delegate : new Default((SSLSocketFactory)null, (HostnameVerifier)null)), (LBClientFactory)(this.lbClientFactory != null ? this.lbClientFactory : new feign.ribbon.LBClientFactory.Default()));
}
}
}
//負載均衡用戶端
public interface LBClientFactory {
public static final class Default implements LBClientFactory {
public Default() {
}
//建立(懶加載設計:基于服務名USER)
public LBClient create(String clientName) {
//加載檔案eureka-client.properties中USER的配置
IClientConfig config = ClientFactory.getNamedConfig(clientName, LBClientFactory.DisableAutoRetriesByDefaultClientConfig.class);
//這牽連到與eureka的互動是非常最複雜的,後續講
ILoadBalancer lb = ClientFactory.getNamedLoadBalancer(clientName);
return LBClient.create(lb, config);
}
}
}
//真正實作負載均衡
public abstract class AbstractLoadBalancerAwareClient<S extends ClientRequest, T extends IResponse> extends LoadBalancerContext implements IClient<S, T>, IClientConfigAware {
//執行請求
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
RequestSpecificRetryHandler handler = this.getRequestSpecificRetryHandler(request, requestConfig);
LoadBalancerCommand command = LoadBalancerCommand.builder().withLoadBalancerContext(this).withRetryHandler(handler).withLoadBalancerURI(request.getUri()).build();
try {
return (IResponse)command.submit(new ServerOperation<T>() {
public Observable<T> call(Server server) {
//使用eureka中的具體server(ip+port)建構真實的url,不再是微服務名
URI finalUri = AbstractLoadBalancerAwareClient.this.reconstructURIWithServer(server, request.getUri());
ClientRequest requestForServer = request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
} catch (Exception var5) {
return Observable.error(var5);
}
}
}).toBlocking().single();
} catch (Exception var7) {
Throwable t = var7.getCause();
if (t instanceof ClientException) {
throw (ClientException)t;
} else {
throw new ClientException(var7);
}
}
}
}
4.7 與eureka的內建
你也許會問FeignClient.name為服務名(不是url時)它是如何關聯到eureka的呢?
其實在LBClientFactory.create中有這樣一段: ClientFactory.getNamedLoadBalancer(clientName),這個其實就是讀取了eureka-client.properties檔案資訊,并根據服務名加載配置,并連接配接eureka拉取ServerList。
public class ClientFactory {
//如果不存在則建立一個執行個體
public static synchronized ILoadBalancer getNamedLoadBalancer(String name) {
return getNamedLoadBalancer(name, DefaultClientConfigImpl.class);
}
//同上
public static synchronized ILoadBalancer getNamedLoadBalancer(String name, Class<? extends IClientConfig> configClass) {
ILoadBalancer lb = namedLBMap.get(name);
if (lb != null) {
return lb;
} else {
try {
lb = registerNamedLoadBalancerFromProperties(name, configClass);
} catch (ClientException e) {
throw new RuntimeException("Unable to create load balancer", e);
}
return lb;
}
}
//同上
public static ILoadBalancer registerNamedLoadBalancerFromclientConfig(String name, IClientConfig clientConfig) throws ClientException {
if (namedLBMap.get(name) != null) {
throw new ClientException("LoadBalancer for name " + name + " already exists");
}
ILoadBalancer lb = null;
try {
//擷取配置項NFLoadBalancerClassName:DynamicServerListLoadBalancer
String loadBalancerClassName = (String) clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerClassName);
//執行個體化DynamicServerListLoadBalancer
lb = (ILoadBalancer) ClientFactory.instantiateInstanceWithClientConfig(loadBalancerClassName, clientConfig);
namedLBMap.put(name, lb);
logger.info("Client: {} instantiated a LoadBalancer: {}", name, lb);
return lb;
} catch (Throwable e) {
throw new ClientException("Unable to instantiate/associate LoadBalancer with Client:" + name, e);
}
}
}
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
//instantiateInstanceWithClientConfig中執行它
public DynamicServerListLoadBalancer() {
super();
}
//instantiateInstanceWithClientConfig中執行它
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
try {
super.initWithNiwsConfig(clientConfig);
//擷取配置項NIWSServerListClassName:DiscoveryEnabledNIWSServerList
String niwsServerListClassName = clientConfig.getPropertyAsString(
CommonClientConfigKey.NIWSServerListClassName,
DefaultClientConfigImpl.DEFAULT_SEVER_LIST_CLASS);
//執行個體化DiscoveryEnabledNIWSServerList
ServerList<T> niwsServerListImpl = (ServerList<T>) ClientFactory
.instantiateInstanceWithClientConfig(niwsServerListClassName, clientConfig);
this.serverListImpl = niwsServerListImpl;
if (niwsServerListImpl instanceof AbstractServerList) {
AbstractServerListFilter<T> niwsFilter = ((AbstractServerList) niwsServerListImpl)
.getFilterImpl(clientConfig);
niwsFilter.setLoadBalancerStats(getLoadBalancerStats());
this.filter = niwsFilter;
}
String serverListUpdaterClassName = clientConfig.getPropertyAsString(
CommonClientConfigKey.ServerListUpdaterClassName,
DefaultClientConfigImpl.DEFAULT_SERVER_LIST_UPDATER_CLASS
);
this.serverListUpdater = (ServerListUpdater) ClientFactory
.instantiateInstanceWithClientConfig(serverListUpdaterClassName, clientConfig);
restOfInit(clientConfig);
} catch (Exception e) {
throw new RuntimeException(
"Exception while initializing NIWSDiscoveryLoadBalancer:"
+ clientConfig.getClientName()
+ ", niwsClientConfig:" + clientConfig, e);
}
}
}
//eureka服務發現
public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer>{
//instantiateInstanceWithClientConfig中調用
public DiscoveryEnabledNIWSServerList() {
//eureka用戶端
this.eurekaClientProvider = new LegacyEurekaClientProvider();
}
//instantiateInstanceWithClientConfig中調用
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
clientName = clientConfig.getClientName();
vipAddresses = clientConfig.resolveDeploymentContextbasedVipAddresses();
if (vipAddresses == null &&
ConfigurationManager.getConfigInstance().getBoolean("DiscoveryEnabledNIWSServerList.failFastOnNullVip", true)) {
throw new NullPointerException("VIP address for client " + clientName + " is null");
}
isSecure = Boolean.parseBoolean(""+clientConfig.getProperty(CommonClientConfigKey.IsSecure, "false"));
prioritizeVipAddressBasedServers = Boolean.parseBoolean(""+clientConfig.getProperty(CommonClientConfigKey.PrioritizeVipAddressBasedServers, prioritizeVipAddressBasedServers));
datacenter = ConfigurationManager.getDeploymentContext().getDeploymentDatacenter();
targetRegion = (String) clientConfig.getProperty(CommonClientConfigKey.TargetRegion);
shouldUseIpAddr = clientConfig.getPropertyAsBoolean(CommonClientConfigKey.UseIPAddrForServer, DefaultClientConfigImpl.DEFAULT_USEIPADDRESS_FOR_SERVER);
// override client configuration and use client-defined port
if(clientConfig.getPropertyAsBoolean(CommonClientConfigKey.ForceClientPortConfiguration, false)){
if(isSecure){
if(clientConfig.containsProperty(CommonClientConfigKey.SecurePort)){
overridePort = clientConfig.getPropertyAsInteger(CommonClientConfigKey.SecurePort, DefaultClientConfigImpl.DEFAULT_PORT);
shouldUseOverridePort = true;
}else{
logger.warn(clientName + " set to force client port but no secure port is set, so ignoring");
}
}else{
if(clientConfig.containsProperty(CommonClientConfigKey.Port)){
overridePort = clientConfig.getPropertyAsInteger(CommonClientConfigKey.Port, DefaultClientConfigImpl.DEFAULT_PORT);
shouldUseOverridePort = true;
}else{
logger.warn(clientName + " set to force client port but no port is set, so ignoring");
}
}
}
}
}
5. 功能上(異步支援)
至少應該能傳回Future吧,先做這個吧,後續再弄異步回調支援
6. 快速開始(QuickStart)
6.1 第一步,自定義contract
@RequestLine("POST /kxtx-gps/pay/getStatus")
Future<Map<String, Object>> getPayStatusSupplyAsync(Map<String, List<String>> batchNos);
6.2 第二步,執行個體化contract及調用
List<ContractBean> contractBeans = Lists.newArrayList();
contractBeans.add(new ContractBean("kxtx-gps", PayContract.class));
ContractRegistryCenter registryCenter = new ContractRegistryCenter();
registryCenter.setContracts(contractBeans);
return registryCenter;
//異步
Future<Map<String, Object>> result2 = payContract.getPayStatusSupplyAsync(Maps.newHashMap("orderIds", Lists.newArrayList("09fd82b3fb084438a245d564dc8af965")));
System.out.println("end。");
7. 尚需完善(不足)
上面的方案的确是抛棄了很多Feign很多特效,比如原本知道斷路器方面就不行,那要怎麼彌補呢?
首先我們看看源碼,HystrixFeign》HystrixInvocationHandler.invoke,這其實就是一個指令模式,使用到了rxjava進階庫,那問題來了?
rxjava簡直就是雌雄(同步/異步)同體,如果它有提供對外api(Single、Completable),那是不是就可以放棄上面的的做法,自己去相容實作呢?
答案是肯定的,不過這個要對rxjava足夠的了解。看下面關鍵代碼:
//引用HystrixInvocationHandler的148行
if (isReturnsHystrixCommand(method)) {
return hystrixCommand;
} else if (isReturnsObservable(method)) {
// Create a cold Observable
return hystrixCommand.toObservable();
} else if (isReturnsSingle(method)) {
// Create a cold Observable as a Single
return hystrixCommand.toObservable().toSingle();
} else if (isReturnsCompletable(method)) {
return hystrixCommand.toObservable().toCompletable();
}
return hystrixCommand.execute();
源碼