天天看点

Spring 5新框架——WebFlux1.基础概念2.开发WebFlux服务端3.深入WebFlux服务端开发4.深入客户端开发5.使用路由函数开发WebFlux

目录

1.基础概念

1.1 Reactor模型

1.2 Spring WebFlux概述

1.3 WebHandler接口和运行流程

2.开发WebFlux服务端

2.1 依赖导入

2.2 开发持久层

2.3 开发服务层

2.4 开发控制层

2.5 配置服务

2.6 客户端开发——WebClient

3.深入WebFlux服务端开发

3.1 类型转换器——Converter

3.2 验证器——Validator

3.3 访问静态资源

4.深入客户端开发

4.1 处理服务端错误和转换

4.2 设置请求头

5.使用路由函数开发WebFlux

5.1开发处理器

5.2 开发请求路由

5.3使用过滤器

在互联网的应用中,存在电商和金融等企业,这些企业对于业务的严谨性要求特别高,因为他们的业务关系到用户和商家的账户以及财产的安全,多以它们对于数据的一致性十分重视,在存在并发的时候,就需要通过锁或者其他机制保证一些重要数据的一致性,但这也会造成性能的下降。对于另外一些互联网应用则不一样,如游戏、视频、新闻和广告的网站,它们一般不会涉及账户和财产的问题,也就是不需要很高的数据一致性,但是对于并发数和响应速度却十分在意,而使用传统的开发模式会引入一致性机制,造成性能下降,为此一些软件设计者提出了响应式编程的理念。

为了适应响应式编程的潮流,Spring 5发布了新一代响应式的Web框架——Spring WebFlux,不过在讨论WebFlux之前,需要先了解一下RxJava和Reactor,而Reactor是Spring WebFlux的默认实现,所以我们主要讨论一下Reactor。

1.基础概念

响应式编程是一种面向数据流和变化传播的编程范式。对于响应式框架,是基于响应式宣言的理念所产生的编程方式。响应式宣言分为4大理念:

  • 灵敏的:可以快速响应的,只要有任何可能,系统都应该能够尽可能快地做出响应。
  • 可恢复的:系统在运行中可能出现问题,但是能够有很强大的容错机制和修复机制保持响应性。
  • 可伸缩的:在任何负载下,响应式编程都可以根据自身压力变化,请求少时,通过减少资源释放服务器压力,负载大时能够通过扩展算法和软硬件的方式扩展服务压力,以经济实惠的方式实现可伸缩性。
  • 消息驱动的:响应式编程存在异步消息机制,事件之间的协作是通过消息进行连接的。

基于这些理念,响应式编程提出了各种模型来满足响应式编程的理念,其中著名的有Reactor和RxJava,Spring5就是基于它们构建WebFlux,而默认情况下它会使用Reactor。

1.1 Reactor模型

传统的编程模型中,当请求量大于系统能够承受的最大线程数时,大量的线程就只能够在队列中等待或者被系统杀死,无法及时响应用户。为了克服这个问题,提出了Reactor(反应器)模式,其模型图如下图所示:

Spring 5新框架——WebFlux1.基础概念2.开发WebFlux服务端3.深入WebFlux服务端开发4.深入客户端开发5.使用路由函数开发WebFlux

首先客户端会先向服务器注册其感兴趣的事件(Event),这样客户端就订阅了对应的事件,只是订阅事件并不会给服务器发送请求。当客户端发生一些已经注册的事件时,就会触发服务器的响应。当触发服务器响应时,服务器存在一个Selector线程,这个线程只是负责轮询客户端发送过来的事件,并不处理请求,当它接收到有客户端事件时,就会转到对应的请求处理器,然后启用另外一个线程运行处理器。因为Selector线程只是轮询,并不处理复杂的业务逻辑,所以它可以在轮询后对请求做到实时响应,速度十分快。由于事件存在很多种,所以请求处理器也存在多个,因此还需要进行区分事件的类型,所以Selector存在一个路由问题。当请求处理器处理业务时,结果最终也会转换为数据流发送到客户端。

从上面可以看出,Reactor是一种基于事件的模型,对于服务器线程而言,它也是一种异步的,首先是Selector线程轮询到事件,然后通过路由找到处理器去运行对应的逻辑,处理器最后所返回的结果会转换为数据流。

1.2 Spring WebFlux概述

