1. 概述
本文主要分享 NettyRoutingFilter 的代碼實作。
NettyRoutingFilter ,Netty 路由網關過濾器。其根據
http://
或
https://
字首( Scheme )過濾處理,使用基于 Netty 實作的 HttpClient 請求後端 Http 服務。
NettyWriteResponseFilter ,與 NettyRoutingFilter 成對使用的網關過濾器。其将 NettyRoutingFilter 請求後端 Http 服務的響應寫回用戶端。
大體流程如下 :

另外,Spring Cloud Gateway 實作了 WebClientHttpRoutingFilter / WebClientWriteResponseFilter ,功能上和 NettyRoutingFilter / NettyWriteResponseFilter 相同,差别在于基于
org.springframework.cloud.gateway.filter.WebClient
實作的 HttpClient 請求後端 Http 服務。在 《Spring-Cloud-Gateway 源碼解析 —— 過濾器 (4.8) 之 WebClientHttpRoutingFilter》 ,我們會詳細解析。
2. NettyRoutingFilter
org.springframework.cloud.gateway.filter.NettyRoutingFilter
,Netty 路由網關過濾器。
構造方法,代碼如下 :
public class NettyRoutingFilter implements GlobalFilter, Ordered {
private final HttpClient httpClient;
public NettyRoutingFilter(HttpClient httpClient) {
this.httpClient = httpClient;
}
}
-
屬性,基于 Netty 實作的 HttpClient 。通過該屬性,請求後端的 Http 服務。httpClient
#getOrder()
方法,代碼如下 :
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
- 傳回順序為
。在 《Spring-Cloud-Gateway 源碼解析 —— 過濾器 (4.1) 之 GatewayFilter 一覽》「3. GlobalFilter」 ,我們列舉了所有 GlobalFilter 的順序。Integer.MAX_VALUE
#filter(ServerWebExchange, GatewayFilterChain)
方法,代碼如下 :
1: @Override
2: public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
3: // 獲得 requestUrl
4: URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
5:
6: // 判斷是否能夠處理
7: String scheme = requestUrl.getScheme();
8: if (isAlreadyRouted(exchange) || (!scheme.equals("http") && !scheme.equals("https"))) {
9: return chain.filter(exchange);
10: }
11:
12: // 設定已經路由
13: setAlreadyRouted(exchange);
14:
15: ServerHttpRequest request = exchange.getRequest();
16:
17: // Request Method
18: final HttpMethod method = HttpMethod.valueOf(request.getMethod().toString());
19:
20: // 獲得 url
21: final String url = requestUrl.toString();
22:
23: // Request Header
24: final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
25: request.getHeaders().forEach(httpHeaders::set);
26:
27: // 請求
28: return this.httpClient.request(method, url, req -> {
29: final HttpClientRequest proxyRequest = req.options(NettyPipeline.SendOptions::flushOnEach)
30: .failOnClientError(false) // // 是否請求失敗,抛出異常
31: .headers(httpHeaders);
32:
33: // Request Form
34: if (MediaType.APPLICATION_FORM_URLENCODED.includes(request.getHeaders().getContentType())) {
35: return exchange.getFormData()
36: .flatMap(map -> proxyRequest.sendForm(form -> {
37: for (Map.Entry<String, List<String>> entry: map.entrySet()) {
38: for (String value : entry.getValue()) {
39: form.attr(entry.getKey(), value);
40: }
41: }
42: }).then())
43: .then(chain.filter(exchange));
44: }
45:
46: // Request Body
47: return proxyRequest.sendHeaders() //I shouldn't need this
48: .send(request.getBody()
49: .map(DataBuffer::asByteBuffer) // Flux<DataBuffer> => ByteBuffer
50: .map(Unpooled::wrappedBuffer)); // ByteBuffer => Flux<DataBuffer>
51: }).doOnNext(res -> {
52: ServerHttpResponse response = exchange.getResponse();
53: // Response Header
54: // put headers and status so filters can modify the response
55: HttpHeaders headers = new HttpHeaders();
56: res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));
57: response.getHeaders().putAll(headers);
58:
59: // Response Status
60: response.setStatusCode(HttpStatus.valueOf(res.status().code()));
61:
62: // 設定 Response 到 CLIENT_RESPONSE_ATTR
63: // Defer committing the response until all route filters have run
64: // Put client response as ServerWebExchange attribute and write response later NettyWriteResponseFilter
65: exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
66: }).then(chain.filter(exchange));
67: }
- 第 4 行 :獲得
。requestUrl
- 第 7 至 10 行 :判斷 ForwardRoutingFilter 是否能夠處理該請求,需要滿足兩個條件 :
-
或者http://
字首( Scheme ) 。https://
- 調用
方法,判斷該請求暫未被其他 Routing 網關處理。代碼如下 :ServerWebExchangeUtils#isAlreadyRouted(ServerWebExchange)
-
public static boolean isAlreadyRouted(ServerWebExchange exchange) {
return exchange.getAttributeOrDefault(GATEWAY_ALREADY_ROUTED_ATTR, false);
}
- 第 13 行 :設定該請求已經被處理。代碼如下 :
public static void setAlreadyRouted(ServerWebExchange exchange) {
exchange.getAttributes().put(GATEWAY_ALREADY_ROUTED_ATTR, true);
}
- 第 18 行 :建立 Netty Request Method 對象。
傳回的不是request#getMethod()
,是以需要進行轉換。io.netty.handler.codec.http.HttpMethod
- 第 21 行 :獲得
。url
- 第 24 至 25 行 :建立 Netty Request Header 對象(
),将請求的 Header 設定給它。io.netty.handler.codec.http.DefaultHttpHeaders
- ——— 第 28 至 50 行 :調用
方法,請求後端 Http 服務。HttpClient#request(HttpMethod, String, Function)
- 第 29 至 31 行 :建立 Netty Request 對象(
)。reactor.ipc.netty.http.client.HttpClientRequest
- 第 29 行 :TODO 【3024】 NettyPipeline.SendOptions::flushOnEach
- 第 30 行 :設定請求失敗( 後端服務傳回響應狀體碼
)時,不抛出異常。相關代碼如下 :>= 400
// HttpClientOperations#checkResponseCode(HttpResponse response)
// ... 省略無關代碼
if (code >= 400) {
if (clientError) {
if (log.isDebugEnabled()) {
log.debug("{} Received Request Error, stop reading: {}",
channel(),
response.toString());
}
Exception ex = new HttpClientException(uri(), response);
parentContext().fireContextError(ex);
receive().subscribe();
return false;
}
return true;
}
-
-
- 通過設定
,第 51 行可以調用clientError = false
方法,統一訂閱處理傳回的Mono#doNext(Consumer)
對象。reactor.ipc.netty.http.client.HttpClientResponse
- 通過設定
- 第 31 行 :設定 Netty Request 對象的 Header 。
-
- 第 34 至 44 行 :【TODO 3025】目前是一個 BUG ,在 2.0.X 版本修複。見 FormIntegrationTests#formUrlencodedWorks() 單元測試的注釋說明。
- 第 47 至 50 行 :請求後端的 Http 服務。
- 第 47 行 :發送請求 Header 。
- 第 48 至 50 行 :發送請求 Body 。其中中間的
的過程為#map(...)
。Flux<DataBuffer> => ByteBuffer => Flux<DataBuffer>
- ——— 第 51 至 65 行 :請求後端 Http 服務完成,将 Netty Response 指派給響應
。response
- 第 53 至 57 行 :建立
對象,将 Netty Response Header 設定給它,而後設定回給響應org.springframework.http.HttpHeaders
。response
- 第 60 行 :設定響應
的狀态碼。response
- 第 65 行 :設定 Netty Response 到
。後續 NettyWriteResponseFilter 将 Netty Response 寫回給用戶端。CLIENT_RESPONSE_ATTR
- ——— 第 66 行 :送出過濾器鍊繼續過濾。
3. NettyWriteResponseFilter
org.springframework.cloud.gateway.filter.NettyWriteResponseFilter
,Netty 回寫響應網關過濾器。
#getOrder()
方法,代碼如下 :
public static final int WRITE_RESPONSE_FILTER_ORDER = -1;
@Override
public int getOrder() {
return WRITE_RESPONSE_FILTER_ORDER;
}
- 傳回順序為
。在 《Spring-Cloud-Gateway 源碼解析 —— 過濾器 (4.1) 之 GatewayFilter 一覽》「3. GlobalFilter」 ,我們列舉了所有 GlobalFilter 的順序。-1
#filter(ServerWebExchange, GatewayFilterChain)
方法,代碼如下 :
1: @Override
2: public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
3: // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_ATTR is not added
4: // until the WebHandler is run
5: return chain.filter(exchange).then(Mono.defer(() -> {
6: // 獲得 Response
7: HttpClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR);
8: // HttpClientResponse clientResponse = getAttribute(exchange, CLIENT_RESPONSE_ATTR, HttpClientResponse.class);
9: if (clientResponse == null) {
10: return Mono.empty();
11: }
12: log.trace("NettyWriteResponseFilter start");
13: ServerHttpResponse response = exchange.getResponse();
14:
15: // 将 Netty Response 寫回給用戶端。
16: NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
17: //TODO: what if it's not netty
18: final Flux<NettyDataBuffer> body = clientResponse.receive()
19: .retain() // ByteBufFlux => ByteBufFlux
20: .map(factory::wrap); // ByteBufFlux => Flux<NettyDataBuffer>
21: return response.writeWith(body);
22: }));
23: }
- 第 5 行 :調用
方法,實作 After Filter 邏輯。#then(Mono)
- 第 7 至 11 行 :從
中,獲得 Netty Response 。CLIENT_RESPONSE_ATTR
- 第 15 至 21 行 :将 Netty Response 寫回給用戶端。因為
需要的參數類型是org.springframework.http.server.reactive#writeWith(Publisher<? extends DataBuffer>)
,是以【第 18 至 20 行】的轉換過程是Publisher<? extends DataBuffer>
。ByteBufFlux => Flux<NettyDataBuffer>
- 第 19 行 :TODO 【3024】ByteBufFlux#retain()