一、前言
- 响应式编程是啥?
- 为啥要有响应式编程?
- 响应式流的核心机制是什么?
- Spring 响应式编程能解决我们平时开发的什么痛点?
- Spring 响应式编程有哪些应用场景?
- Spring 响应式编程未来的趋势如何?
开篇六连问,等咱们熟悉完再来真香也不迟,我们废话少说,直接来畅游 Spring 响应式编程的世界。
二、响应式编程是啥?
在计算中,响应式编程或反应式编程(Reactive programming)是一种面向数据串流和变化传播的声明式编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。
有点抽象?没有关系,老周这就来说道说道。核心的一点响应式编程是声明式编程范式,对命令式编程进行替代的一个范例,这种替代的存在是因为响应式编程解决了命令式编程的限制。大多数开发者都是命令式编程起步的,你写的代码就是一行接一行的指令,按照它们的顺序一次一条地出现。一个任务被执行,程序就需要等到它执行完了,才能执行下一个任务。每一步,数据都需要完全获取到了才能被处理,因此它需要作为一个整体来处理。
命令式编程有个最大的弊端是:当正在执行的任务被阻塞了,特别是一个 IO 任务,例如将数据写入到数据库或从远程服务器获取数据,那么调用该任务的线程将无法做任何事情,直到任务完成。说白了,阻塞的线程就是一种浪费,在如今的环境,线程的资源是那么的宝贵。
相反,响应式编程是函数式和声明式的。响应式编程涉及描述通过该数据流的 pipeline 或 stream,而不是描述的一组按顺序执行的步骤。响应式流处理数据时只要数据是可用的就进行处理,而不是需要将数据作为一个整体进行提供。
三、为啥要有响应式编程?
我们上面也说了命令式编程会线程阻塞,而响应式编程是声明式编程范式的,是对命令式编程进行替代的一个范例。
对于命令式编程的同步阻塞,其实业界是有一些处理方案的,比如在 Java 中,为了实现异步非阻塞,一般会采用回调和 Future 这两种机制,但这两种机制都存在一定局限性。
3.1 回调机制
我们来看下面这个图:
服务 B 的 methodB() 方法调用服务 A 的 methodA() 方法,然后服务 A 的 methodA() 方法执行完毕后,再主动调用服务 B 的 callback() 方法。
回调体现的是一种双向的调用方式,实现了服务 A 和服务 B 之间的解耦。在这个 callback 回调方法中,回调的执行是由任务的结果来触发的,所以我们就可以异步来执行某项任务,从而使得调用链路不发生任何的阻塞。
回调的最大问题是复杂性,一旦在执行流程中包含了多层的异步执行和回调,那么就会形成一种嵌套结构,给代码的开发和调试带来很大的挑战。所以回调很难大规模地组合起来使用,因为很快就会导致代码难以理解和维护,从而造成所谓的“回调地狱”问题。之前公司就遇到代码“回调地狱”问题,十几层的回调,后面的人进来维护估计会吐。
3.2 Future 机制
我们再来看看 Future 这种机制,有一个需要处理的任务,然后把这个任务提交到 Future,Future 就会在一定时间内完成这个任务,而在这段时间内我们可以去做其他事情。下面我们来看看来自 Doug Lea 大神在 Java 中的 Future 接口设计:
我们可以看到,大神在上面的设计来达到一定的异步执行效果。但从本质上讲,Future 以及由 Future 所衍生出来的 CompletableFuture 等各种优化方案就是一种多线程技术。多线程假设一些线程可以共享一个 CPU,而 CPU 时间能在多个线程之间共享,这一点就引入了“上下文切换”的概念。
如果想要恢复线程,就需要涉及加载和保存寄存器等一系列计算密集型的操作。因此,大量线程之间的相互协作同样会导致资源利用效率低下。
3.3 响应式编程实现方法
3.3.1 数据流与响应式
数据流就是数据像水流一样源源不断的输入过来,而系统的响应能力就体现在对这些数据流的即时响应过程上。我们可以不采用传统的同步调用方式来处理数据,而是由处于数据库上游的各层组件自动来执行事件,从web到service再到dao层,这个过程就像水流一样,整个数据传递链路都应该是采用事件驱动的方式来进行运作的,这个过程都应该是异步非阻塞的,这就是响应式编程的核心特点。
相较传统开发所普遍采用的“拉”模式,在响应式编程下,基于事件的触发和订阅机制,这就形成了一种类似“推”的工作方式。说白了,就类似现在的 Kafka 等消息引擎,大部分都采用事件驱动的 pub/sub 模式的架构。这种模式的最大优势是生成事件和消费事件的过程是异步执行的,意味着资源之间的竞争关系较少,故服务器的响应能力也就越高。
3.3.2 响应式宣言
响应式宣言是一份构建现代云扩展架构的处方。这个框架主要使用消息驱动的方法来构建系统,在形式上可以达到弹性和韧性,最后可以产生响应性的价值。所谓弹性和韧性,通俗来说就像是橡皮筋,弹性是指橡皮筋可以拉长,而韧性指在拉长后可以缩回原样。
- 响应性: :只要有可能,系统就会及时地做出响应。即时响应是可用性和实用性的基石,而更加重要的是,即时响应意味着可以快速地检测到问题并且有效地对其进行处理。即时响应的系统专注于提供快速而一致的响应时间,确立可靠的反馈上限,以提供一致的服务质量。这种一致的行为转而将简化错误处理、建立最终用户的信任并促使用户与系统作进一步的互动。
- 韧性:系统在出现失败时依然保持即时响应性。这不仅适用于高可用的、任务关键型系统——任何不具备回弹性的系统都将会在发生失败之后丢失即时响应性。回弹性是通过复制、遏制、隔离以及委托来实现的。失败的扩散被遏制在了每个组件内部,与其他组件相互隔离,从而确保系统某部分的失败不会危及整个系统,并能独立恢复。每个组件的恢复都被委托给了另一个(外部的)组件,此外,在必要时可以通过复制来保证高可用性。(因此)组件的客户端不再承担组件失败的处理。
- 弹性: 系统在不断变化的工作负载之下依然保持即时响应性。反应式系统可以对输入(负载)的速率变化做出反应,比如通过增加或者减少被分配用于服务这些输入(负载)的资源。这意味着设计上并没有争用点和中央瓶颈,得以进行组件的分片或者复制,并在它们之间分布输入(负载)。通过提供相关的实时性能指标,反应式系统能支持预测式以及反应式的伸缩算法。这些系统可以在常规的硬件以及软件平台上实现成本高效的弹性。
- 消息驱动:反应式系统依赖异步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。这一边界还提供了将失败作为消息委托出去的手段。使用显式的消息传递,可以通过在系统中塑造并监视消息流队列,并在必要时应用回压,从而实现负载管理、 弹性以及流量控制。使用位置透明的消息传递作为通信的手段, 得跨集群或者在单个主机中使用相同的结构成分和语义来管理失败成为了可能。非阻塞的通信使得接收者可以只在活动时才消耗资源,从而减少系统开销。
问题:消息驱动与上面提到的事件驱动有啥区别呢?
响应式宣言指出了两者的区别:“消息驱动”中消息数据被送往明确的目的地址,有固定导向;“事件驱动”是事件向达到某个给定状态的组件发出的信号,没有固定导向,只有被观察的数据。
- 在一个消息驱动系统中,可寻址的接收者等待消息的到来然后响应消息,否则保持休眠状态,消息驱动系统专注于可寻址的接收者。响应式系统更加关注分布式系统的通信和协作以达到解耦、异步的特性,满足系统的弹性和容错性,所以响应式系统更倾向于使用消息驱动模式。
- 在一个事件驱动系统中,通知的监听者被绑定到消息源上。这样当消息被发出时,它就会被调用,所以,响应式编程更倾向于事件驱动。
WebFlux 简介
WebFlux 是 Spring Framework5.0 中引入的一种新的反应式Web框架。通过Reactor项目实现Reactive Streams规范,完全异步和非阻塞框架。本身不会加快程序执行速度,但在高并发情况下借助异步IO能够以少量而稳定的线程处理更高的吞吐,规避文件IO/网络IO阻塞带来的线程堆积。
1.1 WebFlux 的特性
WebFlux 具有以下特性:
- 异步非阻塞 - 可以举一个上传例子。相对于 Spring MVC 是同步阻塞IO模型,Spring WebFlux这样处理:线程发现文件数据没传输好,就先做其他事情,当文件准备好时通知线程来处理(这里就是输入非阻塞方式),当接收完并写入磁盘(该步骤也可以采用异步非阻塞方式)完毕后再通知线程来处理响应(这里就是输出非阻塞方式)。
- 响应式函数编程 - 相对于Java8 Stream 同步、阻塞的Pull模式,Spring Flux 采用Reactor Stream 异步、非阻塞Push模式。书写采用 Java lambda 方式,接近自然语言形式且容易理解。
- 不拘束于Servlet - 可以运行在传统的Servlet 容器(3.1+版本),还能运行在Netty、Undertow等NIO容器中。
1.2 WebFlux 的设计目标
- 适用高并发
- 高吞吐量
- 可伸缩性
二、Spring WebFlux 组件介绍
2.1 HTTPHandler
一个简单的处理请求和响应的抽象,用来适配不同HTTP服务容器的API。
2.2 WebHandler
一个用于处理业务请求抽象接口,定义了一系列处理行为。相关核心实现类如下;
2.3 DispatcherHandler
请求处理的总控制器,实际工作是由多个可配置的组件来处理。
WebFlux是兼容Spring MVC 基于@Controller,@RequestMapping等注解的编程开发方式的,可以做到平滑切换。
2.4 Functional Endpoints
这是一个轻量级函数编程模型。是基于@Controller,@RequestMapping等注解的编程模型的替代方案,提供一套函数式API 用于创建Router,Handler和Filter。调用处理组件如下:
简单的RouterFuntion 路由注册和业务处理过程:
java复制代码@Bean
public RouterFunction<ServerResponse> initRouterFunction() {
return RouterFunctions.route()
.GET("/hello/{name}", serverRequest -> {
String name = serverRequest.pathVariable("name");
return ServerResponse.ok().bodyValue(name);
}).build();
}
请求转发处理过程:
2.5 Reactive Stream
这是一个重要的组件,WebFlux 就是利用Reactor 来重写了传统Spring MVC 逻辑。其中Flux和Mono 是Reactor中两个关键概念。掌握了这两个概念才能理解WebFlux工作方式。
Flux和Mono 都实现了Reactor的Publisher接口,属于时间发布者,对消费者提供订阅接口,当有事件发生的时候,Flux或者Mono会通过回调消费者的相应的方法来通知消费者相应的事件。这就是所谓的响应式编程模型。
Mono工作流程图
只会在发送出单个结果后完成。
Flux工作流程图
发送出零个或者多个,可能无限个结果后才完成。
java复制代码对于流式媒体类型:application/stream+json 或者 text/event-stream ,可以让调用端获得服务器滚动结果。
对于非流类型:application/json WebFlux 默认JSON编码器会将序列化的JSON 一次性刷新到网络,这并不意味着阻塞,因为结果Flux<?> 是以反应式方式写入网络的,没有任何障碍。
三、WebFlux 工作原理
3.1 组件装配过程
流程相关源码解析-WebFluxAutoConfiguration
java复制代码@Configuration
//条件装配 只有启动的类型是REACTIVE时加载
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
//只有存在 WebFluxConfigurer实例 时加载
@ConditionalOnClass(WebFluxConfigurer.class)
//在不存在 WebFluxConfigurationSupport实例时 加载
@ConditionalOnMissingBean({ WebFluxConfigurationSupport.class })
//在之后装配
@AutoConfigureAfter({ ReactiveWebServerFactoryAutoConfiguration.class,
CodecsAutoConfiguration.class, ValidationAutoConfiguration.class })
//自动装配顺序
@AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE + 10)
public class WebFluxAutoConfiguration {
@Configuration
@EnableConfigurationProperties({ ResourceProperties.class, WebFluxProperties.class })
//接口编程 在装配WebFluxConfig 之前要先 装配EnableWebFluxConfiguration
@Import({ EnableWebFluxConfiguration.class })
public static class WebFluxConfig implements WebFluxConfigurer {
//隐藏部分源码
/**
* Configuration equivalent to {@code @EnableWebFlux}.
*/
}
@Configuration
public static class EnableWebFluxConfiguration
extends DelegatingWebFluxConfiguration {
//隐藏部分代码
}
@Configuration
@ConditionalOnEnabledResourceChain
static class ResourceChainCustomizerConfiguration {
//隐藏部分代码
}
private static class ResourceChainResourceHandlerRegistrationCustomizer
implements ResourceHandlerRegistrationCustomizer {
//隐藏部分代码
}
WebFluxAutoConfiguration 自动装配时先自动装配EnableWebFluxConfiguration 而EnableWebFluxConfiguration->DelegatingWebFluxConfiguration ->WebFluxConfigurationSupport。
最终WebFluxConfigurationSupport 不仅配置DispatcherHandler 还同时配置了其他很多WebFlux核心组件包括 异常处理器WebExceptionHandler,映射处理器处理器HandlerMapping,请求适配器HandlerAdapter,响应处理器HandlerResultHandler 等。
DispatcherHandler 创建初始化过程如下;
java复制代码public class WebFluxConfigurationSupport implements ApplicationContextAware {
//隐藏部分代码
@Nullable
public final ApplicationContext getApplicationContext() {
return this.applicationContext;
}
//隐藏部分代码
@Bean
public DispatcherHandler webHandler() {
return new DispatcherHandler();
}
java复制代码public class DispatcherHandler implements WebHandler, ApplicationContextAware {
@Nullable
private List<HandlerMapping> handlerMappings;
@Nullable
private List<HandlerAdapter> handlerAdapters;
@Nullable
private List<HandlerResultHandler> resultHandlers;
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
initStrategies(applicationContext);
}
protected void initStrategies(ApplicationContext context) {
//注入handlerMappings
Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerMapping.class, true, false);
ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
AnnotationAwareOrderComparator.sort(mappings);
this.handlerMappings = Collections.unmodifiableList(mappings);
//注入handlerAdapters
Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerAdapter.class, true, false);
this.handlerAdapters = new ArrayList<>(adapterBeans.values());
AnnotationAwareOrderComparator.sort(this.handlerAdapters);
//注入resultHandlers
Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerResultHandler.class, true, false);
this.resultHandlers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(this.resultHandlers);
}
**流程相关源码解析-**HTTPHandlerAutoConfiguration
上面已讲解过WebFlux 核心组件装载过程,那么这些组件又是什么时候注入到对应的容器上下文中的呢?其实是在刷新容器上下文时注入进去的。
org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#onRefresh
java复制代码public class ReactiveWebServerApplicationContext extends GenericReactiveWebApplicationContext
implements ConfigurableWebServerApplicationContext {
@Override
protected void onRefresh() {
super.onRefresh();
try {
createWebServer();
}
catch (Throwable ex) {
throw new ApplicationContextException("Unable to start reactive web server", ex);
}
}
private void createWebServer() {
WebServerManager serverManager = this.serverManager;
if (serverManager == null) {
String webServerFactoryBeanName = getWebServerFactoryBeanName();
ReactiveWebServerFactory webServerFactory = getWebServerFactory(webServerFactoryBeanName);
boolean lazyInit = getBeanFactory().getBeanDefinition(webServerFactoryBeanName).isLazyInit();
// 这里创建容器管理时注入httpHandler
this.serverManager = new WebServerManager(this, webServerFactory, this::getHttpHandler, lazyInit);
getBeanFactory().registerSingleton("webServerGracefulShutdown",
new WebServerGracefulShutdownLifecycle(this.serverManager));
// 注册一个 web容器启动服务类,该类继承了SmartLifecycle
getBeanFactory().registerSingleton("webServerStartStop",
new WebServerStartStopLifecycle(this.serverManager));
}
initPropertySources();
}
protected HttpHandler getHttpHandler() {
String[] beanNames = getBeanFactory().getBeanNamesForType(HttpHandler.class);
if (beanNames.length == 0) {
throw new ApplicationContextException(
"Unable to start ReactiveWebApplicationContext due to missing HttpHandler bean.");
}
if (beanNames.length > 1) {
throw new ApplicationContextException(
"Unable to start ReactiveWebApplicationContext due to multiple HttpHandler beans : "
+ StringUtils.arrayToCommaDelimitedString(beanNames));
}
//容器上下文获取httpHandler
return getBeanFactory().getBean(beanNames[0], HttpHandler.class);
}
而这个HTTPHandler 是由HTTPHandlerAutoConfiguration装配进去的。
java复制代码@Configuration
@ConditionalOnClass({ DispatcherHandler.class, HttpHandler.class })
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
@ConditionalOnMissingBean(HttpHandler.class)
@AutoConfigureAfter({ WebFluxAutoConfiguration.class })
@AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE + 10)
public class HttpHandlerAutoConfiguration {
@Configuration
public static class AnnotationConfig {
private ApplicationContext applicationContext;
public AnnotationConfig(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
//构建WebHandler
@Bean
public HttpHandler httpHandler() {
return WebHttpHandlerBuilder.applicationContext(this.applicationContext)
.build();
}
}
流程相关源码解析-web容器
org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#createWebServer 。在创建WebServerManager 容器管理器时会获取对应web容器实例,并注入响应的HTTPHandler。
java复制代码class WebServerManager {
private final ReactiveWebServerApplicationContext applicationContext;
private final DelayedInitializationHttpHandler handler;
private final WebServer webServer;
WebServerManager(ReactiveWebServerApplicationContext applicationContext, ReactiveWebServerFactory factory,
Supplier<HttpHandler> handlerSupplier, boolean lazyInit) {
this.applicationContext = applicationContext;
Assert.notNull(factory, "Factory must not be null");
this.handler = new DelayedInitializationHttpHandler(handlerSupplier, lazyInit);
this.webServer = factory.getWebServer(this.handler);
}
}
以Tomcat 容器为例展示创建过程,使用的是 TomcatHTTPHandlerAdapter 来连接Servlet 请求到HTTPHandler组件。
java复制代码public class TomcatReactiveWebServerFactory extends AbstractReactiveWebServerFactory implements ConfigurableTomcatWebServerFactory {
//隐藏部分代码
@Override
public WebServer getWebServer(HttpHandler httpHandler) {
if (this.disableMBeanRegistry) {
Registry.disableRegistry();
}
Tomcat tomcat = new Tomcat();
File baseDir = (this.baseDirectory != null) ? this.baseDirectory : createTempDir("tomcat");
tomcat.setBaseDir(baseDir.getAbsolutePath());
Connector connector = new Connector(this.protocol);
connector.setThrowOnFailure(true);
tomcat.getService().addConnector(connector);
customizeConnector(connector);
tomcat.setConnector(connector);
tomcat.getHost().setAutoDeploy(false);
configureEngine(tomcat.getEngine());
for (Connector additionalConnector : this.additionalTomcatConnectors) {
tomcat.getService().addConnector(additionalConnector);
}
TomcatHttpHandlerAdapter servlet = new TomcatHttpHandlerAdapter(httpHandler);
prepareContext(tomcat.getHost(), servlet);
return getTomcatWebServer(tomcat);
}
}
最后Spring容器加载后通过SmartLifecycle实现类WebServerStartStopLifecycle 来启动Web容器。
WebServerStartStopLifecycle 注册过程详见:org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#createWebServer
3.2 完整请求处理流程
(引用自:blog.csdn.net)
该图给出了一个HTTP请求处理的调用链路。是采用Reactor Stream 方式书写,只有最终调用 subscirbe 才真正执行业务逻辑。基于WebFlux 开发时要避免controller 中存在阻塞逻辑。列举下面例子可以看到Spring MVC 和Spring Webflux 之间的请求处理区别。
java复制代码@RestControllerpublic
class TestController {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@GetMapping("sync")
public String sync() {
logger.info("sync method start");
String result = this.execute();
logger.info("sync method end");
return result;
}
@GetMapping("async/mono")
public Mono<String> asyncMono() {
logger.info("async method start");
Mono<String> result = Mono.fromSupplier(this::execute);
logger.info("async method end");
return result;
}
private String execute() {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}
}
日志输出
java复制代码2021-05-31 20:14:52.384 INFO 3508 --- [nio-8080-exec-2] c.v.internet.webflux.web.TestController : sync method start
2021-05-31 20:14:57.385 INFO 3508 --- [nio-8080-exec-2] c.v.internet.webflux.web.TestController : sync method end
2021-05-31 20:15:09.659 INFO 3508 --- [nio-8080-exec-3] c.v.internet.webflux.web.TestController : async method start
2021-05-31 20:15:09.660 INFO 3508 --- [nio-8080-exec-3] c.v.internet.webflux.web.TestController : async method end
从上面例子可以看出sync() 方法阻塞了请求,而asyncMono() 没有阻塞请求并立刻返回的。asyncMono() 方法具体业务逻辑 被包裹在了Mono 中Supplier中的了。当execute 处理完业务逻辑后通过回调方式响应给浏览器。
四、存储支持
一旦控制层使用了 Spring Webflux 则安全认证层、数据访问层都必须使用 Reactive API 才真正实现异步非阻塞。
NOSQL Database
- MongoDB (org.springframework.boot:spring-boot-starter-data-mongodb-reactive)。
- Redis(org.springframework.boot:spring-boot-starter-data-redis-reactive)。
Relational Database
- H2 (io.r2dbc:r2dbc-h2)
- MariaDB (org.mariadb:r2dbc-mariadb)
- Microsoft SQL Server (io.r2dbc:r2dbc-mssql)
- MySQL (dev.miku:r2dbc-mysql)
- jasync-sql MySQL (com.github.jasync-sql:jasync-r2dbc-mysql)
- Postgres (io.r2dbc:r2dbc-postgresql)
- Oracle (com.oracle.database.r2dbc:oracle-r2dbc)
五、总结
关于Spring MVC 和Spring WebFlux 测评很多,本文引用下做简单说明。参考:《Spring: Blocking vs non-blocking: R2DBC vs JDBC and WebFlux vs Web MVC》。
基本依赖
java复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<!-- r2dbc 连接池 -->
<dependency>
<groupId >io.r2dbc</groupId>
<artifactId>r2dbc-pool</artifactId>
</dependency>
<!--r2dbc mysql 库-->
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc- mysql</artifactId>
</dependency>
<!--自动配置需要引入一个嵌入式数据库类型对象-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jdbc</artifactId>
</dependency>
<!-- 反应方程式 web 框架 webflux-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
相同数据下效果如下**;**
Spring MVC + JDBC 在低并发下表现最好,但 WebFlux + R2DBC 在高并发下每个处理请求使用的内存最少。
Spring WebFlux + R2DBC 在高并发下,吞吐量表现优异。