天天看點

頭發掉完那天終于把二進制網絡協定SpringRSocket給搞懂了!

SpringRSocket

前面我們讨論了如何使用WebFlux建構響應式Web服務。

WebFlux和WebMVC一樣,都是基于HTTP實作請求-響應式的互動方式。這種互動方式很簡單,但不夠靈活,也無法滿足日常開發中的所有需求。

首先,HTTP提供的請求-響應模式對于很多應用場景來說是不合适的。

典型的例子是消息推送,如果用戶端需要擷取最新的推送消息,就必須使用輪詢。

也就是說用戶端不停地發送請求到伺服器來檢查更新,這無疑造成了大量的資源浪費。

請求-響應模式的另外一個問題是,如果某個請求的響應時間過長,會導緻其他請求無法得到及時響應。因為整個處理過程是同步阻塞的,是以在高并發場景下,性能和響應性上也存在瓶頸。

當然,我們可以使用伺服器發送事件(Server-Sent Events,SSE)實作從服務端向用戶端推送消息,不過SSE也是一個建構在HTTP上的處理機制,一般隻用來傳送文本,提供的功能比較有限。

那麼,在網絡協定層上有沒有辦法來提供更加豐富的互動方式呢?答案是肯定的,這就是本節要讨論的RSocket協定。

RSocket協定

RSocket協定誕生于2015年,是一個與語言無關的二進制網絡協定,用來解決單一的請求-響應模式以及現有網絡傳輸協定所存在的問題。

RSocket以異步消息的方式提供了四種互動模式,除了請求-響應(request/response)模式之外,還包括請求-響應流(request/stream)、即發-即忘(fire-and-forget)和通道(channel)這三種新的互動模式,各個互動模型的功能特性如下所示。請求-響應模式:這是最典型也最常見的互動模式。發送方在發送消息給接收方之後,等待與之對應的響應消息。

請求-響應流模式:發送方的每個請求消息,都對應于接收方的一個消息流作為響應。

即發-即忘模式:發送方的請求消息沒有與之對應的響應。

通道模式:在發送方和接收方之間建立一個雙向傳輸的通道。

RSocket專門設計來和響應式程式設計技術風格的應用程式配合使用,在使用RSocket協定時,響應式程式設計體系中的資料流機制仍然有效。

為了更好地了解RSocket協定,讓我們将它與HTTP進行對比。

在前面,我們已經提到Servlet是基于HTTP的一套Java API規範,将HTTP請求轉化為一個ServletRequest對象,并将處理結果封裝成一個ServletResponse對象進行傳回。HTTP為了相容各種應用方式,本身有一定的複雜性,性能一般。

而RSocket采用的是自定義二進制協定,本身的定位就是高性能通信協定,性能上比HTTP高出一個數量級。

在互動模式上,與HTTP的單向請求-響應方式不同,RSocket倡導的是對等通信。可以了解為它不是僅對傳統互動模式的簡單改進,而是在兩端之間可以自由地互相發送和處理請求。RSocket協定的互動方式可以參考圖5-6。

頭發掉完那天終于把二進制網絡協定SpringRSocket給搞懂了!

圖5-6 RSocket協定的互動方式

RSocket程式設計模型

從程式設計模型上講,RSocket協定提供了一個同名的RSocket接口,如代碼清單5-28所示。

代碼清單5-28 RSocket接口實作代碼

public interface RSocket extends Availability, Closeable {

//推送元資訊,資料可以自定義

Mono<Void> metadataPush(Payload payload);

//請求-響應模式,發送一個請求并接收一個響應

Mono<Payload> requestResponse(Payload payload);

//即發-即忘模式,對請求-響應模式的優化,在不需要響應時非常有用

Mono<Void> fireAndForget(Payload payload);

//請求-響應流模式,類似于傳回集合的請求-響應模式,集合将以流的方式傳回,而不是

等到查詢完成

Flux<Payload> requestStream(Payload payload);

//通道模式,允許任意互動模型的雙向消息流

Flux<Payload> requestChannel(Publisher<Payload> payloads);

}

顯然,RSocket接口通過四個方法分别實作了四種互動模式,這裡的Payload代表的就是一種消息對象,由兩部分組成,即元資訊metadata和資料data,其概念類似于常見的消息通信中的消息頭和消息體。

注意到這些方法的傳回值都是Mono或Flux,說明RSocket協定是和響應式程式設計技術完全整合在一起的。我們發現fireAndForget()方法傳回的是一個Mono<Void>流,符合即發-即忘模式的語義。而requestStream()作為請求-響應流模式的實作,與requestResponse()的差別在于它的傳回值是一個Flux流,而不是一個Mono對象。