Spring WebFlux是Spring 5推出的新一代Web响应式编程框架,它十分适合那些需要高并发和大量请求的互联网应用,特别是那些需要高速响应而对业务逻辑要求并不十分严格的网站,如游戏、视频和新闻浏览网站等。

参考地址:https://docs.spring.io/spring-framework/docs/5.2.2.RELEASE/spring-framework-reference/web-reactive.html#spring-webflux

下图是Spring MVC和Spring WebFlux的对比:

Spring 5新框架——WebFlux1.基础概念2.开发WebFlux服务端3.深入WebFlux服务端开发4.深入客户端开发5.使用路由函数开发WebFlux

WebFlux包含一个路由分发层,也就是根据请求的事件,决定采用什么类的什么方法处理客户端发送过来的事件请求,类似于Reactor中的Selector,业务逻辑处理完成后,再将结果转换为数据流。

WebFlux需要能够支持Servlet3.1+的容器,如Tomcat、Jetty和Undertow等,而在Java异步编程的领域,使用得最多的是Netty,所以WebFlux的starter中默认是依赖于Netty库的。

1.3 WebHandler接口和运行流程

与Spring MVC使用DispatcherServlet不同的是Spring WebFlux使用的是WebHandler。它与DispatcherServlet十分类似。只是WebHandler是一个接口,为此Spring WebFlux为其提供了几个实现类,以便于在不同的场景下使用。这几个实现类中DispatcherHandler是我们关注的核心,它与DispatcherServlet是十分接近的 ,我们首先看一下DispatcherHandler的核心代码handle方法:

@Override
	public Mono<Void> handle(ServerWebExchange exchange) {
		if (this.handlerMappings == null) {
			return createNotFoundError();
		}
		return Flux.fromIterable(this.handlerMappings)
				.concatMap(mapping -> mapping.getHandler(exchange))
				.next()
				.switchIfEmpty(createNotFoundError())
				.flatMap(handler -> invokeHandler(exchange, handler))
				.flatMap(result -> handleResult(exchange, result));
	}
           

从源码中我们可以看到,与Spring MVC一样,都是从HandlerMapping中找到对应的处理器,找到处理器之后就会通过invokeHandler方法运行处理器,最后得到了处理结果的handleResult方法,通过它将结果转变为对应的数据流序列。下图是它大概的流程图:

Spring 5新框架——WebFlux1.基础概念2.开发WebFlux服务端3.深入WebFlux服务端开发4.深入客户端开发5.使用路由函数开发WebFlux

下面我们以MongoDB作为响应式编程的数据源,这里之所以使用MongoDB,是因为Spring WebFlux只能支持Spring Data Reactive,它是一种非阻塞的响应数据方式。遗憾的是,因为数据库的开发往往是阻塞的,所以Spring Data Reactive并不能对数据库的开发给予有效支持。但Spring Data Reactive可以支持Redis、MongoDB等NoSQL的开发,而Redis功能受限,更加适合作为缓存使用,所以我们选择使用MongoDB作为Spring WebFlux的实例,这也是最广泛的使用方式。

2.开发WebFlux服务端

在Spring WebFlux中,存在两种开发方式,一种是类似于Spring MVC的模式,另一种则是函数功能性编程,无论哪种都是允许的。我们这里使用类似于Spring MVC的模式,其中@Controller、@ResponseMapping、@GetMapping、@PostMapping等Spring MVC注解依然有效,这就为构建WebFlux的应用带来了便利。

2.1 依赖导入

首先导入相关依赖:

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
        </dependency>

        <!--MongoDb jpa-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
           

我们以MongoDB作为响应式编程的数据源,所以引入了JPA和MongoDB的starter。与此同时将WebFlux的starter包引入进来了,而它会依赖于Spring Web的包,这里还引入了Tomcat作为默认的服务器。

2.2 开发持久层

这里采用MongoDB作为开发的数据源,先定义POJO:

package com.martin.flux.pojo;

import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;

import java.io.Serializable;

/**
 * @author: martin
 * @date: 2019/11/3 10:44
 * @description:
 */
@Document(collection = "users")
@Data
public class UserDO implements Serializable {
    @Id
    private Long id;
    @Field("user_name")
    private String userName;
    private String note;
    /**
     * 定义了性别的枚举,可以通过typeHandler转换
     */
    private String sex;
}
           

这里采用JPA作为持久层,而Spring WebFlux为响应式提供了接口ReactiveMongoRepository,这样就可以通过继承它声明了一个JPA接口:

package com.martin.flux.dao;

