天天看點

Spring-Cloud-Gateway 源碼解析 —— 過濾器 (4.7) 之 NettyRoutingFilter1. 概述2. NettyRoutingFilter3. NettyWriteResponseFilter

1. 概述

本文主要分享 NettyRoutingFilter 的代碼實作。

NettyRoutingFilter ,Netty 路由網關過濾器。其根據 

http://

 或 

https://

 字首( Scheme )過濾處理,使用基于 Netty 實作的 HttpClient 請求後端 Http 服務。

NettyWriteResponseFilter ,與 NettyRoutingFilter 成對使用的網關過濾器。其将 NettyRoutingFilter 請求後端 Http 服務的響應寫回用戶端。

大體流程如下 :

Spring-Cloud-Gateway 源碼解析 —— 過濾器 (4.7) 之 NettyRoutingFilter1. 概述2. NettyRoutingFilter3. NettyWriteResponseFilter

另外,Spring Cloud Gateway 實作了 WebClientHttpRoutingFilter / WebClientWriteResponseFilter ,功能上和 NettyRoutingFilter / NettyWriteResponseFilter 相同,差别在于基于 

org.springframework.cloud.gateway.filter.WebClient

 實作的 HttpClient 請求後端 Http 服務。在 《Spring-Cloud-Gateway 源碼解析 —— 過濾器 (4.8) 之 WebClientHttpRoutingFilter》 ,我們會詳細解析。

2. NettyRoutingFilter

org.springframework.cloud.gateway.filter.NettyRoutingFilter

 ,Netty 路由網關過濾器。

構造方法,代碼如下 :

public class NettyRoutingFilter implements GlobalFilter, Ordered {

	private final HttpClient httpClient;

	public NettyRoutingFilter(HttpClient httpClient) {
		this.httpClient = httpClient;
	}

}
           
  • httpClient

     屬性,基于 Netty 實作的 HttpClient 。通過該屬性,請求後端的 Http 服務。

#getOrder()

 方法,代碼如下 :

@Override
public int getOrder() {
    return Ordered.LOWEST_PRECEDENCE;
}
           
  • 傳回順序為 

    Integer.MAX_VALUE

     。在 《Spring-Cloud-Gateway 源碼解析 —— 過濾器 (4.1) 之 GatewayFilter 一覽》「3. GlobalFilter」 ,我們列舉了所有 GlobalFilter 的順序。

#filter(ServerWebExchange, GatewayFilterChain)

 方法,代碼如下 :