另外,與其他方法不同,requestChannel()方法傳回的并不是一個Payload消息對象,而是一個代表響應式流的Publisher對象,意味着這種模式下的輸入/輸出都是響應式流,也就是說可以實作用戶端和服務端之間的雙向互動。這點也和通道模式的定義一緻。

在使用RSocket接口時,開發人員可以完全實作它所提供的所有接口,但通常不推薦也不需要這麼做。在rsocket-core開發包中,針對RSocket接口有一個抽象的實作類AbstractRSocket,對上述方法做了簡單的實作封裝。在使用過程中,我們可以基于這個AbstractRSocket類來具體提供針對某一種互動模式的實作邏輯,而不需要實作RSocket接口中的所有方法。

接下來,我們将基于AbstractRSocket抽象類來展示使用RSocket協定的基本方法。和開發Web服務一樣,我們需要分别建構伺服器端和用戶端元件。

想要在應用程式中使用RSocket協定,我們需要引入如代碼清單5-29所示的依賴包。

代碼清單5-29 RSocket協定相關依賴包

<dependency>

<groupId>io.rsocket</groupId>

<artifactId>rsocket-core</artifactId>

</dependency>

<dependency>

<groupId>io.rsocket</groupId>

<artifactId>rsocket-transport-netty</artifactId>

</dependency>

在引入依賴包之後,代碼清單5-30展示了如何實作一個RSocket伺服器。

代碼清單5-30 RSocket伺服器建構代碼

RSocketFactory

.receive()

.acceptor((setupPayload, reactiveSocket) -> {

return Mono.just(new ServerJsonRSocket());

})

.transport(TcpServerTransport.create("localhost", 9090))

.start()

.subscribe();可以看到這裡使用了一個ServerJsonRSocket來對具體的請求進行處理,而這個Server-JsonRSocket就擴充了前面提到的AbstractRSocket抽象類,如代碼清單5-31所示。

代碼清單5-31 ServerJsonRSocket實作代碼

public class ServerJsonRSocket extends AbstractRSocket {

@Override

public Mono<Payload> requestResponse(Payload payload) {

String name = payload.getDataUtf8();

payload.release();

return Mono.just(DefaultPayload.create("{\"" + name +"\":

\"Hello\"}"));

}

}

這裡複寫了AbstractRSocket的requestResponse()接口,進而實作了請求-響應式的互動方式。現在,啟動這個Server類,系統就會在指定的端口等待請求的到來。

接下來,我們建構用戶端元件,如代碼清單5-32所示。

代碼清單5-32 RSocket用戶端建構代碼

RSocket rSocket = RSocketFactory.connect()

.dataMimeType("json")

.transport(TcpClientTransport.create("localhost",9090))

.start()

.block();

Payload payload = rSocket

.requestResponse(DefaultPayload.create("Spring"))

.block();

payload.release();通常,我們不會直接使用RSocket開發庫進行應用程式的開發,而是借助于特定的開發架構。接下來,我們将基于Spring RSocket來實作一個案例。

Spring RSocket案例分析

本小節将通過一個案例來展示Spring RSocket的使用方法。

我們将建構兩個獨立的服務rsocket-server和rsocket-client。其中,前者作為服務端對外暴露基于RSocket協定的通路入口;而後者則作為用戶端來消費服務。

從2.3版本開始,Spring Boot内置了spring-boot-starter-rsocket元件,簡化了RSocket服務暴露和引用的實作過程。想要在Spring Boot中使用RSocket協定,我們需要在rsocket-server和rsocket-client引入這一依賴,

如代碼清單5-33所示

代碼清單5-33 spring-boot-starter-rsocket依賴包

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-rsocket</artifactId>

</dependency>

然後,我們來建構一個請求-響應式互動方式。為此,我們在rsocketserver中添加一個簡單的ProviderController,如代碼清單5-34所示。

代碼清單5-34 ProviderController實作代碼

@Controller

public class ProviderController {

private Logger log =LoggerFactory.getLogger(ProviderController.class);

@MessageMapping("/request-response")

public Mono<String> requestResponse(String request) {

log.info("receive request:{}", request);

return Mono.just("Hello " + request);

}

}

注意到這裡引入了一個新的注解@MessageMapping,跟@RequestMapping注解類似,@MessageMapping是Spring提供的,用來指定WebSocket、RSocket等協定中消息處理的目的地。然後,我們輸入了一個String類型的參數并傳回一個Mono對象,符合請求-響應互動模式的定義。

