天天看點

spring cloud gateway 二次開發5、處理 requestBody6、處理 reponse封包(解決截斷及亂碼問題)

最近spring cloud gateway 二次開發時看到文章,裡面處理擷取傳回體内容有用到,轉載過來一起學習

1、ServerListUpdater 服務清單更新

gateway使用ribbon作為服務調用的負載均衡中間件,根據配置的 IRule 對拉取到的服務清單進行負載

而這些真正提供服務的執行個體是有動态上下線的情況存在的,為了保證輪詢到的服務執行個體能正常通路,ribbon中有一個接口

ServerListUpdater 會定期對服務清單進行更新

在使用 Eureka 作為注冊中心的時候,ServerListUpdater有兩個實作類:

  • PollingServerListUpdater :定時從注冊中心拉取服務清單,如果沒有配置,預設為30秒
  • EurekaNotificationServerListUpdater :注冊中心中的服務有變動時,通知用戶端,EurekaNotificationServerListUpdater是通過添加了一個監聽器,當收到注冊中心的通知後,做出相應的動作

PollingServerListUpdater 也是 預設的 ServerListUpdater 配置

分别看一下它們的代碼實作

PollingServerListUpdater

// 從注冊中心拉取服務清單的間隔
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;

 @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {

            // 這裡封裝了一個進行服務拉取的任務
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        //這裡是進行服務清單更新的動作
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };
           
            // 使用 scheduled 進行定時拉取
            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    refreshIntervalMs,//這個就是剛才配置的拉取間隔
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }
           

EurekaNotificationServerListUpdater

@Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            this.updateListener = new EurekaEventListener() {

               // 監聽 Eureka釋出的事件,然後拉取最新的清單
                @Override
                public void onEvent(EurekaEvent event) {
                    if (event instanceof CacheRefreshedEvent) {
                        if (!updateQueued.compareAndSet(false, true)) {  // if an update is already queued
                            logger.info("an update action is already queued, returning as no-op");
                            return;
                        }

                        if (!refreshExecutor.isShutdown()) {
                            try {
                                refreshExecutor.submit(new Runnable() {
                                    @Override
                                    public void run() {
                                        try {
                                            //這裡是進行服務清單更新的動作
                                            updateAction.doUpdate();
                                            lastUpdated.set(System.currentTimeMillis());
                                        } catch (Exception e) {
                                            logger.warn("Failed to update serverList", e);
                                        } finally {
                                            updateQueued.set(false);
                                        }
                                    }
                                });  // fire and forget
                            } catch (Exception e) {
                                logger.warn("Error submitting update task to executor, skipping one round of updates", e);
                                updateQueued.set(false);  // if submit fails, need to reset updateQueued to false
                            }
                        }
                        else {
                            logger.debug("stopping EurekaNotificationServerListUpdater, as refreshExecutor has been shut down");
                            stop();
                        }
                    }
                }
            };
        
      ...以下代碼省略

        } else {
            logger.info("Update listener already registered, no-op");
        }
    }
           

可以看到,這兩個 ServerListUpdater 實作類在更新服務清單的時候,都做了同一個動作updateAction.doUpdate()

進入這個方法,發現它是一個定義在ServerListUpdater 中的一個接口

public interface ServerListUpdater {

    /**
     * an interface for the updateAction that actually executes a server list update
     */
    public interface UpdateAction {
        void doUpdate();
    }
 ...以下代碼省略

}
           

而它的方法實作是在 負載均衡器 DynamicServerListLoadBalancer 中定義的

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
    private static final Logger LOGGER = LoggerFactory.getLogger(DynamicServerListLoadBalancer.class);

   ......代碼省略......

    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            // UpdateAction的方法實作,調用了另一個方法
            updateListOfServers();
        }
    };

    
    
    @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            //真正進行清單更新的地方
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

......代碼省略......

}
           

可以看到,這個 serverListImpl 就是在負載均衡器的ServerList屬性,使用這個接口的getUpdatedListOfServers方法進行清單更新,因為我的項目裡使用的是自己的注冊中心,沒有用Eureka,是以也寫了一個實作類,參照了使用Eureka的情況下的預設類 DiscoveryEnabledNIWSServerList,讓我們看看這個類的代碼

@Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
        //其實在更新的時候,調用的也是從注冊中心拉取清單的方法
        return obtainServersViaDiscovery();
    }
    
   //從Eureka拉取服務
    private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

        ......代碼省略......

        return serverList;
    }
           

由此我們知道,無論配置哪個 ServerListUpdater,在更新服務清單的時候,都是調用ServerList接口進行一次服務拉取,然後更新本地的清單,隻是觸發的時機不同:

  • PollingServerListUpdater 30秒拉取一次(時間可以修改)
  • EurekaNotificationServerListUpdater 當服務更新時,通知用戶端

那麼這兩種方法分别有什麼弊端呢?

先說說PollingServerListUpdater,如果在拉取的間隔中,有服務下線了,極端情況下,原來所有的執行個體都不可用,換成了新的一批執行個體,要等到下一次拉取的時間點才會更新,這樣會造成最久30秒的時間服務不可用。 比如:原來5個執行個體 A、B、C、D、E變成了 F、G、H、I、J,但是由于并沒有到更新的時間點,ribbon儲存的還是老的服務執行個體,而這時它們都已下線,無法提供服務。

那如果我們換成,一旦服務變更(這裡是上下線都會通知)就通知用戶端的 EurekaNotificationServerListUpdater 會怎麼樣呢,這樣好像可以實時的替換成最新的可用執行個體,保證服務不會打到失效的執行個體上。可是這會有另一個問題,也是在極端情況下,如果這次通知由于網絡問題,沒有通知到用戶端,那麼這次變動過後,如果一直沒有服務變更,用戶端就再也不會進行服務的拉取,這個時候造成不可用的時間就難以預估了。比如:在9:00的時候,A、B、C、D、E服務全部下線,F、G、H、I、J上線,Eureka通知用戶端,但網絡抖動,用戶端沒有收到,或者用戶端收到了,拉取時候失敗,并沒有更新本地的清單,這樣隻有等到下次收到通知時才會去拉取,假設接下來服務很穩定,12:00的時候才有一次更新,這樣就有3個小時的服務不可用。

那麼有沒有什麼辦法既可以擁有2個實作類的優勢,又摒棄它們的弊端呢?有的!

小孩子才做選擇,我們大人是全都要!那就是自己寫一個ServerListUpdater的實作類,然後:

  • 1 啟動一個定時任務,定時從注冊中心拉取,而這個拉取間隔也可以根據自己的預估進行修改。
  • 2 注冊一個監聽器,可以收到注冊中心的服務變更通知
  • 3 在配置中,把預設的 ServerListUpdater改成自己寫的 ServerListUpdater,這樣ribbonConfig類在看到有ServerListUpdater的實作類的情況下,就不會加載 Eureka的 ServerListUpdater了。

實作類的代碼太長,就不貼了,邏輯很清晰,可以參考PollingServerListUpdater 和 EurekaNotificationServerListUpdater,把他們的代碼照着寫就行

接下來就是配置,在配置類中添加上自己的實作類

/**
     * 服務更新通知機制
     * @param notificationService
     * @param clientConfig
     * @return
     */
    @Bean
    public ServerListUpdater ribbonServerListUpdater(NotificationServiceImpl notificationService, IClientConfig clientConfig) {
        return new PollingNotificationServerListUpdater(notificationService, clientConfig);
    }
           

2、 Ribbon重試配置

gateway使用ribbon作為服務調用的負載均衡中間件,最終使用的都是ribbon的幾個元件實作:

  • ServerList:拉取服務清單接口
  • ServerListFilter:對ServerList伺服器清單進行過濾
  • ServerListUpdater: 更新服務清單的接口
  • IPing: 健康檢查
  • IRule :負載均衡規則,如輪詢、随機等
  • ILoadBalancer:負載均衡器,組合以上元件最終實作負載

對于以上接口的實作先暫時不寫,這次寫的是gateway使用以上元件進行負載均衡的2種實作方式:

一 使用gateway自己的過濾器 LoadBalancerClientFilter 組合以上接口 實作

二 通過對 RestTemplate 增加 LoadBalancerInterceptor攔截器實作

那麼項目在啟動的時候是如何在這兩種方式當中選擇的呢?