import com.martin.flux.pojo.UserDO;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;

/**
 * @author: martin
 * @date: 2020/2/7
 */
@Repository
public interface UserDao extends ReactiveMongoRepository<UserDO, Long> {
    Flux<UserDO> findByUserNameLikeAndNoteLike(String userName, String note);
}
           

findByUserNameLikeAndNoteLike方法是一个按照JPA规则命名的方法,它的作用就是使用用户名和备注进行模糊查询。

2.3 开发服务层

先定义用户服务接口UserService:

package com.martin.flux.service;

import com.martin.flux.pojo.UserDO;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * @author: martin
 * @date: 2020/2/7
 */
public interface UserService {
    Mono<UserDO> getUser(Long id);

    Mono<UserDO> insertUser(UserDO userDO);

    Mono<UserDO> updateUser(UserDO userDO);

    Mono<Void> deleteUser(Long id);

    Flux<UserDO> findUsers(String userName, String note);
}
           

然后实现这个接口:

package com.martin.flux.service;

import com.martin.flux.dao.UserDao;
import com.martin.flux.pojo.UserDO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * @author: martin
 * @date: 2020/2/7
 */
@Service
public class UserServiceImpl implements UserService {
    @Autowired
    private UserDao userDao;

    @Override
    public Mono<UserDO> getUser(Long id) {
        return userDao.findById(id);
    }

    @Override
    public Mono<UserDO> insertUser(UserDO userDO) {
        return userDao.save(userDO);
    }

    @Override
    public Mono<UserDO> updateUser(UserDO userDO) {
        return userDao.save(userDO);
    }

    @Override
    public Mono<Void> deleteUser(Long id) {
        return userDao.deleteById(id);
    }

    @Override
    public Flux<UserDO> findUsers(String userName, String note) {
        return userDao.findByUserNameLikeAndNoteLike(userName, note);
    }
}
           

2.4 开发控制层

对于WebFlux而言,使用Rest风格更加合适,这里使用的也是Rest风格的控制器。首先我们定义前端视图模型UserVO:

package com.martin.flux.pojo;

import lombok.Data;

/**
 * @author: martin
 * @date: 2020/2/7
 */
@Data
public class UserVO {
    private Long id;
    private String userName;
    private int sexCode;
    private String sexName;
    private String note;
}
           

接下来定义用户控制器;

package com.martin.flux.controller;

import com.martin.flux.pojo.UserDO;
import com.martin.flux.pojo.UserVO;
import com.martin.flux.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * @author: martin
 * @date: 2020/2/7
 */
@RestController
public class UserController {
    @Autowired
    private UserService userService;

    //获取用户
    @GetMapping("/user/{id}")
    public Mono<UserVO> getUser(@PathVariable Long id) {
        return userService.getUser(id).map(u -> translate(u));
    }

    //新增用户
    @PostMapping("/user")
    public Mono<UserVO> insertUser(@RequestBody UserDO userDO) {
        return userService.insertUser(userDO).map(u -> translate(userDO));
    }

    //更新用户
    @PutMapping("/user")
    public Mono<UserVO> updateUser(@RequestBody UserDO userDO) {
        return userService.updateUser(userDO).map(u -> translate(userDO));
    }

    //删除用户
    @PutMapping("/user/{id}")
    public Mono<Void> updateUser(@PathVariable Long id) {
        return userService.deleteUser(id);
    }

    //查询用户
    @GetMapping("/user/{userName}/{note}")
    public Flux<UserVO> findUsers(@PathVariable String userName, @PathVariable String note) {
        return userService.findUsers(userName, note).map(u -> translate(u));
    }


    private UserVO translate(UserDO userDO) {
        UserVO userVO = new UserVO();
        userVO.setId(userDO.getId());
        userVO.setUserName(userDO.getUserName());
        userVO.setSexCode(Integer.valueOf(userDO.getSex()));
        userVO.setNote(userDO.getNote());
        return userVO;
    }
}
           

这里的@RestController代表采用Rest风格的控制器,这样Spring就知道将返回的内容转换为JSON数据序列。但是应注意的是,这里的方法返回的或者是Flux<UserVO>或者是Mono<User>,Mono是一个0~1个数据流序列,而Flux是一个0~N个数据流序列。

2.5 配置服务

Spring Boot配置文件如下:

spring.data.mongodb.host=localhost
spring.data.mongodb.username=dbAdmin
spring.data.mongodb.password=password
spring.data.mongodb.port=27017
spring.data.mongodb.database=test-it
           