1: @Override
 2: public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
 3: 	// 獲得 requestUrl
 4: 	URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
 5: 
 6: 	// 判斷是否能夠處理
 7: 	String scheme = requestUrl.getScheme();
 8: 	if (isAlreadyRouted(exchange) || (!scheme.equals("http") && !scheme.equals("https"))) {
 9: 		return chain.filter(exchange);
10: 	}
11: 
12: 	// 設定已經路由
13: 	setAlreadyRouted(exchange);
14: 
15: 	ServerHttpRequest request = exchange.getRequest();
16: 
17: 	// Request Method
18: 	final HttpMethod method = HttpMethod.valueOf(request.getMethod().toString());
19: 
20: 	// 獲得 url
21: 	final String url = requestUrl.toString();
22: 
23: 	// Request Header
24: 	final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
25: 	request.getHeaders().forEach(httpHeaders::set);
26: 
27: 	// 請求
28: 	return this.httpClient.request(method, url, req -> {
29: 		final HttpClientRequest proxyRequest = req.options(NettyPipeline.SendOptions::flushOnEach)
30: 				.failOnClientError(false) // // 是否請求失敗,抛出異常
31: 				.headers(httpHeaders);
32: 
33: 		// Request Form
34: 		if (MediaType.APPLICATION_FORM_URLENCODED.includes(request.getHeaders().getContentType())) {
35: 			return exchange.getFormData()
36: 					.flatMap(map -> proxyRequest.sendForm(form -> {
37: 						for (Map.Entry<String, List<String>> entry: map.entrySet()) {
38: 							for (String value : entry.getValue()) {
39: 								form.attr(entry.getKey(), value);
40: 							}
41: 						}
42: 					}).then())
43: 					.then(chain.filter(exchange));
44: 		}
45: 
46: 		// Request Body
47: 		return proxyRequest.sendHeaders() //I shouldn't need this
48: 				.send(request.getBody()
49: 						.map(DataBuffer::asByteBuffer) // Flux<DataBuffer> => ByteBuffer
50: 						.map(Unpooled::wrappedBuffer)); // ByteBuffer => Flux<DataBuffer>
51: 	}).doOnNext(res -> {
52: 		ServerHttpResponse response = exchange.getResponse();
53: 		// Response Header
54: 		// put headers and status so filters can modify the response
55: 		HttpHeaders headers = new HttpHeaders();
56: 		res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));
57: 		response.getHeaders().putAll(headers);
58: 
59: 		// Response Status
60: 		response.setStatusCode(HttpStatus.valueOf(res.status().code()));
61: 
62: 		// 設定 Response 到 CLIENT_RESPONSE_ATTR
63: 		// Defer committing the response until all route filters have run
64: 		// Put client response as ServerWebExchange attribute and write response later NettyWriteResponseFilter
65: 		exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
66: 	}).then(chain.filter(exchange));
67: }
           
  • 第 4 行 :獲得 

    requestUrl

     。
  • 第 7 至 10 行 :判斷 ForwardRoutingFilter 是否能夠處理該請求,需要滿足兩個條件 :
    • http://

       或者 

      https://

       字首( Scheme ) 。
    • 調用 

      ServerWebExchangeUtils#isAlreadyRouted(ServerWebExchange)

       方法,判斷該請求暫未被其他 Routing 網關處理。代碼如下 :
public static boolean isAlreadyRouted(ServerWebExchange exchange) {
    return exchange.getAttributeOrDefault(GATEWAY_ALREADY_ROUTED_ATTR, false);
}
           
  • 第 13 行 :設定該請求已經被處理。代碼如下 : 
public static void setAlreadyRouted(ServerWebExchange exchange) {
    exchange.getAttributes().put(GATEWAY_ALREADY_ROUTED_ATTR, true);
}
           
  • 第 18 行 :建立 Netty Request Method 對象。

    request#getMethod()

     傳回的不是 

    io.netty.handler.codec.http.HttpMethod

     ,是以需要進行轉換。
  • 第 21 行 :獲得 

    url

     。
  • 第 24 至 25 行 :建立 Netty Request Header 對象( 

    io.netty.handler.codec.http.DefaultHttpHeaders

     ),将請求的 Header 設定給它。
  • ——— 第 28 至 50 行 :調用 

    HttpClient#request(HttpMethod, String, Function)

     方法,請求後端 Http 服務。
  • 第 29 至 31 行 :建立 Netty Request 對象( 

    reactor.ipc.netty.http.client.HttpClientRequest

     )。
    • 第 29 行 :TODO 【3024】 NettyPipeline.SendOptions::flushOnEach
    • 第 30 行 :設定請求失敗( 後端服務傳回響應狀體碼 

      >= 400

       )時,不抛出異常。相關代碼如下 :
// HttpClientOperations#checkResponseCode(HttpResponse response)
// ... 省略無關代碼