我們先看第一種方式,LoadBalancerClientFilter的加載條件

@Configuration
@ConditionalOnClass({ LoadBalancerClient.class, RibbonAutoConfiguration.class,
        DispatcherHandler.class })
@AutoConfigureAfter(RibbonAutoConfiguration.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
public class GatewayLoadBalancerClientAutoConfiguration {

    // GlobalFilter beans

    @Bean
    @ConditionalOnBean(LoadBalancerClient.class)
    @ConditionalOnMissingBean(LoadBalancerClientFilter.class)
    public LoadBalancerClientFilter loadBalancerClientFilter(LoadBalancerClient client,
            LoadBalancerProperties properties) {
        return new LoadBalancerClientFilter(client, properties);
    }

}
           

是通過 GatewayLoadBalancerClientAutoConfiguration 配置類來加載的,而GatewayLoadBalancerClientAutoConfiguration的加載條件是,當存在

LoadBalancerClient.class ,RibbonAutoConfiguration.class , DispatcherHandler.class的時候會加載,

1 LoadBalancerClient.class 隻有一個預設實作 RibbonLoadBalancerClient,是肯定會加載的

我們看剩下2個類

2 RibbonAutoConfiguration 配置類 是當我們在項目中引入 spring-cloud-starter-netflix-ribbon 依賴的時候進行加載

3 剩下最後一個DispatcherHandler 這個是關鍵,我在排查問題的時候,一直忽略這個類,看這名字感覺一定會加載,沒想到這個才是重點所在

DispatcherHandler 是在 WebFluxConfigurationSupport 中定義的

public class WebFluxConfigurationSupport implements ApplicationContextAware {

     ...中間代碼省略

    @Bean
    public DispatcherHandler webHandler() {
        return new DispatcherHandler();
    }
 ...以下代碼省略
}
           

WebFluxConfigurationSupport 被 DelegatingWebFluxConfiguration 繼承

@Configuration
public class DelegatingWebFluxConfiguration extends WebFluxConfigurationSupport {
           

可以看到,DelegatingWebFluxConfiguration 又被 EnableWebFluxConfiguration 繼承

而 EnableWebFluxConfiguration 被定義在 WebFluxAutoConfiguration 這個配置類中

@Configuration
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
@ConditionalOnClass(WebFluxConfigurer.class)
@ConditionalOnMissingBean({ WebFluxConfigurationSupport.class })
@AutoConfigureAfter({ ReactiveWebServerFactoryAutoConfiguration.class, CodecsAutoConfiguration.class,
        ValidationAutoConfiguration.class })
@AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE + 10)
public class WebFluxAutoConfiguration {

...中間代碼省略

/**
     * Configuration equivalent to {@code @EnableWebFlux}.
     */
    @Configuration
    public static class EnableWebFluxConfiguration extends DelegatingWebFluxConfiguration {

        ...中間代碼省略

    }

...代碼省略

}
           

終于到了關鍵的地方了

WebFluxAutoConfiguration 的第二行

@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)

表明了它的加載條件,隻有當 springboot 的啟動環境 WebApplicationType 為 REACTIVE才會進行加載

而這個類型怎麼确定呢?

當我們依賴 spring-boot-starter-web 的時候,WebApplicationType 為 SERVLET

依賴 spring-boot-starter-webflux 并且 沒有依賴 spring-boot-starter-web 的時候,WebApplicationType 為 REACTIVE

如果沒有依賴,springboot2.0以上預設為 REACTIVE

既然這兩種方式都能實作負載均衡,那有什麼差別的,關鍵在于想要進行錯誤重試的時候

如果使用 LoadBalancerClientFilter方式實作負載均衡,重試的時候要在配置檔案中這麼配置

spring:
  cloud:
    gateway:
      default-filters:
        - name: Retry
          args:
            retries: 3
           

這樣是對所有路由生效,或者指定單個路由