启动文件配置如下:

package com.martin.flux;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
//定义扫描包
@SpringBootApplication(scanBasePackages = "com.martin.flux")
//因为引入了JPA,所以默认情况下,需要配置数据源
//排除原有自动配置的数据源
@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class})
//在WebFlux下,驱动MongoDB的JPA接口
@EnableReactiveMongoRepositories(basePackages = "com.martin.flux.dao")
public class FluxApplication {

    public static void main(String[] args) {
        SpringApplication.run(FluxApplication.class, args);
    }

}
           

因为引入了JPA,所以在默认的情况下Spring Boot会尝试装配关系数据库数据源(DataSource),而这里使用的是MongoDB并没有使用关系数据库,所以使用@EnableAutoConfiguration去排除数据源的初始化,否则就会得到错误的启动日志。在WebFlux中使用响应式的MongoDB的JPA接口,需要使用注解@EnableReactiveMongoRepositories进行驱动,还定义了扫描的包,这样就可以将代码扫描到IOC容器中了。

MongoDB数据库中导入如下数据:

Spring 5新框架——WebFlux1.基础概念2.开发WebFlux服务端3.深入WebFlux服务端开发4.深入客户端开发5.使用路由函数开发WebFlux

我们启动应用,并访问http://localhost:8080/user/1,页面会有如下返回结果:

Spring 5新框架——WebFlux1.基础概念2.开发WebFlux服务端3.深入WebFlux服务端开发4.深入客户端开发5.使用路由函数开发WebFlux

显然运行成功了,这样就完成了简单的WebFlux开发。

2.6 客户端开发——WebClient

为了方便测试,Spring WebFlux为我们提供了WebClient类,它是一个比RestTemplate更为强大的类,通过它就可以请求后端的服务。实例代码如下:

package com.martin.flux;

import com.martin.flux.pojo.SexEnum;
import com.martin.flux.pojo.UserDO;
import com.martin.flux.pojo.UserVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

import java.util.HashMap;
import java.util.Map;

/**
 * @author: martin
 * @date: 2020/2/9
 */
@Slf4j
public class FluxTest {
    public static void main(String[] args) {
        WebClient client = WebClient.create("http://localhost:8080");
        //新建一个用户
        UserDO newUser = new UserDO();
        newUser.setId(2L);
        newUser.setUserName("mars");
        newUser.setSex(String.valueOf(SexEnum.FEMALE.getCode()));
        //新增用户
        insertUser(client, newUser);
        getUser(client, 2L);
        //更新一个用户
        UserDO updateUser = new UserDO();
        updateUser.setId(1L);
        updateUser.setUserName("mars");
        updateUser.setSex(String.valueOf(SexEnum.FEMALE.getCode()));
        updateUser.setNote("mars老师");
        updateUser(client, updateUser);
        //查询用户
        findUsers(client, "mars", "mars老师");
        //删除用户
        deleteUser(client, 2L);
    }

    private static void insertUser(WebClient client, UserDO newUser) {
        Mono<UserVO> userVOMono = client.post()
                .uri("/user")
                .contentType(MediaType.APPLICATION_STREAM_JSON)
                .body(Mono.just(newUser), UserDO.class)
                .accept(MediaType.APPLICATION_STREAM_JSON)
                .retrieve()
                .bodyToMono(UserVO.class);

        UserVO userVO = userVOMono.block();
        log.error("insert user success:{}", userVO.getUserName());
    }

    private static void getUser(WebClient client, long id) {
        Mono<UserVO> userVOMono = client.get()
                .uri("/user/{id}", id)
                .accept(MediaType.APPLICATION_STREAM_JSON)
                .retrieve()
                .bodyToMono(UserVO.class);
        UserVO userVO = userVOMono.block();
        log.error("get user success:{}", userVO.getUserName());
    }

    private static void updateUser(WebClient client, UserDO userDO) {
        Mono<UserVO> userVOMono = client.put()
                .uri("/user")
                .contentType(MediaType.APPLICATION_STREAM_JSON)
                .body(Mono.just(userDO), UserDO.class)
                .accept(MediaType.APPLICATION_STREAM_JSON)
                .retrieve()
                .bodyToMono(UserVO.class);
        UserVO userVO = userVOMono.block();
        log.error("update user success:{}", userVO.getUserName());
    }

