最近spring cloud gateway 二次開發時看到文章,裡面處理擷取傳回體内容有用到,轉載過來一起學習
1、ServerListUpdater 服務清單更新
gateway使用ribbon作為服務調用的負載均衡中間件,根據配置的 IRule 對拉取到的服務清單進行負載
而這些真正提供服務的執行個體是有動态上下線的情況存在的,為了保證輪詢到的服務執行個體能正常通路,ribbon中有一個接口
ServerListUpdater 會定期對服務清單進行更新
在使用 Eureka 作為注冊中心的時候,ServerListUpdater有兩個實作類:
- PollingServerListUpdater :定時從注冊中心拉取服務清單,如果沒有配置,預設為30秒
- EurekaNotificationServerListUpdater :注冊中心中的服務有變動時,通知用戶端,EurekaNotificationServerListUpdater是通過添加了一個監聽器,當收到注冊中心的通知後,做出相應的動作
PollingServerListUpdater 也是 預設的 ServerListUpdater 配置
分别看一下它們的代碼實作
PollingServerListUpdater
// 從注冊中心拉取服務清單的間隔
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
// 這裡封裝了一個進行服務拉取的任務
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
//這裡是進行服務清單更新的動作
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
// 使用 scheduled 進行定時拉取
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,//這個就是剛才配置的拉取間隔
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
EurekaNotificationServerListUpdater
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
this.updateListener = new EurekaEventListener() {
// 監聽 Eureka釋出的事件,然後拉取最新的清單
@Override
public void onEvent(EurekaEvent event) {
if (event instanceof CacheRefreshedEvent) {
if (!updateQueued.compareAndSet(false, true)) { // if an update is already queued
logger.info("an update action is already queued, returning as no-op");
return;
}
if (!refreshExecutor.isShutdown()) {
try {
refreshExecutor.submit(new Runnable() {
@Override
public void run() {
try {
//這裡是進行服務清單更新的動作
updateAction.doUpdate();
lastUpdated.set(System.currentTimeMillis());
} catch (Exception e) {
logger.warn("Failed to update serverList", e);
} finally {
updateQueued.set(false);
}
}
}); // fire and forget
} catch (Exception e) {
logger.warn("Error submitting update task to executor, skipping one round of updates", e);
updateQueued.set(false); // if submit fails, need to reset updateQueued to false
}
}
else {
logger.debug("stopping EurekaNotificationServerListUpdater, as refreshExecutor has been shut down");
stop();
}
}
}
};
...以下代碼省略
} else {
logger.info("Update listener already registered, no-op");
}
}
可以看到,這兩個 ServerListUpdater 實作類在更新服務清單的時候,都做了同一個動作updateAction.doUpdate()
進入這個方法,發現它是一個定義在ServerListUpdater 中的一個接口
public interface ServerListUpdater {
/**
* an interface for the updateAction that actually executes a server list update
*/
public interface UpdateAction {
void doUpdate();
}
...以下代碼省略
}
而它的方法實作是在 負載均衡器 DynamicServerListLoadBalancer 中定義的
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
private static final Logger LOGGER = LoggerFactory.getLogger(DynamicServerListLoadBalancer.class);
......代碼省略......
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
// UpdateAction的方法實作,調用了另一個方法
updateListOfServers();
}
};
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
//真正進行清單更新的地方
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
......代碼省略......
}
可以看到,這個 serverListImpl 就是在負載均衡器的ServerList屬性,使用這個接口的getUpdatedListOfServers方法進行清單更新,因為我的項目裡使用的是自己的注冊中心,沒有用Eureka,是以也寫了一個實作類,參照了使用Eureka的情況下的預設類 DiscoveryEnabledNIWSServerList,讓我們看看這個類的代碼
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
//其實在更新的時候,調用的也是從注冊中心拉取清單的方法
return obtainServersViaDiscovery();
}
//從Eureka拉取服務
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
......代碼省略......
return serverList;
}
由此我們知道,無論配置哪個 ServerListUpdater,在更新服務清單的時候,都是調用ServerList接口進行一次服務拉取,然後更新本地的清單,隻是觸發的時機不同:
- PollingServerListUpdater 30秒拉取一次(時間可以修改)
- EurekaNotificationServerListUpdater 當服務更新時,通知用戶端
那麼這兩種方法分别有什麼弊端呢?
先說說PollingServerListUpdater,如果在拉取的間隔中,有服務下線了,極端情況下,原來所有的執行個體都不可用,換成了新的一批執行個體,要等到下一次拉取的時間點才會更新,這樣會造成最久30秒的時間服務不可用。 比如:原來5個執行個體 A、B、C、D、E變成了 F、G、H、I、J,但是由于并沒有到更新的時間點,ribbon儲存的還是老的服務執行個體,而這時它們都已下線,無法提供服務。
那如果我們換成,一旦服務變更(這裡是上下線都會通知)就通知用戶端的 EurekaNotificationServerListUpdater 會怎麼樣呢,這樣好像可以實時的替換成最新的可用執行個體,保證服務不會打到失效的執行個體上。可是這會有另一個問題,也是在極端情況下,如果這次通知由于網絡問題,沒有通知到用戶端,那麼這次變動過後,如果一直沒有服務變更,用戶端就再也不會進行服務的拉取,這個時候造成不可用的時間就難以預估了。比如:在9:00的時候,A、B、C、D、E服務全部下線,F、G、H、I、J上線,Eureka通知用戶端,但網絡抖動,用戶端沒有收到,或者用戶端收到了,拉取時候失敗,并沒有更新本地的清單,這樣隻有等到下次收到通知時才會去拉取,假設接下來服務很穩定,12:00的時候才有一次更新,這樣就有3個小時的服務不可用。
那麼有沒有什麼辦法既可以擁有2個實作類的優勢,又摒棄它們的弊端呢?有的!
小孩子才做選擇,我們大人是全都要!那就是自己寫一個ServerListUpdater的實作類,然後:
- 1 啟動一個定時任務,定時從注冊中心拉取,而這個拉取間隔也可以根據自己的預估進行修改。
- 2 注冊一個監聽器,可以收到注冊中心的服務變更通知
- 3 在配置中,把預設的 ServerListUpdater改成自己寫的 ServerListUpdater,這樣ribbonConfig類在看到有ServerListUpdater的實作類的情況下,就不會加載 Eureka的 ServerListUpdater了。
實作類的代碼太長,就不貼了,邏輯很清晰,可以參考PollingServerListUpdater 和 EurekaNotificationServerListUpdater,把他們的代碼照着寫就行
接下來就是配置,在配置類中添加上自己的實作類
/**
* 服務更新通知機制
* @param notificationService
* @param clientConfig
* @return
*/
@Bean
public ServerListUpdater ribbonServerListUpdater(NotificationServiceImpl notificationService, IClientConfig clientConfig) {
return new PollingNotificationServerListUpdater(notificationService, clientConfig);
}
2、 Ribbon重試配置
gateway使用ribbon作為服務調用的負載均衡中間件,最終使用的都是ribbon的幾個元件實作:
- ServerList:拉取服務清單接口
- ServerListFilter:對ServerList伺服器清單進行過濾
- ServerListUpdater: 更新服務清單的接口
- IPing: 健康檢查
- IRule :負載均衡規則,如輪詢、随機等
- ILoadBalancer:負載均衡器,組合以上元件最終實作負載
對于以上接口的實作先暫時不寫,這次寫的是gateway使用以上元件進行負載均衡的2種實作方式:
一 使用gateway自己的過濾器 LoadBalancerClientFilter 組合以上接口 實作
二 通過對 RestTemplate 增加 LoadBalancerInterceptor攔截器實作
那麼項目在啟動的時候是如何在這兩種方式當中選擇的呢?
我們先看第一種方式,LoadBalancerClientFilter的加載條件
@Configuration
@ConditionalOnClass({ LoadBalancerClient.class, RibbonAutoConfiguration.class,
DispatcherHandler.class })
@AutoConfigureAfter(RibbonAutoConfiguration.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
public class GatewayLoadBalancerClientAutoConfiguration {
// GlobalFilter beans
@Bean
@ConditionalOnBean(LoadBalancerClient.class)
@ConditionalOnMissingBean(LoadBalancerClientFilter.class)
public LoadBalancerClientFilter loadBalancerClientFilter(LoadBalancerClient client,
LoadBalancerProperties properties) {
return new LoadBalancerClientFilter(client, properties);
}
}
是通過 GatewayLoadBalancerClientAutoConfiguration 配置類來加載的,而GatewayLoadBalancerClientAutoConfiguration的加載條件是,當存在
LoadBalancerClient.class ,RibbonAutoConfiguration.class , DispatcherHandler.class的時候會加載,
1 LoadBalancerClient.class 隻有一個預設實作 RibbonLoadBalancerClient,是肯定會加載的
我們看剩下2個類
2 RibbonAutoConfiguration 配置類 是當我們在項目中引入 spring-cloud-starter-netflix-ribbon 依賴的時候進行加載
3 剩下最後一個DispatcherHandler 這個是關鍵,我在排查問題的時候,一直忽略這個類,看這名字感覺一定會加載,沒想到這個才是重點所在
DispatcherHandler 是在 WebFluxConfigurationSupport 中定義的
public class WebFluxConfigurationSupport implements ApplicationContextAware {
...中間代碼省略
@Bean
public DispatcherHandler webHandler() {
return new DispatcherHandler();
}
...以下代碼省略
}
WebFluxConfigurationSupport 被 DelegatingWebFluxConfiguration 繼承
@Configuration
public class DelegatingWebFluxConfiguration extends WebFluxConfigurationSupport {
可以看到,DelegatingWebFluxConfiguration 又被 EnableWebFluxConfiguration 繼承
而 EnableWebFluxConfiguration 被定義在 WebFluxAutoConfiguration 這個配置類中
@Configuration
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
@ConditionalOnClass(WebFluxConfigurer.class)
@ConditionalOnMissingBean({ WebFluxConfigurationSupport.class })
@AutoConfigureAfter({ ReactiveWebServerFactoryAutoConfiguration.class, CodecsAutoConfiguration.class,
ValidationAutoConfiguration.class })
@AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE + 10)
public class WebFluxAutoConfiguration {
...中間代碼省略
/**
* Configuration equivalent to {@code @EnableWebFlux}.
*/
@Configuration
public static class EnableWebFluxConfiguration extends DelegatingWebFluxConfiguration {
...中間代碼省略
}
...代碼省略
}
終于到了關鍵的地方了
WebFluxAutoConfiguration 的第二行
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
表明了它的加載條件,隻有當 springboot 的啟動環境 WebApplicationType 為 REACTIVE才會進行加載
而這個類型怎麼确定呢?
當我們依賴 spring-boot-starter-web 的時候,WebApplicationType 為 SERVLET
依賴 spring-boot-starter-webflux 并且 沒有依賴 spring-boot-starter-web 的時候,WebApplicationType 為 REACTIVE
如果沒有依賴,springboot2.0以上預設為 REACTIVE
既然這兩種方式都能實作負載均衡,那有什麼差別的,關鍵在于想要進行錯誤重試的時候
如果使用 LoadBalancerClientFilter方式實作負載均衡,重試的時候要在配置檔案中這麼配置
spring:
cloud:
gateway:
default-filters:
- name: Retry
args:
retries: 3
這樣是對所有路由生效,或者指定單個路由
spring:
gateway:
routes:
- id: test
uri: lb://test
predicates:
- Path=/test/*
filters:
- name: Retry
args:
retries: 3
而使用 LoadBalancerInterceptor攔截器方式實作負載均衡,配置檔案中這麼配置
ribbon:
MaxAutoRetries: 0
MaxAutoRetriesNextServer: 2
ReadTimeout: 2000
ConnectTimeout: 1500
OkToRetryOnAllOperations: true
3、動态路由注意事項
網關開發的過程中,因為有對某些服務進行動态的上下線的需求,是以進行了動态路由的開發,網上也有例子,實作方式就不贅述了,但這裡有2個注意事項。
這兩個錯誤很嚴重,一旦出現了之後,和路由有關的功能都無法再使用,需要重新開機項目
一 路由資訊裡的斷言器資訊不能為空
如果在添加路由資訊的時候沒有傳斷言器,比如這樣
{
"id":"test",
"predicates":[],
"filters":[],
"uri":"lb://test",
"order":0
}
會抛出異常
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.get(ArrayList.java:429)
at org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator.combinePredicates(RouteDefinitionRouteLocator.java:221)
at org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator.convertToRoute(RouteDefinitionRouteLocator.java:143)
我們看路由資訊類的時候,可以發現斷言器是一個List,RouteDefinitionRouteLocator下邊有這個方法,會先取List中第一個斷言器,然後與之後的斷言器做組合
private AsyncPredicate<ServerWebExchange> combinePredicates(
RouteDefinition routeDefinition) {
List<PredicateDefinition> predicates = routeDefinition.getPredicates();
//取出第一個斷言器
AsyncPredicate<ServerWebExchange> predicate = lookup(routeDefinition,
predicates.get(0));
//取出後續斷言器
for (PredicateDefinition andPredicate : predicates.subList(1,
predicates.size())) {
AsyncPredicate<ServerWebExchange> found = lookup(routeDefinition,
andPredicate);
//第一個斷言器和後續斷言器做 and 操作
predicate = predicate.and(found);
}
return predicate;
}
如果沒有填斷言器資訊,就會報數組越界異常
二 斷言器的名字不能随便取
比如這裡我取了一個 zuibuxing 的斷言器名字
{
"id":"test",
"predicates":[
{
"name":"zuibuxing",
"args":{
"_genkey_0":"/test/**"
}
}
],
"filters":[
],
"uri":"lb://test",
"order":0
}
會報這個錯誤
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalArgumentException: Unable to find RoutePredicateFactory with name zuibuxing
Caused by: java.lang.IllegalArgumentException: Unable to find RoutePredicateFactory with name zuibuxing
at org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator.lookup(RouteDefinitionRouteLocator.java:240)
at org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator.combinePredicates(RouteDefinitionRouteLocator.java:220)
at org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator.convertToRoute(RouteDefinitionRouteLocator.java:143)
說找不到名字為 zuibuxing 的斷言器,這是因為儲存自定義路由的時候,RouteDefinitionRouteLocato 會按照 name 去斷言器Map 裡去尋找
private final Map<String, RoutePredicateFactory> predicates = new LinkedHashMap<>();
... 中間代碼省略 ....
//按照名字尋找斷言器
private AsyncPredicate<ServerWebExchange> lookup(RouteDefinition route,
PredicateDefinition predicate) {
RoutePredicateFactory<Object> factory = this.predicates.get(predicate.getName());
if (factory == null) {
throw new IllegalArgumentException(
"Unable to find RoutePredicateFactory with name "
+ predicate.getName());
}
...以下代碼省略
}
RouteDefinitionRouteLocator 在這個Map中儲存所有實作了 RoutePredicateFactory 的類,name是類名去掉RoutePredicateFactory,比如 AfterRoutePredicateFactory 斷言器的名字是 After,這裡有兩個特殊的斷言器 CloudFoundryRouteService,ReadBodyPredicateFactory,因為它們的類名沒有以 RoutePredicateFactory 結尾,是以就用本身類名作為 斷言器name
如果是随便寫的,會導緻找不到斷言器報錯,是以我們隻能填gateway自己的斷言器,或者自己實作了 RoutePredicateFactory 的斷言器
4、配置預設filter
在開發網關的時候,我們可以為我們的所有路由器定義同樣的filter,這樣就不用為每個route單獨進行配置,有幾種實作方式:
一:通過配置繼承了AbstractGatewayFilterFactory的bean來進行
在配置檔案中增加default-filters
spring:
application:
name: gateway
profiles:
active: prod
cloud:
gateway:
default-filters:
- name: ModifyRequestBody
- name: MyLog
routes:
- id: route1
uri: lb://route1
predicates:
- Path=/route1/**
- id: route2
uri: lb://route2
predicates:
- Path=/route2/**
ModifyRequestBody 和 MyLog 是過濾器的名字,他們都繼承了AbstractGatewayFilterFactory,這樣gateway在啟動的時候,就會把配置的名字加上GatewayFilterFactory的字尾作為bean的名字進行加載,在這個例子中就是 ModifyRequestBodyGatewayFilterFactory 和 MyLogGatewayFilterFactory 這個兩個bean,其中ModifyRequestBodyGatewayFilterFactory是gateway自帶的類,MyLogGatewayFilterFactory是我自己實作的類,大家可以通過這樣的方式自定義自己的過濾器。
但這種方式有一個問題,要注意一下,衆所周知,路由器除了配置檔案,還可以通過java 代碼的方式進行配置,如下:
@Bean
public RouteLocator routeLocator(RouteLocatorBuilder builder) {
return builder.routes()
.route(r -> r.path("/test/**")
.uri("lb://test")
.order(0)
.id("test")
)
.build();
}
或者我們也可以自己進行動态路由的配置,網上有很多例子,我就不貼出來了
而上邊那種通過yml或properties檔案配置預設過濾器的方式,對 java代碼配置的路由器不生效
隻對配置檔案和動态配置的路由生效(這個坑我進行了一上午的嘗試對比才發現)
因為通過使用代碼定義路由使用的是 RouteLocator 這個類
而通過配置檔案或者外部存儲器配置路由使用的是 RouteDefinitionLocator 的實作類進行的,有 4個實作類:
- PropertiesRouteDefinitionLocator ,從配置檔案( 例如,YML / Properties 等 ) 讀取
- RouteDefinitionRepository ,從存儲器( 例如,記憶體 / Redis / MySQL 等 )讀取(預設使用的是從記憶體)
- DiscoveryClientRouteDefinitionLocator ,從注冊中心( 例如,Eureka / Zookeeper 等 )讀取
- CompositeRouteDefinitionLocator ,組合多種 RouteDefinitionLocator 的實作
這裡應該是,配置gateway自帶的過濾器還是生效的,但是自己繼承了AbstractGatewayFilterFactory的類就不生效,因為之前的項目配置過自帶的Hystrix過濾器生效了
如果想讓自己的過濾器對所有的路由生效,就用到這第二種方式
5、處理 requestBody
這次是需要在請求的入口和出口分别列印封包資訊。處理GET請求的時候還好,POST請求有時候參數放在requestBody中,而且2.X的版本之後,spring cloud使用 spring5 webflux方式程式設計,在filter中處理過一次的requestBody,下遊訂閱者無法接收,網上找了很多都是對某個具體路由在編碼中進行配置,如下
.route("rewrite_request_upper", r -> r.host("*.rewriterequestupper.org")
.filters(f -> f.prefixPath("/httpbin")
.addResponseHeader("X-TestHeader", "rewrite_request_upper")
.modifyRequestBody(String.class, String.class,
(exchange, s) -> {
return Mono.just(s.toUpperCase()+s.toUpperCase());
})
).uri(uri)
)
而我的項目中使用了動态路由,未來路由的增減是很常見的事,這種方式沒法對所有的路由進行請求攔截,很不靈活。
最後參看源碼,發現了一個ModifyRequestBodyGatewayFilterFactory類,裡邊有對requestBody的處理邏輯,然後把源碼照搬了過來,由于我隻需要列印日志,稍微改一下就行,别的代碼就保留了。
/**
* 參照 ModifyRequestBodyGatewayFilterFactory 寫的一個處理 requestBody的filter
*
* @author huangting
*/
@Component
public class RequestHandlerFilter implements GlobalFilter, Ordered {
private static final Logger logger = LoggerFactory.getLogger(RequestHandlerFilter.class);
private static final String METHOD_POST = "POST";
private static final String METHOD_GET = "GET";
@Override
public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
//POST和get的處理邏輯不一樣
if (METHOD_POST.equals(exchange.getRequest().getMethodValue())) {
ServerRequest serverRequest = new DefaultServerRequest(exchange);
Mono<String> modifiedBody = serverRequest.bodyToMono(String.class).flatMap(requestBody -> {
//列印請求封包
logRequestLog(request, requestBody);
return Mono.just(requestBody);
});
BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);
HttpHeaders headers = new HttpHeaders();
headers.putAll(exchange.getRequest().getHeaders());
// the new content type will be computed by bodyInserter
// and then set in the request decorator
headers.remove(HttpHeaders.CONTENT_LENGTH);
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
return bodyInserter.insert(outputMessage, new BodyInserterContext())
// .log("modify_request", Level.INFO)
.then(Mono.defer(() -> {
ServerHttpRequest decorator = decorate(exchange, headers, outputMessage);
return chain.filter(exchange.mutate().request(decorator).build());
}));
} else {
//列印請求封包
logRequestLog(request, null);
chain.filter(exchange);
}
return chain.filter(exchange);
}
ServerHttpRequestDecorator decorate(ServerWebExchange exchange, HttpHeaders headers, CachedBodyOutputMessage outputMessage) {
return new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public HttpHeaders getHeaders() {
long contentLength = headers.getContentLength();
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.putAll(super.getHeaders());
if (contentLength > 0) {
httpHeaders.setContentLength(contentLength);
} else {
// TODO: this causes a 'HTTP/1.1 411 Length Required' // on
// httpbin.org
httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
}
return httpHeaders;
}
@Override
public Flux<DataBuffer> getBody() {
return outputMessage.getBody();
}
};
}
/**
* 列印請求日志
*
* @param request
*/
public void logRequestLog(ServerHttpRequest request, String requestBody) {
String requestParam = request.getQueryParams().toString();
logger.info("請求封包 URL:{},Method:{},headers:{},param:{},requestbody:{}", request.getURI().getPath(), request.getMethod(), request.getHeaders(), requestParam, requestBody);
}
@Override
public int getOrder() {
// -1 is response write filter, must be called before that
return -3;
}
}
需要注意的是 ,我的spring cloud 是 2.1.3,Greenwich.SR3的版本,CachedBodyOutputMessage 和 DefaultServerRequest這裡兩個類的權限變成了spring私有的了,需要把他們copy出來作為自己項目中的類,以上的代碼有用到
6、處理 reponse封包(解決截斷及亂碼問題)
因為網關是請求的出入口,防止各調用方及服務方互相之間扯皮,響應封包也需要打出來,而這裡有一個問題,如果響應過大的話,Flux會進行截斷,這樣有2個問題,一個是每次處理都會打一次部分封包,不過這個可以通過doOnComplete()來解決,第二就是達到一定長度出現亂碼,後來檢視api,有一個合并的方法,問題解決,代碼如下
/**
* 處理響應的 的filter
* @author huangting
*/
@Component
public class ResponseHandlerFilter implements GlobalFilter, Ordered {
private static final Logger logger = LoggerFactory.getLogger(ResponseHandlerFilter.class);
private static final String START_TIME = "startTime";
@Autowired
private MetricService metricService;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
String ip = IpUtil.getRemoteHost(request);
//執行完成後 進行調用耗時埋點
exchange.getAttributes().put(START_TIME, System.currentTimeMillis());
//原始響應類
ServerHttpResponse originalResponse = exchange.getResponse();
DataBufferFactory bufferFactory = originalResponse.bufferFactory();
//重新包裝的響應類
ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(originalResponse) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
Flux<? extends DataBuffer> fluxBody = Flux.from(body);
return super.writeWith(fluxBody.buffer().map(dataBuffer -> {
//如果響應過大,會進行截斷,出現亂碼,然後看api DefaultDataBufferFactory有個join方法可以合并所有的流,亂碼的問題解決
DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
DataBuffer join = dataBufferFactory.join(dataBuffer);
byte[] content = new byte[join.readableByteCount()];
join.read(content);
//釋放掉記憶體
DataBufferUtils.release(join);
//列印響應日志
logResponse(exchange, new String(content, StandardCharsets.UTF_8));
return bufferFactory.wrap(content);
}));
}
};
return chain.filter(exchange.mutate().response(decoratedResponse).build())
.then(Mono.fromRunnable(() -> {
Long startTime = exchange.getAttribute(START_TIME);
if (startTime != null) {
Long executeTime = (System.currentTimeMillis() - startTime);
//influxDB埋點
metricService.pointRequestLatency(ip, request.getURI().getPath(), executeTime);
}
}));
}
/**
* 列印響應封包
*
* @param exchange
*/
public void logResponse(ServerWebExchange exchange, String response) {
ServerHttpRequest request = exchange.getRequest();
logger.info("響應封包 URL:{},Method:{},headers:{},response:{}", request.getURI().getPath(), request.getMethod(), exchange.getResponse().getHeaders(), response);
}
@Override
public int getOrder() {
// -1 is response write filter, must be called before that
return -3;
}
}
最近發現一個問題,當接口的調用傳回值為空的時候,并不會進入 writeWith 裡邊的map方法,是以當Flux 進行訂閱時,map裡的 logResponse 沒有執行,是以先定義一個預設的 AtomicReference<String> responseBody 變量,當傳回值不為空時,對它進行更新;然後把 logResponse 方法下移,放到 return語句中,這樣就能保證列印方法總會被執行,修改後的 filter 方法代碼如下
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
String ip = IpUtil.getRemoteHost(request);
//執行完成後 進行調用耗時埋點
exchange.getAttributes().put(START_TIME, System.currentTimeMillis());
//原始響應類
ServerHttpResponse originalResponse = exchange.getResponse();
DataBufferFactory bufferFactory = originalResponse.bufferFactory();
//初始化一個 預設的 responseBody
AtomicReference<String> responseBody= new AtomicReference<>("no-responseBody");
//重新包裝的響應類
ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(originalResponse) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
Flux<? extends DataBuffer> fluxBody = Flux.from(body);
return super.writeWith(fluxBody.buffer().map(dataBuffer -> {
//如果響應過大,會進行截斷,出現亂碼,然後看api DefaultDataBufferFactory有個join方法可以合并所有的流,亂碼的問題解決
DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
DataBuffer join = dataBufferFactory.join(dataBuffer);
byte[] content = new byte[join.readableByteCount()];
join.read(content);
//釋放掉記憶體
DataBufferUtils.release(join);
//如果有傳回值,将 responseBody 覆寫
responseBody.set(new String(content, StandardCharsets.UTF_8));
return bufferFactory.wrap(content);
}));
}
};
return chain.filter(exchange.mutate().response(decoratedResponse).build())
.then(Mono.fromRunnable(() -> {
//列印響應日志
logResponse(exchange, responseBody.get());
Long startTime = exchange.getAttribute(START_TIME);
if (startTime != null) {
Long executeTime = (System.currentTimeMillis() - startTime);
//influxDB埋點
metricService.pointRequestLatency(ip, request.getURI().getPath(), executeTime);
}
}));
}
文章轉載 : 醉不醒 的spring cloud gateway 二次開發
連結:https://www.jianshu.com/p/a5fc8039d236