SpringRSocket
前面我們讨論了如何使用WebFlux建構響應式Web服務。
WebFlux和WebMVC一樣,都是基于HTTP實作請求-響應式的互動方式。這種互動方式很簡單,但不夠靈活,也無法滿足日常開發中的所有需求。
首先,HTTP提供的請求-響應模式對于很多應用場景來說是不合适的。
典型的例子是消息推送,如果用戶端需要擷取最新的推送消息,就必須使用輪詢。
也就是說用戶端不停地發送請求到伺服器來檢查更新,這無疑造成了大量的資源浪費。
請求-響應模式的另外一個問題是,如果某個請求的響應時間過長,會導緻其他請求無法得到及時響應。因為整個處理過程是同步阻塞的,是以在高并發場景下,性能和響應性上也存在瓶頸。
當然,我們可以使用伺服器發送事件(Server-Sent Events,SSE)實作從服務端向用戶端推送消息,不過SSE也是一個建構在HTTP上的處理機制,一般隻用來傳送文本,提供的功能比較有限。
那麼,在網絡協定層上有沒有辦法來提供更加豐富的互動方式呢?答案是肯定的,這就是本節要讨論的RSocket協定。
RSocket協定
RSocket協定誕生于2015年,是一個與語言無關的二進制網絡協定,用來解決單一的請求-響應模式以及現有網絡傳輸協定所存在的問題。
RSocket以異步消息的方式提供了四種互動模式,除了請求-響應(request/response)模式之外,還包括請求-響應流(request/stream)、即發-即忘(fire-and-forget)和通道(channel)這三種新的互動模式,各個互動模型的功能特性如下所示。請求-響應模式:這是最典型也最常見的互動模式。發送方在發送消息給接收方之後,等待與之對應的響應消息。
請求-響應流模式:發送方的每個請求消息,都對應于接收方的一個消息流作為響應。
即發-即忘模式:發送方的請求消息沒有與之對應的響應。
通道模式:在發送方和接收方之間建立一個雙向傳輸的通道。
RSocket專門設計來和響應式程式設計技術風格的應用程式配合使用,在使用RSocket協定時,響應式程式設計體系中的資料流機制仍然有效。
為了更好地了解RSocket協定,讓我們将它與HTTP進行對比。
在前面,我們已經提到Servlet是基于HTTP的一套Java API規範,将HTTP請求轉化為一個ServletRequest對象,并将處理結果封裝成一個ServletResponse對象進行傳回。HTTP為了相容各種應用方式,本身有一定的複雜性,性能一般。
而RSocket采用的是自定義二進制協定,本身的定位就是高性能通信協定,性能上比HTTP高出一個數量級。
在互動模式上,與HTTP的單向請求-響應方式不同,RSocket倡導的是對等通信。可以了解為它不是僅對傳統互動模式的簡單改進,而是在兩端之間可以自由地互相發送和處理請求。RSocket協定的互動方式可以參考圖5-6。
圖5-6 RSocket協定的互動方式
RSocket程式設計模型
從程式設計模型上講,RSocket協定提供了一個同名的RSocket接口,如代碼清單5-28所示。
代碼清單5-28 RSocket接口實作代碼
public interface RSocket extends Availability, Closeable {
//推送元資訊,資料可以自定義
Mono<Void> metadataPush(Payload payload);
//請求-響應模式,發送一個請求并接收一個響應
Mono<Payload> requestResponse(Payload payload);
//即發-即忘模式,對請求-響應模式的優化,在不需要響應時非常有用
Mono<Void> fireAndForget(Payload payload);
//請求-響應流模式,類似于傳回集合的請求-響應模式,集合将以流的方式傳回,而不是
等到查詢完成
Flux<Payload> requestStream(Payload payload);
//通道模式,允許任意互動模型的雙向消息流
Flux<Payload> requestChannel(Publisher<Payload> payloads);
}
顯然,RSocket接口通過四個方法分别實作了四種互動模式,這裡的Payload代表的就是一種消息對象,由兩部分組成,即元資訊metadata和資料data,其概念類似于常見的消息通信中的消息頭和消息體。
注意到這些方法的傳回值都是Mono或Flux,說明RSocket協定是和響應式程式設計技術完全整合在一起的。我們發現fireAndForget()方法傳回的是一個Mono<Void>流,符合即發-即忘模式的語義。而requestStream()作為請求-響應流模式的實作,與requestResponse()的差別在于它的傳回值是一個Flux流,而不是一個Mono對象。
另外,與其他方法不同,requestChannel()方法傳回的并不是一個Payload消息對象,而是一個代表響應式流的Publisher對象,意味着這種模式下的輸入/輸出都是響應式流,也就是說可以實作用戶端和服務端之間的雙向互動。這點也和通道模式的定義一緻。
在使用RSocket接口時,開發人員可以完全實作它所提供的所有接口,但通常不推薦也不需要這麼做。在rsocket-core開發包中,針對RSocket接口有一個抽象的實作類AbstractRSocket,對上述方法做了簡單的實作封裝。在使用過程中,我們可以基于這個AbstractRSocket類來具體提供針對某一種互動模式的實作邏輯,而不需要實作RSocket接口中的所有方法。
接下來,我們将基于AbstractRSocket抽象類來展示使用RSocket協定的基本方法。和開發Web服務一樣,我們需要分别建構伺服器端和用戶端元件。
想要在應用程式中使用RSocket協定,我們需要引入如代碼清單5-29所示的依賴包。
代碼清單5-29 RSocket協定相關依賴包
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-core</artifactId>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-transport-netty</artifactId>
</dependency>
在引入依賴包之後,代碼清單5-30展示了如何實作一個RSocket伺服器。
代碼清單5-30 RSocket伺服器建構代碼
RSocketFactory
.receive()
.acceptor((setupPayload, reactiveSocket) -> {
return Mono.just(new ServerJsonRSocket());
})
.transport(TcpServerTransport.create("localhost", 9090))
.start()
.subscribe();可以看到這裡使用了一個ServerJsonRSocket來對具體的請求進行處理,而這個Server-JsonRSocket就擴充了前面提到的AbstractRSocket抽象類,如代碼清單5-31所示。
代碼清單5-31 ServerJsonRSocket實作代碼
public class ServerJsonRSocket extends AbstractRSocket {
@Override
public Mono<Payload> requestResponse(Payload payload) {
String name = payload.getDataUtf8();
payload.release();
return Mono.just(DefaultPayload.create("{\"" + name +"\":
\"Hello\"}"));
}
}
這裡複寫了AbstractRSocket的requestResponse()接口,進而實作了請求-響應式的互動方式。現在,啟動這個Server類,系統就會在指定的端口等待請求的到來。
接下來,我們建構用戶端元件,如代碼清單5-32所示。
代碼清單5-32 RSocket用戶端建構代碼
RSocket rSocket = RSocketFactory.connect()
.dataMimeType("json")
.transport(TcpClientTransport.create("localhost",9090))
.start()
.block();
Payload payload = rSocket
.requestResponse(DefaultPayload.create("Spring"))
.block();
payload.release();通常,我們不會直接使用RSocket開發庫進行應用程式的開發,而是借助于特定的開發架構。接下來,我們将基于Spring RSocket來實作一個案例。
Spring RSocket案例分析
本小節将通過一個案例來展示Spring RSocket的使用方法。
我們将建構兩個獨立的服務rsocket-server和rsocket-client。其中,前者作為服務端對外暴露基于RSocket協定的通路入口;而後者則作為用戶端來消費服務。
從2.3版本開始,Spring Boot内置了spring-boot-starter-rsocket元件,簡化了RSocket服務暴露和引用的實作過程。想要在Spring Boot中使用RSocket協定,我們需要在rsocket-server和rsocket-client引入這一依賴,
如代碼清單5-33所示
代碼清單5-33 spring-boot-starter-rsocket依賴包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
然後,我們來建構一個請求-響應式互動方式。為此,我們在rsocketserver中添加一個簡單的ProviderController,如代碼清單5-34所示。
代碼清單5-34 ProviderController實作代碼
@Controller
public class ProviderController {
private Logger log =LoggerFactory.getLogger(ProviderController.class);
@MessageMapping("/request-response")
public Mono<String> requestResponse(String request) {
log.info("receive request:{}", request);
return Mono.just("Hello " + request);
}
}
注意到這裡引入了一個新的注解@MessageMapping,跟@RequestMapping注解類似,@MessageMapping是Spring提供的,用來指定WebSocket、RSocket等協定中消息處理的目的地。然後,我們輸入了一個String類型的參數并傳回一個Mono對象,符合請求-響應互動模式的定義。
為了通路這個RSocket端點,我們接下來實作rsocket-client。SpringRSocket為開發人員提供了一個用于發起遠端請求的RSocketRequester接口。
該接口提供了一組非常有用的工具方法,包括用于實作請求路由的route()方法。同時,Spring RSocket還提供了一個專門用于建構RSocketRequester的Builder接口。想要通過Builder接口來建立一個RSocket-Requester對象,可以使用如代碼清單5-35所示的實作方法。
代碼清單5-35 RSocketRequester對象建立代碼
@Autowired
RSocketRequester.Builder builder;
RSocketRequester requester = builder
.dataMimeType(MimeTypeUtils.TEXT_PLAIN)
.connect(TcpClientTransport.create(7000))
.block();
建立RSocketRequester更常見的做法是準備一個配置類,并在該類中通過Builder接口設定RSocketRequester的一些屬性,如代碼清單5-36所示。
代碼清單5-36 RSocketRequester配置代碼
@Configuration
public class RSocketClientConfiguration {
@Bean
public RSocketRequester
getRSocketRequester(RSocketRequester.Builder builder) {
return builder
.rsocketConnector(rSocketConnector ->
rSocketConnector
.reconnect(Retry.fixedDelay(2,
Duration.ofSeconds(2))))
.dataMimeType(MediaType.APPLICATION_JSON)
.transport(TcpClientTransport.create(7000));
}
}
這裡我們設定了請求的重試時間、序列化格式以及伺服器端口。基于這個RSocket-Requester對象,我們就可以通過它的route()方法路由到前面通過@MessageMapping注解建構的/request-response端點。為此,我們也在rsocket-client中添加一個ConsumerController,如代碼清單5-37所示。
代碼清單5-37 ConsumerController實作代碼
@RestController
public class ConsumerController {
private Logger log =
LoggerFactory.getLogger(ConsumerController.class);
@Autowired
private RSocketRequester rSocketRequester;
@GetMapping(value = "/requestResponse/{message}")
public Mono<String> requestResponse(@PathVariable("message")
String message) { log.info("send request:{}", message);
return rSocketRequester
.route("/request-response")
.data(message)
.retrieveMono(String.class);
}
}
這裡,我們通過data()方法傳遞請求參數,并通過retrieveMono()方法擷取響應結果。上述請求-響應方式的處理結果顯而易見。而如果想要實作即發-即忘的互動方式,那麼對于伺服器端而言,隻需要傳回一個空對象即可,如代碼清單5-38所示。
代碼清單5-38 即發-即忘互動模式服務端實作代碼
@MessageMapping("/fire-and-forget")
public Mono<Void> fireAndForget(String request) {
log.info("receive request:{}", request);
return Mono.empty();
}
而對于用戶端而言,我們也可以簡單通過一個send()方法來發送請求而不擷取結果,如代碼清單5-39所示。
代碼清單5-39 即發-即忘互動模式用戶端實作代碼
@GetMapping(value = "/fireAndForget/{message}")
public Mono<Void> fireAndForget(@PathVariable("message") String
message) {
log.info("send request:{}", message);
return rSocketRequester
.route("/fire-and-forget")
.data(message) .send();
}
接下來,我們再來看一個請求-響應流的示例,伺服器端實作如代碼清單5-40所示。
代碼清單5-40 請求-響應流互動模式服務端實作代碼
MessageMapping("/request-stream")
public Flux<String> requestStream(String request) {
log.info("receive request:{}", request);
return Flux.just("hello", request, "welcome");
}
這裡我們根據傳入的請求參數request建構了包含三個元素的Flux傳回對象。是以,在用戶端中就需要使用retrieveFlux()方法來擷取響應結果,如代碼清單5-41所示。
代碼清單5-41 請求-響應流互動模式用戶端實作代碼
@GetMapping(value = "/requestStream/{message}", produces =
MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> requestStream(@PathVariable("message") String
message) {
return rSocketRequester
.route("/request-stream")
.data(message)
.retrieveFlux(String.class);
}
請注意,這裡我們在@GetMapping注解中指定了produces參數為MediaType.TEXT_EVENT_STREAM_VALUE,表明伺服器傳回的資料是由若幹個文本型的流元素組成,每個元素用\n\n分割,如代碼清單5-42所示。
代碼清單5-42 請求-響應流互動模式輸出結果
data:hello
data:tianyalan
data:welcome
本文給大家講解的内容是springweb服務應用響應式Web開發元件:Spring RSocket
- 下文給大家講解的是springweb服務應用響應式Web開發元件:響應式Web實戰經驗