spring:
  gateway:
    routes:
      - id: test
        uri: lb://test
        predicates:
          - Path=/test/*
        filters:
          - name: Retry
            args:
              retries: 3
           

而使用 LoadBalancerInterceptor攔截器方式實作負載均衡,配置檔案中這麼配置

ribbon:
  MaxAutoRetries: 0
  MaxAutoRetriesNextServer: 2
  ReadTimeout: 2000
  ConnectTimeout: 1500
  OkToRetryOnAllOperations: true
           

3、動态路由注意事項

網關開發的過程中,因為有對某些服務進行動态的上下線的需求,是以進行了動态路由的開發,網上也有例子,實作方式就不贅述了,但這裡有2個注意事項。

這兩個錯誤很嚴重,一旦出現了之後,和路由有關的功能都無法再使用,需要重新開機項目

一 路由資訊裡的斷言器資訊不能為空

如果在添加路由資訊的時候沒有傳斷言器,比如這樣

{
    "id":"test",
    "predicates":[],
    "filters":[],
    "uri":"lb://test",
    "order":0
}
           

會抛出異常

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:653)
    at java.util.ArrayList.get(ArrayList.java:429)
    at org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator.combinePredicates(RouteDefinitionRouteLocator.java:221)
    at org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator.convertToRoute(RouteDefinitionRouteLocator.java:143)
           

我們看路由資訊類的時候,可以發現斷言器是一個List,RouteDefinitionRouteLocator下邊有這個方法,會先取List中第一個斷言器,然後與之後的斷言器做組合

private AsyncPredicate<ServerWebExchange> combinePredicates(
            RouteDefinition routeDefinition) {
        List<PredicateDefinition> predicates = routeDefinition.getPredicates();
        //取出第一個斷言器
        AsyncPredicate<ServerWebExchange> predicate = lookup(routeDefinition,
                predicates.get(0));

       //取出後續斷言器
        for (PredicateDefinition andPredicate : predicates.subList(1,
                predicates.size())) {
            AsyncPredicate<ServerWebExchange> found = lookup(routeDefinition,
                    andPredicate);
       //第一個斷言器和後續斷言器做 and 操作
            predicate = predicate.and(found);
        }

        return predicate;
    }
           

如果沒有填斷言器資訊,就會報數組越界異常

二 斷言器的名字不能随便取

比如這裡我取了一個 zuibuxing 的斷言器名字

{
    "id":"test",
    "predicates":[
        {
            "name":"zuibuxing",
            "args":{
                "_genkey_0":"/test/**"
            }
        }
    ],
    "filters":[

    ],
    "uri":"lb://test",
    "order":0
}
           

會報這個錯誤

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalArgumentException: Unable to find RoutePredicateFactory with name zuibuxing
Caused by: java.lang.IllegalArgumentException: Unable to find RoutePredicateFactory with name zuibuxing
    at org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator.lookup(RouteDefinitionRouteLocator.java:240)
    at org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator.combinePredicates(RouteDefinitionRouteLocator.java:220)
    at org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator.convertToRoute(RouteDefinitionRouteLocator.java:143)
           

說找不到名字為 zuibuxing 的斷言器,這是因為儲存自定義路由的時候,RouteDefinitionRouteLocato 會按照 name 去斷言器Map 裡去尋找

private final Map<String, RoutePredicateFactory> predicates = new LinkedHashMap<>();

... 中間代碼省略 ....

//按照名字尋找斷言器
private AsyncPredicate<ServerWebExchange> lookup(RouteDefinition route,
            PredicateDefinition predicate) {
        RoutePredicateFactory<Object> factory = this.predicates.get(predicate.getName());
        if (factory == null) {
            throw new IllegalArgumentException(
                    "Unable to find RoutePredicateFactory with name "
                            + predicate.getName());
        }

        ...以下代碼省略
    }
           

RouteDefinitionRouteLocator 在這個Map中儲存所有實作了 RoutePredicateFactory 的類,name是類名去掉RoutePredicateFactory,比如 AfterRoutePredicateFactory 斷言器的名字是 After,這裡有兩個特殊的斷言器 CloudFoundryRouteService,ReadBodyPredicateFactory,因為它們的類名沒有以 RoutePredicateFactory 結尾,是以就用本身類名作為 斷言器name

如果是随便寫的,會導緻找不到斷言器報錯,是以我們隻能填gateway自己的斷言器,或者自己實作了 RoutePredicateFactory 的斷言器

4、配置預設filter

在開發網關的時候,我們可以為我們的所有路由器定義同樣的filter,這樣就不用為每個route單獨進行配置,有幾種實作方式:

一:通過配置繼承了AbstractGatewayFilterFactory的bean來進行

在配置檔案中增加default-filters

spring:
  application:
    name: gateway
  profiles:
    active: prod
  cloud:
    gateway:
      default-filters:
        - name: ModifyRequestBody
        - name: MyLog
      routes:
        - id: route1
          uri: lb://route1
          predicates:
            - Path=/route1/**
        - id: route2
          uri: lb://route2
          predicates:
            - Path=/route2/**
           

ModifyRequestBody 和 MyLog 是過濾器的名字,他們都繼承了AbstractGatewayFilterFactory,這樣gateway在啟動的時候,就會把配置的名字加上GatewayFilterFactory的字尾作為bean的名字進行加載,在這個例子中就是 ModifyRequestBodyGatewayFilterFactory 和 MyLogGatewayFilterFactory 這個兩個bean,其中ModifyRequestBodyGatewayFilterFactory是gateway自帶的類,MyLogGatewayFilterFactory是我自己實作的類,大家可以通過這樣的方式自定義自己的過濾器。

但這種方式有一個問題,要注意一下,衆所周知,路由器除了配置檔案,還可以通過java 代碼的方式進行配置,如下:

@Bean
    public RouteLocator routeLocator(RouteLocatorBuilder builder) {
        return builder.routes()
                .route(r -> r.path("/test/**")
                        .uri("lb://test")
                        .order(0)
                        .id("test")
                )
                .build();
    }
           

或者我們也可以自己進行動态路由的配置,網上有很多例子,我就不貼出來了

而上邊那種通過yml或properties檔案配置預設過濾器的方式,對 java代碼配置的路由器不生效

隻對配置檔案和動态配置的路由生效(這個坑我進行了一上午的嘗試對比才發現)

因為通過使用代碼定義路由使用的是 RouteLocator 這個類

而通過配置檔案或者外部存儲器配置路由使用的是 RouteDefinitionLocator 的實作類進行的,有 4個實作類:

  • PropertiesRouteDefinitionLocator ,從配置檔案( 例如,YML / Properties 等 ) 讀取
  • RouteDefinitionRepository ,從存儲器( 例如,記憶體 / Redis / MySQL 等 )讀取(預設使用的是從記憶體)
  • DiscoveryClientRouteDefinitionLocator ,從注冊中心( 例如,Eureka / Zookeeper 等 )讀取
  • CompositeRouteDefinitionLocator ,組合多種 RouteDefinitionLocator 的實作

這裡應該是,配置gateway自帶的過濾器還是生效的,但是自己繼承了AbstractGatewayFilterFactory的類就不生效,因為之前的項目配置過自帶的Hystrix過濾器生效了

如果想讓自己的過濾器對所有的路由生效,就用到這第二種方式

5、處理 requestBody

這次是需要在請求的入口和出口分别列印封包資訊。處理GET請求的時候還好,POST請求有時候參數放在requestBody中,而且2.X的版本之後,spring cloud使用 spring5 webflux方式程式設計,在filter中處理過一次的requestBody,下遊訂閱者無法接收,網上找了很多都是對某個具體路由在編碼中進行配置,如下

.route("rewrite_request_upper", r -> r.host("*.rewriterequestupper.org")
                        .filters(f -> f.prefixPath("/httpbin")
                                .addResponseHeader("X-TestHeader", "rewrite_request_upper")
                                .modifyRequestBody(String.class, String.class,
                                        (exchange, s) -> {
                                            return Mono.just(s.toUpperCase()+s.toUpperCase());
                                        })
                        ).uri(uri)
                )
           

而我的項目中使用了動态路由,未來路由的增減是很常見的事,這種方式沒法對所有的路由進行請求攔截,很不靈活。

最後參看源碼,發現了一個ModifyRequestBodyGatewayFilterFactory類,裡邊有對requestBody的處理邏輯,然後把源碼照搬了過來,由于我隻需要列印日志,稍微改一下就行,别的代碼就保留了。

/**
 * 參照 ModifyRequestBodyGatewayFilterFactory 寫的一個處理 requestBody的filter
 *
 * @author huangting
 */