    private static void findUsers(WebClient client, String userName, String note) {
        Map<String, Object> paramMap = new HashMap<>();
        paramMap.put("userName", userName);
        paramMap.put("note", note);
        Mono<UserVO> userVOMono = client.get()
                .uri("/user/{userName}/{note}", paramMap)
                .accept(MediaType.APPLICATION_STREAM_JSON)
                .retrieve()
                .bodyToMono(UserVO.class);
        UserVO userVO = userVOMono.block();
        log.error("find user success:{}", userVO.getUserName());
    }

    private static void deleteUser(WebClient client, long id) {
        Mono<Void> voidMono = client.delete()
                .uri("/user/{id}", id)
                .accept(MediaType.APPLICATION_STREAM_JSON)
                .retrieve()
                .bodyToMono(Void.class);
        Void voidResult = voidMono.block();
        log.error("delete user success:{}", voidMono);
    }
}
           

这里采用了WebClient的静态方法create来创建WebClient对象,该方法的参数是网站的基础URI,这样就能够定位基础的URI。Mono<UserVO>对象的初始化代码,只是给后端注册一个事件而已,并不会发送请求。使用Mono的block方法则是发送触发事件,这样服务器才会响应事件,将数据流传送到客户端中。

3.深入WebFlux服务端开发

正如上面的例子,服务端的开发十分接近Spring MVC,在大部分情况下可以参考Spring MVC的内容。但Web Flux也有一些特殊的处理,比如新增加了参数转换和验证规则等,此外还有一些错误的处理。这些都是在实际开发中最常见到的内容,需要进一步地学习它们。

3.1 类型转换器——Converter

例如,现在来实现一个类型转换,约定用户将以字符串格式{id}-{userName}-{sex}-{note}进行传递,然后通过类型转换器(Converter)得到用户数据。实现代码如下:

package com.martin.flux.config;

import com.martin.common.constant.MarkCodeEnum;
import com.martin.flux.pojo.UserDO;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.convert.converter.Converter;
import org.springframework.format.FormatterRegistry;
import org.springframework.web.reactive.config.WebFluxConfigurer;

/**
 * @author: martin
 * @date: 2020/2/9
 */
@Configuration
public class WebFluxConfig implements WebFluxConfigurer {
    //注册Converter
    @Override
    public void addFormatters(FormatterRegistry registry) {
        registry.addConverter(string2UserConverter());
    }

    /**
     * 定义String转换成UserDO类型转换器
     * 如果加上@Bean注解,Spring Boot会自动识别为类型转换器
     *
     * @return
     */
    public Converter<?, ?> string2UserConverter() {
        Converter<String, UserDO> converter = (s -> {
            String[] args = s.split(MarkCodeEnum.SEGMENT.getCode());
            UserDO userDO = new UserDO();
            Long id = Long.valueOf(args[0]);
            userDO.setId(id);
            userDO.setUserName(args[1]);
            userDO.setSex(args[2]);
            userDO.setNote(args[3]);
            return userDO;
        });

        return converter;
    }
}
           

上面的代码实现了WebFluxConfigurer 接口,并覆盖了addFormatters方法。该方法是加载转换器和格式化器的,这里使用了方法string2UserConverter来定义了一个Converter,这样就能够将字符串按照约定的格式转换为用户类。我们也可以继承WebFluxConfigurer接口,将string2UserConverter方法添加@Bean注解,这样Spring Boot会自动识别这个Bean为转换器,就不需要实现addFormatters方法手动注册了。

为了测试转换器,在UserController中添加如下方法:

//新增用户
    @PostMapping("/user2/{user}")
    public Mono<UserVO> insertUser2(@PathVariable("user") UserDO userDO) {
        return userService.insertUser(userDO).map(u -> translate(userDO));
    }
           

WebClient测试代码如下:

package com.martin.flux;

import com.martin.flux.pojo.UserVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

/**
 * @author: martin
 * @date: 2020/2/9
 */
@Slf4j
public class ConvertTest {
    public static void main(String[] args) {
        WebClient client = WebClient.create("http://localhost:8080");
        Mono<UserVO> userVOMono = client.post()
                .uri("/user2/{user}", "3-convert-1-转换器测试")
                .accept(MediaType.APPLICATION_STREAM_JSON)
                .retrieve()
                .bodyToMono(UserVO.class);

        UserVO userVO = userVOMono.block();
        log.error("user name is:{}", userVO.getUserName());
    }
}
           