if (code >= 400) {
	if (clientError) {
		if (log.isDebugEnabled()) {
			log.debug("{} Received Request Error, stop reading: {}",
					channel(),
					response.toString());
		}
		Exception ex = new HttpClientException(uri(), response);
		parentContext().fireContextError(ex);
		receive().subscribe();
		return false;
	}
	return true;
}
           
      • 通過設定 

        clientError = false

         ,第 51 行可以調用 

        Mono#doNext(Consumer)

         方法,統一訂閱處理傳回的 

        reactor.ipc.netty.http.client.HttpClientResponse

         對象。
    • 第 31 行 :設定 Netty Request 對象的 Header 。
  • 第 34 至 44 行 :【TODO 3025】目前是一個 BUG ,在 2.0.X 版本修複。見 FormIntegrationTests#formUrlencodedWorks() 單元測試的注釋說明。
  • 第 47 至 50 行 :請求後端的 Http 服務。
    • 第 47 行 :發送請求 Header 。
    • 第 48 至 50 行 :發送請求 Body 。其中中間的 

      #map(...)

       的過程為 

      Flux<DataBuffer> => ByteBuffer => Flux<DataBuffer>

       。
  • ——— 第 51 至 65 行 :請求後端 Http 服務完成,将 Netty Response 指派給響應 

    response

     。
  • 第 53 至 57 行 :建立 

    org.springframework.http.HttpHeaders

     對象,将 Netty Response Header 設定給它,而後設定回給響應 

    response

     。
  • 第 60 行 :設定響應 

    response

     的狀态碼。
  • 第 65 行 :設定 Netty Response 到 

    CLIENT_RESPONSE_ATTR

     。後續 NettyWriteResponseFilter 将 Netty Response 寫回給用戶端。
  • ——— 第 66 行 :送出過濾器鍊繼續過濾。

3. NettyWriteResponseFilter

org.springframework.cloud.gateway.filter.NettyWriteResponseFilter

 ,Netty 回寫響應網關過濾器。

#getOrder()

 方法,代碼如下 :

public static final int WRITE_RESPONSE_FILTER_ORDER = -1;
    
@Override
public int getOrder() {
    return WRITE_RESPONSE_FILTER_ORDER;
}
           
  • 傳回順序為 

    -1

     。在 《Spring-Cloud-Gateway 源碼解析 —— 過濾器 (4.1) 之 GatewayFilter 一覽》「3. GlobalFilter」 ,我們列舉了所有 GlobalFilter 的順序。

#filter(ServerWebExchange, GatewayFilterChain)

 方法,代碼如下 :

1: @Override
 2: public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
 3: 	// NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_ATTR is not added
 4: 	// until the WebHandler is run
 5: 	return chain.filter(exchange).then(Mono.defer(() -> {
 6: 	    // 獲得 Response
 7: 		HttpClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR);
 8: 		// HttpClientResponse clientResponse = getAttribute(exchange, CLIENT_RESPONSE_ATTR, HttpClientResponse.class);
 9: 		if (clientResponse == null) {
10: 			return Mono.empty();
11: 		}
12: 		log.trace("NettyWriteResponseFilter start");
13: 		ServerHttpResponse response = exchange.getResponse();
14: 
15: 		// 将 Netty Response 寫回給用戶端。
16: 		NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
17: 		//TODO: what if it's not netty
18: 		final Flux<NettyDataBuffer> body = clientResponse.receive()
19: 				.retain() // ByteBufFlux => ByteBufFlux
20: 				.map(factory::wrap); // ByteBufFlux  => Flux<NettyDataBuffer>
21: 		return response.writeWith(body);
22: 	}));
23: }
           
  • 第 5 行 :調用 

    #then(Mono)

     方法,實作 After Filter 邏輯。
  • 第 7 至 11 行 :從 

    CLIENT_RESPONSE_ATTR

     中,獲得 Netty Response 。
  • 第 15 至 21 行 :将 Netty Response 寫回給用戶端。因為 

    org.springframework.http.server.reactive#writeWith(Publisher<? extends DataBuffer>)

     需要的參數類型是 

    Publisher<? extends DataBuffer>

     ,是以【第 18 至 20 行】的轉換過程是 

    ByteBufFlux => Flux<NettyDataBuffer>

     。
    • 第 19 行 :TODO 【3024】ByteBufFlux#retain()

繼續閱讀