@Component
public class RequestHandlerFilter implements GlobalFilter, Ordered {
    private static final Logger logger = LoggerFactory.getLogger(RequestHandlerFilter.class);

    private static final String METHOD_POST = "POST";
    private static final String METHOD_GET = "GET";


    @Override
    public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {

        ServerHttpRequest request = exchange.getRequest();
        //POST和get的處理邏輯不一樣
        if (METHOD_POST.equals(exchange.getRequest().getMethodValue())) {
            ServerRequest serverRequest = new DefaultServerRequest(exchange);

            Mono<String> modifiedBody = serverRequest.bodyToMono(String.class).flatMap(requestBody -> {
                //列印請求封包
                logRequestLog(request, requestBody);

                return Mono.just(requestBody);
            });

            BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);
            HttpHeaders headers = new HttpHeaders();
            headers.putAll(exchange.getRequest().getHeaders());

            // the new content type will be computed by bodyInserter
            // and then set in the request decorator
            headers.remove(HttpHeaders.CONTENT_LENGTH);

            CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
            return bodyInserter.insert(outputMessage, new BodyInserterContext())
                    // .log("modify_request", Level.INFO)
                    .then(Mono.defer(() -> {
                        ServerHttpRequest decorator = decorate(exchange, headers, outputMessage);
                        return chain.filter(exchange.mutate().request(decorator).build());
                    }));

        } else {
            //列印請求封包
            logRequestLog(request, null);
            chain.filter(exchange);
        }
        return chain.filter(exchange);
    }


    ServerHttpRequestDecorator decorate(ServerWebExchange exchange, HttpHeaders headers, CachedBodyOutputMessage outputMessage) {
        return new ServerHttpRequestDecorator(exchange.getRequest()) {
            @Override
            public HttpHeaders getHeaders() {
                long contentLength = headers.getContentLength();
                HttpHeaders httpHeaders = new HttpHeaders();
                httpHeaders.putAll(super.getHeaders());
                if (contentLength > 0) {
                    httpHeaders.setContentLength(contentLength);
                } else {
                    // TODO: this causes a 'HTTP/1.1 411 Length Required' // on
                    // httpbin.org
                    httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
                }
                return httpHeaders;
            }

            @Override
            public Flux<DataBuffer> getBody() {
                return outputMessage.getBody();
            }
        };
    }


    /**
     * 列印請求日志
     *
     * @param request
     */
    public void logRequestLog(ServerHttpRequest request, String requestBody) {
        String requestParam = request.getQueryParams().toString();
        logger.info("請求封包 URL:{},Method:{},headers:{},param:{},requestbody:{}", request.getURI().getPath(), request.getMethod(), request.getHeaders(), requestParam, requestBody);
    }


    @Override
    public int getOrder() {
        // -1 is response write filter, must be called before that
        return -3;
    }
}
           