我们传入和服务端约定格式,就能够将用户信息存入到MongDB中了。关于日期格式化器,WebFlux允许我们通过application.properties进行配置,例如:

spring.webflux.date-format=yyyy-MM-dd
           

3.2 验证器——Validator

有时候需要对参数进行验证,比如我们需要验证一下用户名称是否为空。这个时候可以使用Spring MVC的Validator机制,首先新建用户验证器UserValidator:

package com.martin.flux.config;


import com.martin.flux.pojo.UserDO;
import org.springframework.util.StringUtils;
import org.springframework.validation.Errors;
import org.springframework.validation.Validator;

/**
 * @author: martin
 * @date: 2020/2/9
 */
public class UserValidator implements Validator {
    //确定支持的验证类型
    @Override
    public boolean supports(Class<?> clazz) {
        return clazz.equals(UserDO.class);
    }

    //验证逻辑
    @Override
    public void validate(Object target, Errors errors) {
        UserDO userDO = (UserDO) target;
        if (StringUtils.isEmpty(userDO.getUserName())) {
            errors.rejectValue("userName", null, "用户名不能为空");
        }
    }
}
           

在WebFluxConfig中覆盖WebFluxConfigurer接口的getValidator方法,代码如下;

@Override
    public Validator getValidator() {
        return new UserValidator();
    }
           

上面只是定义了验证器,但并没有启用,为了测试该验证器,需要在Controller对应的方法中加入@Valid注解,实例代码如下;

//新增用户
    @PostMapping("/user3/")
    public Mono<UserVO> insertUser3(@Valid @RequestBody UserDO userDO) {
        return userService.insertUser(userDO).map(u -> translate(userDO));
    }
           

这样Spring就会启用UserValidator进行参数验证。但这里是在全局中加入验证器,有时候我们希望使用局部验证器,而不是使用全局验证器,这是可以仿照Spring MVC的办法使用注解@InitBinder,将类和验证器进行绑定。我们删除在WebFluxConfig中的getValidator方法,然后在Controller中加入如下代码:

@InitBinder
    public void initBinder(DataBinder binder) {
        binder.setValidator(new UserValidator());
    }
           

这样UserValidator就只能对当前的控制器有效,而不是全局有效了。

3.3 访问静态资源

有时候我们还需要访问一些文件,如图片、配置内容等。这时可以覆盖WebFluxConfigurer的addResourceHandlers方法,代码如下:

@Override
    public void addResourceHandlers(ResourceHandlerRegistry registry) {
        registry.addResourceHandler("/resources/static/**")
                .addResourceLocations("/public", "classpath:/static/")
                .setCacheControl(CacheControl.maxAge(365, TimeUnit.DAYS));
    }
           

通过这样的限定,就可以直接通过URI来访问/resources/static/下的静态资源,而在Spring的上下文机制中还可以直接访问/public路径,它将能够访问classpath:/static/下的资源。这里还设置了缓存的时限,设置为1年(365天)。为了区分静态资源设置一个前缀,这样便能把静态资源和动态资源区分出来,为了我们可以在application.properties文件中进行配置:

