天天看点

Spring Boot 集成 WebFlux 开发 Reactive Web 应用

Spring Boot 集成 WebFlux 开发 Reactive Web 应用

《Spring Boot 实战开发》—— 基于 Gradle + Kotlin的企业级应用开发最佳实践

IBM的研究称,整个人类文明所获得的全部数据中,有90%是过去两年内产生的。在此背景下,包括NoSQL,Hadoop, Spark, Storm, Kylin在内的大批新技术应运而生。其中以RxJava和Reactor为代表的响应式(Reactive)编程技术针对的就是经典的大数据4V( Volume,Variety,Velocity,Value)中的Velocity,即高并发问题,而在Spring 5中,引入了响应式编程的支持。

本章介绍 Spring Boot 如何集成Spring 5 中的WebFlux 开发响应式 Web 应用。

1.1 响应式宣言

响应式宣言和敏捷宣言一样,说起响应式编程,必先提到响应式宣言——

We want systems that are Responsive, Resilient, Elastic and Message Driven. We call these Reactive Systems. - The Reactive Manifesto

响应式宣言中包含了4组关键词:

 Responsive: 可响应的。要求系统尽可能做到在任何时候都能及时响应。

 Resilient: 可恢复的。要求系统即使出错了,也能保持可响应性。

 Elastic: 可伸缩的。要求系统在各种负载下都能保持可响应性。

 Message Driven: 消息驱动的。要求系统通过异步消息连接各个组件。

可以看到,对于任何一个响应式系统,首先要保证的就是可响应性,否则就称不上是响应式系统。

1.2 Spring 5 响应式Web框架架构图

引用一张来自 Spring 5框架官方文档中的图:

图13-1 Spring 5框架

左侧是传统的基于Servlet的Spring Web MVC框架。右侧是Spring 5.0新引入的基于Reactive Streams的Spring WebFlux框架。从上到下依次是如下三个新组件:

 Router Functions

 WebFlux

 Reactive Streams

下面分别作简要介绍。

Router Functions

对标@Controller,@RequestMapping等标准的Spring MVC注解,提供一套函数式风格的API,用于创建Router,Handler和Filter。

WebFlux: 核心组件

协调上下游各个组件提供响应式编程支持。

Reactive Streams

一种支持背压(Backpressure)的异步数据流处理标准,主流实现有RxJava和Reactor,Spring WebFlux默认集成的是Reactor。

在Web容器的选择上,Spring WebFlux既支持像Tomcat,Jetty这样的的传统容器(前提是支持Servlet 3.1 Non-Blocking IO API),又支持像Netty,Undertow那样的异步容器。不管是何种容器,Spring WebFlux都会将其输入输出流适配成Flux<DataBuffer>格式,以便进行统一处理。

值得一提的是,除了新的Router Functions接口,Spring WebFlux同时支持使用老的Spring MVC注解声明Reactive Controller。和传统的MVC Controller不同,Reactive Controller操作的是非阻塞的ServerHttpRequest和ServerHttpResponse,而不再是Spring MVC里的HttpServletRequest和HttpServletResponse。

1.3 项目实战

本节通过实例工程具体介绍开发一个Reactive Web 应用程序的过程。

1.3.1 创建项目

使用​​​http://start.spring.io/​​ 创建项目,选择 Reactive Web起步依赖。如下图:

图13-2 选择 Reactive Web起步依赖

生成好项目 zip 包后,解压导入 IDEA 中, 选择 Gradle 构建项目。如下图:

图13-3 选择 Gradle 构建

配置 Gradle 本地环境,如下图:

图13-4 配置 Gradle 本地环境

完成导入 IDEA,等待项目构建初始化完毕,可以看到项目依赖树如下图:

图13-5 项目依赖树

可以看到,在 webflux的 starter 中依赖了 reactor、reactive-streams、netty 等。

Spring Initializr 将会帮我们自动生成一个样板工程。下面我们分别来加入 model 层 、dao层、 service层、 handler层等模块的代码。完整的项目的代码目录结构设计如下:

├── src
│   ├── main
│   │   ├── java
│   │   ├── kotlin
│   │   │   └── com
│   │   │       └── easy
│   │   │           └── kotlin
│   │   │               └── webflux
│   │   │                   ├── WebfluxApplication.kt
│   │   │                   ├── dao
│   │   │                   │   └── PersonRepository.kt
│   │   │                   ├── handler
│   │   │                   │   └── PersonHandler.kt
│   │   │                   ├── model
│   │   │                   │   └── Person.kt
│   │   │                   ├── router
│   │   │                   │   └── RouterConfig.kt
│   │   │                   ├── server
│   │   │                   │   └── HttpServerConfig.kt
│   │   │                   └── service
│   │   │                       └── PersonService.kt
│   │   └── resources
│   │       └── application.properties
      

1.3.2 项目代码

本节具体介绍具体的代码实现。

模型层

Person 对象模型代码是

class Person(@JsonProperty("name") val name: String, @JsonProperty("age") val age: Int) {

    override fun toString(): String {
        return "Person{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}'
    }
}      

服务层

接口PersonRepository.kt 的代码如下:

interface PersonRepository {

    fun getPerson(id: Int): Mono<Person>

    fun allPeople(): Flux<Person>

    fun savePerson(person: Mono<Person>): Mono<Void>
}      

服务层的实现类PersonService.kt 代码如下

@Service
class PersonService : PersonRepository {
    var persons: MutableMap<Int, Person> = hashMapOf()