需要注意的是 ,我的spring cloud 是 2.1.3,Greenwich.SR3的版本,CachedBodyOutputMessage 和 DefaultServerRequest這裡兩個類的權限變成了spring私有的了,需要把他們copy出來作為自己項目中的類,以上的代碼有用到

6、處理 reponse封包(解決截斷及亂碼問題)

因為網關是請求的出入口,防止各調用方及服務方互相之間扯皮,響應封包也需要打出來,而這裡有一個問題,如果響應過大的話,Flux會進行截斷,這樣有2個問題,一個是每次處理都會打一次部分封包,不過這個可以通過doOnComplete()來解決,第二就是達到一定長度出現亂碼,後來檢視api,有一個合并的方法,問題解決,代碼如下

/**
 *  處理響應的 的filter
 * @author huangting
 */
@Component
public class ResponseHandlerFilter implements GlobalFilter, Ordered {
    private static final Logger logger = LoggerFactory.getLogger(ResponseHandlerFilter.class);
    private static final String START_TIME = "startTime";

    @Autowired
    private MetricService metricService;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

        ServerHttpRequest request = exchange.getRequest();
        String ip = IpUtil.getRemoteHost(request);

        //執行完成後 進行調用耗時埋點
        exchange.getAttributes().put(START_TIME, System.currentTimeMillis());