spring.webflux.static-path-pattern=/static/**
           

这样在访问静态资源的时候就需要加入/static/前缀了。

4.深入客户端开发

上面的客户端开发只是考虑了正常的状态,而一些特殊的要求却没有考虑,比如如何设置请求头,服务端发生了错误,应当如何处理等。这些都是实践中经常发生的,所以很有必要讨论一下。

4.1 处理服务端错误和转换

客户端处理服务端错误的实例代码如下:

private static void getUser(WebClient client, long id) {
        Mono<UserVO> userVOMono = client.get()
                .uri("/user/{id}", id)
                .accept(MediaType.APPLICATION_STREAM_JSON)
                .retrieve()
                .onStatus(httpStatus -> httpStatus.is4xxClientError() || httpStatus.is5xxServerError(), clientResponse -> Mono.empty())
                .bodyToMono(UserVO.class);
        UserVO userVO = userVOMono.block();
        if (userVO == null) {
            System.out.println("[用户名称]" + userVO.getUserName());
        } else {
            System.out.println("服务端没有返回该用户");
        }
    }
           

这里采用了onStatus方法,这个方法是监控服务器返回的方法。它的两个参数都采用了Lambda表达式的方式,第一个Lambda表达式的参数是HttpStatus类型,它需要返回的是boolean值;第二个Lambda表达式参数为ClientResponse类型的,它是在第一个Lambda表达式返回true时触发,这里是让结果转换为空。

4.2 设置请求头

有时候需要给HTTP请求头设置一些属性,以便于服务端的获取。实例代码如下:

//更新用户
    @PutMapping("/user/name")
    public Mono<UserVO> updateUserByHeader(@RequestHeader("id") Long id, @RequestHeader("userName") String userName) {
        Mono<UserDO> userDOMono = userService.getUser(id);
        UserDO userDO = userDOMono.block();
        if (userDO == null) {
            throw new IllegalArgumentException("找不到该用户");
        }
        userDO.setUserName(userName);

        return userService.updateUser(userDO).map(u -> translate(userDO));
    }
           

服务端和Spring MVC一样,使用@RequestHeader从请求头中获取参数,然后根据参数去查询用户。有了服务端的方法,接着使用客户端进行测试:

private static void updateUserByHeader(WebClient client, Long id,String userName) {
        Mono<UserVO> userVOMono = client.put()
                .uri("/user/name")
                .header("id",id+"")
                .header("userName",userName)
                .accept(MediaType.APPLICATION_STREAM_JSON)
                .retrieve()
                .bodyToMono(UserVO.class);
        UserVO userVO = userVOMono.block();
        log.error("update user success:{}", userVO.getUserName());
    }
           

这里的两个header方法,它们各自设置了用户编号id和用户名userName,这样就可以以请求头的形式给服务端传递参数了。

5.使用路由函数开发WebFlux

除了上述使用类似Spring MVC的开发方式以外,WebFlux还提供了路由函数开发方式来开发WebFlux。这样的方式体现了高并发的特性,也符合近期兴起的函数式编程的潮流。但是也会引入更多的API和长长的方法链。

5.1开发处理器

使用路由函数方法,首先需要开发一个处理器处理各种场景。UserHandler的实现代码如下:

package com.martin.flux.controller;

import com.martin.flux.pojo.UserDO;
import com.martin.flux.pojo.UserVO;
import com.martin.flux.service.UserService;
import org.apache.catalina.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * @author: martin
 * @date: 2020/2/10
 */
@Service
public class UserHandler {
    @Autowired
    private UserService userService;

    public Mono<ServerResponse> getUser(ServerRequest request) {
        //获取请求参数URI参数
        String idStr = request.pathVariable("id");
        Long id = Long.valueOf(idStr);
        Mono<UserVO> userVOMono = userService.getUser(id).map(u -> translate(u));
        return ServerResponse
                .ok() //响应成功
                .contentType(MediaType.APPLICATION_JSON_UTF8)  //响应体类型
                .body(userVOMono, UserVO.class);//响应体
    }

    public Mono<ServerResponse> insertUser(ServerRequest request) {
        //把请求头转换为UserDO对象
        Mono<UserDO> userDOMono = request.bodyToMono(UserDO.class);
        Mono<UserVO> userVOMono = userDOMono
                .cache() //将数据流对象缓存起来,无需等待数据接收
                .flatMap(user -> userService.insertUser(user))
                .map(u -> translate(u));

        return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON_UTF8).body(userVOMono, UserVO.class);
    }

    public Mono<ServerResponse> updateUser(ServerRequest request) {
        Mono<UserDO> userDOMono = request.bodyToMono(UserDO.class);
        Mono<UserVO> userVOMono = userDOMono.cache().flatMap(userDO -> userService.updateUser(userDO)).map(u -> translate(u));

        return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON_UTF8).body(userVOMono, UserVO.class);
    }

    public Mono<ServerResponse> deleteUser(ServerRequest request) {
        //获取请求参数URI参数
        String idStr = request.pathVariable("id");
        Long id = Long.valueOf(idStr);
        Mono<Void> monoVoid = userService.deleteUser(id);
        return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON_UTF8).body(monoVoid, Void.class);
    }

    /**
     * 查询用户
     *
     * @param request
     * @return
     */
    public Mono<ServerResponse> findUsers(ServerRequest request) {
        //获取请求参数URI参数
        String userName = request.pathVariable("userName");
        String note = request.pathVariable("note");
        //使用Flux封装多个数据单元
        Flux<UserVO> userVOFlux = userService.findUsers(userName, note).map(u -> translate(u));

        return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON_UTF8).body(userVOFlux, UserVO.class);
    }

    /**
     * 修改用户名
     *
     * @param request
     * @return
     */
    public Mono<ServerResponse> updateUserName(ServerRequest request) {
        //获取请求头数据
        String idStr = request.headers().header("id").get(0);
        String userName = request.headers().header("userName").get(0);
        //获取原有用户信息
        Mono<UserDO> userDOMono = userService.getUser(Long.valueOf(idStr));
        UserDO userDO = userDOMono.block();
        //修改用户名
        userDO.setUserName(userName);
        Mono<UserVO> userVOMono = userService.insertUser(userDO).map(u -> translate(u));

        return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON_UTF8).body(userVOMono, UserVO.class);
    }

    private UserVO translate(UserDO userDO) {
        UserVO userVO = new UserVO();
        userVO.setId(userDO.getId());
        userVO.setUserName(userDO.getUserName());
        userVO.setSexCode(Integer.valueOf(userDO.getSex()));
        userVO.setNote(userDO.getNote());
        return userVO;
    }
}
           