    constructor() {
        this.persons[1] = Person("Jack", 20)
        this.persons[2] = Person("Rose", 16)
    }

    // 根据 id 获取 Mono 对象包装的 Person数据
    override fun getPerson(id: Int): Mono<Person> {
        return Mono.justOrEmpty(this.persons[id])
    }
    // 返回所有 Person数据,包装在 Flux 对象中
    override fun allPeople(): Flux<Person> {
        return Flux.fromIterable(this.persons.values)
    }

    override fun savePerson(person: Mono<Person>): Mono<Void> {
        return person.doOnNext {
            val id = this.persons.size + 1
            persons.put(id, it)
            println("Saved ${person} with ${id}")
        }.thenEmpty(Mono.empty())

    }
}      

其中, Mono 和 Flux 是由 Reactor 提供的两个 Reactor的类型。Reactor有两种类型,Flux<T>和Mono<T>。

 Flux

Flux 单词的意思是“流”。Flux类似RaxJava的Observable,它可以触发零个或者多个事件,并根据实际情况结束处理或触发错误。

 Mono

Mono这个单词本身的意思是“单子”的意思。Mono最多只触发一个事件,它跟RxJava的Single和Maybe类似,所以可以把Mono<Void>用于在异步任务完成时发出通知。

Spring 同时支持其他 Reactive 流实现,如 RXJava。

控制器层PersonHandler.kt 代码如下:

@Service
class PersonHandler {

    @Autowired lateinit var repository: PersonRepository

    fun getPerson(request: ServerRequest): Mono<ServerResponse> {
        val personId = Integer.valueOf(request.pathVariable("id"))!!
        val notFound = ServerResponse.notFound().build()
        val personMono = this.repository.getPerson(personId)
        return personMono
            .flatMap { person -> ServerResponse.ok().contentType(APPLICATION_JSON).body(fromObject(person)) }
            .switchIfEmpty(notFound)
    }

    fun createPerson(request: ServerRequest): Mono<ServerResponse> {
        val person = request.bodyToMono(Person::class.java)
        return ServerResponse.ok().build(this.repository.savePerson(person))
    }

    fun listPeople(request: ServerRequest): Mono<ServerResponse> {
        val people = this.repository.allPeople()
        return ServerResponse.ok().contentType(APPLICATION_JSON).body(people, Person::class.java)
    }

}      

这里我们没有真实去连接数据库进行操作,只是在内存中模拟了数据的返回。

请求路由

RouterConfig.kt 配置请求路由,把请求映射到相应的 Handler 处理方法。代码如下:

@Configuration
class RouterConfig {

    @Autowired lateinit var personHandler: PersonHandler

    @Bean
    fun routerFunction(): RouterFunction<*> {
        return route(GET("/api/person").and(accept(APPLICATION_JSON)),
                HandlerFunction { personHandler.listPeople(it) })
            .and(route(GET("/api/person/{id}").and(accept(APPLICATION_JSON)),
                    HandlerFunction { personHandler.getPerson(it) }))
    }

}      

这里我们配置/api/person的 GET 请求映射到personHandler.listPeople()方法处理;/api/person/{id}的 GET 请求映射到personHandler.getPerson() 方法来处理。

Reactive Web服务器配置类HttpServerConfig.kt 配置基于 netty 的 Reactive Web Server。我们配置端口号为application.properties 文件中server.port的值。代码如下

@Configuration
class HttpServerConfig {
    @Autowired
    lateinit var environment: Environment

    @Bean
    fun httpServer(routerFunction: RouterFunction<*>): HttpServer {
        val httpHandler = RouterFunctions.toHttpHandler(routerFunction)
        val adapter = ReactorHttpHandlerAdapter(httpHandler)
        val server = HttpServer.create("localhost", environment.getProperty("server.port").toInt())
        server.newHandler(adapter)
        return server
    }

}      

项目入口类

项目入口类 WebfluxApplication代码如下:

@SpringBootApplication
class WebfluxApplication

fun main(args: Array<String>) {
    runApplication<WebfluxApplication>(*args)
}      

运行测试

直接在 IDEA 中启动运行应用,在控制台启动日志中,可以看到路由映射的信息:

Mapped ((GET && /api/person) && Accept: [application/json]) -> com.easy.kotlin.webflux.router.RouterConfig$routerFunction$1@46292372
((GET && /api/person/{id}) && Accept: [application/json]) -> com.easy.kotlin.webflux.router.RouterConfig$routerFunction$2@126be319
……
2017-11-04 00:39:50.459  INFO 2884 --- [ctor-http-nio-1] r.ipc.netty.tcp.BlockingNettyContext     : Started HttpServer on /0:0:0:0:0:0:0:0:9000
2017-11-04 00:39:50.459  INFO 2884 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 9000
2017-11-04 00:39:50.466  INFO 2884 --- [           main] c.e.kotlin.webflux.WebfluxApplicationKt  : Started WebfluxApplicationKt in 5.047 seconds (JVM running for 6.276)      

直接在命令行执行curl 请求相应的 url,可以看到对应的输出:

$ curl http://127.0.0.1:9000/api/person
 [{"name":"Jack","age":20},{"name":"Rose","age":16}]

$ curl http://127.0.0.1:9000/api/person/1
{"name":"Jack","age":20}

$ curl http://127.0.0.1:9000/api/person/2
{"name":"Rose","age":16}