        //原始響應類
        ServerHttpResponse originalResponse = exchange.getResponse();
        DataBufferFactory bufferFactory = originalResponse.bufferFactory();
        //重新包裝的響應類
        ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(originalResponse) {
            @Override
            public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                Flux<? extends DataBuffer> fluxBody = Flux.from(body);
                return super.writeWith(fluxBody.buffer().map(dataBuffer -> {
                    //如果響應過大,會進行截斷,出現亂碼,然後看api DefaultDataBufferFactory有個join方法可以合并所有的流,亂碼的問題解決
                    DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
                    DataBuffer join = dataBufferFactory.join(dataBuffer);
                    byte[] content = new byte[join.readableByteCount()];
                    join.read(content);
                    //釋放掉記憶體
                    DataBufferUtils.release(join);

                    //列印響應日志
                    logResponse(exchange, new String(content, StandardCharsets.UTF_8));

                    return bufferFactory.wrap(content);
                }));
            }
        };

        return chain.filter(exchange.mutate().response(decoratedResponse).build())
                .then(Mono.fromRunnable(() -> {
                    Long startTime = exchange.getAttribute(START_TIME);
                    if (startTime != null) {
                        Long executeTime = (System.currentTimeMillis() - startTime);
                        //influxDB埋點
                        metricService.pointRequestLatency(ip, request.getURI().getPath(), executeTime);
                    }
                }));
    }


    /**
     * 列印響應封包
     *
     * @param exchange
     */
    public void logResponse(ServerWebExchange exchange, String response) {
        ServerHttpRequest request = exchange.getRequest();
        logger.info("響應封包 URL:{},Method:{},headers:{},response:{}", request.getURI().getPath(), request.getMethod(), exchange.getResponse().getHeaders(), response);
    }


    @Override
    public int getOrder() {
        // -1 is response write filter, must be called before that
        return -3;
    }
}
           

最近發現一個問題,當接口的調用傳回值為空的時候,并不會進入 writeWith 裡邊的map方法,是以當Flux 進行訂閱時,map裡的 logResponse 沒有執行,是以先定義一個預設的 AtomicReference<String> responseBody 變量,當傳回值不為空時,對它進行更新;然後把 logResponse 方法下移,放到 return語句中,這樣就能保證列印方法總會被執行,修改後的 filter 方法代碼如下

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

        ServerHttpRequest request = exchange.getRequest();
        String ip = IpUtil.getRemoteHost(request);

        //執行完成後 進行調用耗時埋點
        exchange.getAttributes().put(START_TIME, System.currentTimeMillis());

        //原始響應類
        ServerHttpResponse originalResponse = exchange.getResponse();
        DataBufferFactory bufferFactory = originalResponse.bufferFactory();

        //初始化一個 預設的 responseBody
        AtomicReference<String> responseBody= new AtomicReference<>("no-responseBody");

        //重新包裝的響應類
        ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(originalResponse) {
            @Override
            public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                Flux<? extends DataBuffer> fluxBody = Flux.from(body);
                return super.writeWith(fluxBody.buffer().map(dataBuffer -> {
                    //如果響應過大,會進行截斷,出現亂碼,然後看api DefaultDataBufferFactory有個join方法可以合并所有的流,亂碼的問題解決
                    DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
                    DataBuffer join = dataBufferFactory.join(dataBuffer);
                    byte[] content = new byte[join.readableByteCount()];
                    join.read(content);
                    //釋放掉記憶體
                    DataBufferUtils.release(join);

                    //如果有傳回值,将 responseBody 覆寫
                    responseBody.set(new String(content, StandardCharsets.UTF_8));

                    return bufferFactory.wrap(content);
                }));
            }
        };

        return chain.filter(exchange.mutate().response(decoratedResponse).build())
                .then(Mono.fromRunnable(() -> {
                    //列印響應日志
                    logResponse(exchange, responseBody.get());

                    Long startTime = exchange.getAttribute(START_TIME);
                    if (startTime != null) {
                        Long executeTime = (System.currentTimeMillis() - startTime);
                        //influxDB埋點
                        metricService.pointRequestLatency(ip, request.getURI().getPath(), executeTime);
                    }
                }));
    }
           

文章轉載 : 醉不醒 的spring cloud gateway 二次開發

連結:https://www.jianshu.com/p/a5fc8039d236

繼續閱讀