用户处理器各个方法开发完成之后,还不能接收请求,因为还没有使它与请求URI对应起来,也没有设置请求接收和相应的类型等信息。

5.2 开发请求路由

为了让HTTP请求能够映射到方法上,还需要一个路由的功能,因此需要开发路由器,使得请求能够映射到路由的方法上。

package com.martin.flux.config;

import com.martin.flux.controller.UserHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.DELETE;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RequestPredicates.PUT;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import static org.springframework.web.reactive.function.server.RequestPredicates.contentType;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

/**
 * @author: martin
 * @date: 2020/2/11
 */
@Configuration
public class RouterConfig {
    @Autowired
    private UserHandler userHandler;

    //定义用户路由
    @Bean
    public RouterFunction<ServerResponse> userRouter() {
        RouterFunction<ServerResponse> router = route(GET("/router/user/{id}").and(accept(MediaType.APPLICATION_STREAM_JSON)), userHandler::getUser)
                .andRoute(GET("/router/user/{userName}/{note}").and(accept(MediaType.APPLICATION_STREAM_JSON)), userHandler::findUsers)
                .andRoute(POST("/router/user").and(contentType(MediaType.APPLICATION_STREAM_JSON)).and(accept(MediaType.APPLICATION_STREAM_JSON)), userHandler::insertUser)
                .andRoute(PUT("/router/user").and(contentType(MediaType.APPLICATION_STREAM_JSON)).and(accept(MediaType.APPLICATION_STREAM_JSON)), userHandler::updateUser)
                .andRoute(DELETE("/router/user/{id}").and(accept(MediaType.APPLICATION_STREAM_JSON)), userHandler::deleteUser)
                .andRoute(PUT("/router/user/name").and(accept(MediaType.APPLICATION_STREAM_JSON)), userHandler::updateUserName);

        return router;
    }
}
           

5.3使用过滤器

在互联网的环境中往往还需要保护这些业务请求,以避免网站被攻击。这时可以采用过滤器的方式拦截请求,通过验证身份后才处理业务逻辑。下面的实例代码实现了在请求头中存放用户名和密码,后端验证后才处理业务逻辑。

package com.martin.flux.config;

import com.martin.flux.controller.UserHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.function.server.HandlerFunction;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

/**
 * @author: martin
 * @date: 2020/2/11
 */
@Configuration
public class RouterFiltConfig {
    @Autowired
    private UserHandler userHandler;

    public static final String HEADER_NAME = "user";
    public static final String HEADER_PWD = "password";

    //定义用户路由
    @Bean
    public RouterFunction<ServerResponse> userRouter() {
        RouterFunction<ServerResponse> router =
                route(GET("/security/user/{id}").and(accept(MediaType.APPLICATION_STREAM_JSON)), userHandler::getUser)
                        .filter(((serverRequest, handlerFunction) -> filterLogic(serverRequest, handlerFunction)));

        return router;
    }

    private Mono<ServerResponse> filterLogic(ServerRequest serverRequest, HandlerFunction<ServerResponse> handlerFunction) {
        //取出请求头
        String userName = serverRequest.headers().header(HEADER_NAME).get(0);
        String password = serverRequest.headers().header(HEADER_PWD).get(0);
        //验证通过条件
        if (!StringUtils.isEmpty(userName) && !StringUtils.isEmpty(password) && !userName.equals(password)) {
            //接收请求
            return handlerFunction.handle(serverRequest);
        }
        //验证不匹配,则不允许请求,返回未签名错误
        return ServerResponse.status(HttpStatus.UNAUTHORIZED).build();
    }
}
           

继续阅读