為了通路這個RSocket端點,我們接下來實作rsocket-client。SpringRSocket為開發人員提供了一個用于發起遠端請求的RSocketRequester接口。

該接口提供了一組非常有用的工具方法,包括用于實作請求路由的route()方法。同時,Spring RSocket還提供了一個專門用于建構RSocketRequester的Builder接口。想要通過Builder接口來建立一個RSocket-Requester對象,可以使用如代碼清單5-35所示的實作方法。

代碼清單5-35 RSocketRequester對象建立代碼

@Autowired

RSocketRequester.Builder builder;

RSocketRequester requester = builder

.dataMimeType(MimeTypeUtils.TEXT_PLAIN)

.connect(TcpClientTransport.create(7000))

.block();

建立RSocketRequester更常見的做法是準備一個配置類,并在該類中通過Builder接口設定RSocketRequester的一些屬性,如代碼清單5-36所示。

代碼清單5-36 RSocketRequester配置代碼

@Configuration

public class RSocketClientConfiguration {

@Bean

public RSocketRequester

getRSocketRequester(RSocketRequester.Builder builder) {

return builder

.rsocketConnector(rSocketConnector ->

rSocketConnector

.reconnect(Retry.fixedDelay(2,

Duration.ofSeconds(2))))

.dataMimeType(MediaType.APPLICATION_JSON)

.transport(TcpClientTransport.create(7000));

}

}

這裡我們設定了請求的重試時間、序列化格式以及伺服器端口。基于這個RSocket-Requester對象,我們就可以通過它的route()方法路由到前面通過@MessageMapping注解建構的/request-response端點。為此,我們也在rsocket-client中添加一個ConsumerController,如代碼清單5-37所示。

代碼清單5-37 ConsumerController實作代碼

@RestController

public class ConsumerController {

private Logger log =

LoggerFactory.getLogger(ConsumerController.class);

@Autowired

private RSocketRequester rSocketRequester;

@GetMapping(value = "/requestResponse/{message}")

public Mono<String> requestResponse(@PathVariable("message")

String message) { log.info("send request:{}", message);

return rSocketRequester

.route("/request-response")

.data(message)

.retrieveMono(String.class);

}

}

這裡,我們通過data()方法傳遞請求參數,并通過retrieveMono()方法擷取響應結果。上述請求-響應方式的處理結果顯而易見。而如果想要實作即發-即忘的互動方式,那麼對于伺服器端而言,隻需要傳回一個空對象即可,如代碼清單5-38所示。

代碼清單5-38 即發-即忘互動模式服務端實作代碼

@MessageMapping("/fire-and-forget")

public Mono<Void> fireAndForget(String request) {

log.info("receive request:{}", request);

return Mono.empty();

}

而對于用戶端而言,我們也可以簡單通過一個send()方法來發送請求而不擷取結果,如代碼清單5-39所示。

代碼清單5-39 即發-即忘互動模式用戶端實作代碼

@GetMapping(value = "/fireAndForget/{message}")

public Mono<Void> fireAndForget(@PathVariable("message") String

message) {

log.info("send request:{}", message);

return rSocketRequester

.route("/fire-and-forget")

.data(message) .send();

}

接下來,我們再來看一個請求-響應流的示例,伺服器端實作如代碼清單5-40所示。

代碼清單5-40 請求-響應流互動模式服務端實作代碼

MessageMapping("/request-stream")

public Flux<String> requestStream(String request) {

log.info("receive request:{}", request);

return Flux.just("hello", request, "welcome");

}

這裡我們根據傳入的請求參數request建構了包含三個元素的Flux傳回對象。是以,在用戶端中就需要使用retrieveFlux()方法來擷取響應結果,如代碼清單5-41所示。

代碼清單5-41 請求-響應流互動模式用戶端實作代碼

@GetMapping(value = "/requestStream/{message}", produces =

MediaType.TEXT_EVENT_STREAM_VALUE)

public Flux<String> requestStream(@PathVariable("message") String

message) {

return rSocketRequester

.route("/request-stream")

.data(message)

.retrieveFlux(String.class);

}

請注意,這裡我們在@GetMapping注解中指定了produces參數為MediaType.TEXT_EVENT_STREAM_VALUE,表明伺服器傳回的資料是由若幹個文本型的流元素組成,每個元素用\n\n分割,如代碼清單5-42所示。

代碼清單5-42 請求-響應流互動模式輸出結果

data:hello

data:tianyalan

data:welcome

本文給大家講解的内容是springweb服務應用響應式Web開發元件:Spring RSocket

  • 下文給大家講解的是springweb服務應用響應式Web開發元件:響應式Web實戰經驗

繼續閱讀