天天看點

Webflux響應式程式設計(SpringBoot 2.0新特性)——完整版

  • 學習webflux前需要學習三個基礎:
  1. 函數式程式設計和lambda表達式
  2. Stream流程式設計
  3. Reactive stream 響應式流
  • 接下來進入學習

一、函數式程式設計和lambda表達式

1. 什麼是函數式程式設計

函數式程式設計是一種相對于指令式程式設計的一種程式設計範式,它不是一種具體的技術,而是一種如何搭建應用程式的方法論

2. 為什麼要使用函數式程式設計

* 能讓我們以一種更加優雅的方式進行程式設計

* 函數式程式設計與指令式程式設計相比

1)不同點:

關注點不同,指令式程式設計我們關注的是怎麼樣做,而函數式程式設計關注的是做什麼。

2)優點:

可以使代碼更加的簡短,更加的好讀。

3. lambda表達式初接觸

具體看一個例子,求數組中的最大值,如果資料量太大,想要處理更高效,jdk8以前,隻能自己建立線程池,自己拆分,而jdk8以後隻需要加上parallel(),意思就是告訴它我要多線程的處理該資料,以此可以看到他的魅力

public class MinDemo {
    public static void main(String[] args) {
        int[] arr = {15,24,12,451,156};
        int min = Integer.MAX_VALUE;
        for (int a :
                arr) {
            if (a < min) {
                min = a;
            }
        }
        System.out.println(min);

        //jdk8 lambda,parallel()多線程處理
        int min2 = IntStream.of(arr).parallel().min().getAsInt();
        System.out.println(min2);

    }
           

4. 當然還有很多其他的特性,這裡隻簡單介紹一下

  • jdk8接口新特性
    1. 接口裡隻有一個要實作的方法,單一責任制
    2. 新增預設方法
  • 函數接口
    1. 隻需要知道輸入輸出的類型
    2. 支援鍊式操作
  • 方法引用
    1. 靜态方法引用
    2. 非靜态方法引用
    3. 構造方法引用
  • 級聯表達式和柯裡化
    1. 級聯表達式是傳回函數的函數
    2. 柯裡化把多個參數的函數轉換為隻有一個參數的函數
  • 變量引用
    1. 引用外邊的變量必須是final類型

5. 以下是函數式程式設計常用的接口

Webflux響應式程式設計(SpringBoot 2.0新特性)——完整版

二、 Stream流程式設計

1. 是什麼,不是什麼

是一個進階的疊代器,不是一個資料結構、不是一個集合、不會存放資料、關注的是怎麼把資料高效處理

2. 建立/中間操作/終止操作

1) 建立

Webflux響應式程式設計(SpringBoot 2.0新特性)——完整版

代碼示範

List<String> list = new ArrayList<>();

		// 從集合建立
		list.stream();
		list.parallelStream();

		// 從數組建立
		Arrays.stream(new int[] { 2, 3, 5 });

		// 建立數字流
		IntStream.of(1, 2, 3);
		IntStream.rangeClosed(1, 10);

		// 使用random建立一個無限流
		new Random().ints().limit(10);
		Random random = new Random();

		// 自己産生流
		Stream.generate(() -> random.nextInt()).limit(20);
           

2) 中間操作

Webflux響應式程式設計(SpringBoot 2.0新特性)——完整版
String str = "my name is AlgerFan";

		System.out.println("--------------filter------------");
		// 把每個單詞的長度調用出來
		Stream.of(str.split(" ")).filter(s -> s.length() > 2)
				.map(String::length).forEach(System.out::println);

		System.out.println("--------------flatMap------------");
		// flatMap A->B屬性(是個集合), 最終得到所有的A元素裡面的所有B屬性集合
		// intStream/longStream 并不是Stream的子類, 是以要進行裝箱 boxed
		Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed())
				.forEach(i -> System.out.println((char) i.intValue()));

		System.out.println("--------------peek------------");
		// peek 用于debug. 是個中間操作,和 forEach 是終止操作
		Stream.of(str.split(" ")).peek(System.out::println)
				.forEach(System.out::println);

		System.out.println("--------------limit------------");
		// limit 使用, 主要用于無限流
		new Random().ints().filter(i -> i > 100 && i < 1000).limit(5)
				.forEach(System.out::println);
           

3) 終止操作

Webflux響應式程式設計(SpringBoot 2.0新特性)——完整版
String str = "my name is AlgerFan";

		System.out.println("-------并行流parallel--------");
		// 使用并行流
		str.chars().parallel().forEach(i -> System.out.print((char) i));
		System.out.println();
		// 使用 forEachOrdered 保證順序
		str.chars().parallel().forEachOrdered(i -> System.out.print((char) i));
		System.out.println();

		System.out.println("-------collect收集到list--------");
		// 收集到list
		List<String> list = Stream.of(str.split(" "))
				.collect(Collectors.toList());
		System.out.println(list);

		System.out.println("-------使用 reduce 拼接字元串--------");
		// 使用 reduce 拼接字元串
		Optional<String> letters = Stream.of(str.split(" "))
				.reduce((s1, s2) -> s1 + "|" + s2);
		System.out.println(letters.orElse(""));

		System.out.println("-------帶初始化值的reduce--------");
		// 帶初始化值的reduce
		String reduce = Stream.of(str.split(" ")).reduce("",
				(s1, s2) -> s1 + "|" + s2);
		System.out.println(reduce);

		System.out.println("-------計算所有單詞總長度--------");
		// 計算所有單詞總長度
		Integer length = Stream.of(str.split(" ")).map(s -> s.length())
				.reduce(0, (s1, s2) -> s1 + s2);
		System.out.println(length);

		System.out.println("-------max 的使用--------");
		// max 的使用
		Optional<String> max = Stream.of(str.split(" "))
				.max((s1, s2) -> s1.length() - s2.length());
		System.out.println(max.get());

		System.out.println("-------使用 findFirst 短路操作--------");
		// 使用 findFirst 短路操作
		OptionalInt findFirst = new Random().ints().findFirst();
		System.out.println(findFirst.getAsInt());
           
  1. 并行流

    以上已經接觸了parallel()并行流,能夠多線程的處理資料

  2. 收集器

    示例代碼:

// 測試資料
		List<Student> students = Arrays.asList(
				new Student("小明", 10, Gender.MALE, Grade.ONE),
				new Student("大明", 9, Gender.MALE, Grade.THREE),
				new Student("小白", 8, Gender.FEMALE, Grade.TWO),
				new Student("小黑", 13, Gender.FEMALE, Grade.FOUR),
				new Student("小紅", 7, Gender.FEMALE, Grade.THREE),
				new Student("小黃", 13, Gender.MALE, Grade.ONE),
				new Student("小青", 13, Gender.FEMALE, Grade.THREE),
				new Student("小紫", 9, Gender.FEMALE, Grade.TWO),
				new Student("小王", 6, Gender.MALE, Grade.ONE),
				new Student("小李", 6, Gender.MALE, Grade.ONE),
				new Student("小馬", 14, Gender.FEMALE, Grade.FOUR),
				new Student("小劉", 13, Gender.MALE, Grade.FOUR));

		// 得到所有學生的年齡清單
		// s -> s.getAge() --> Student::getAge , 不會多生成一個類似 lambda$0這樣的函數
		Set<Integer> ages = students.stream().map(Student::getAge)
				.collect(Collectors.toCollection(TreeSet::new));
		System.out.println("所有學生的年齡:" + ages);

		// 統計彙總資訊
		IntSummaryStatistics agesSummaryStatistics = students.stream()
				.collect(Collectors.summarizingInt(Student::getAge));
		System.out.println("年齡彙總資訊:" + agesSummaryStatistics);

		// 分塊
		Map<Boolean, List<Student>> genders = students.stream().collect(
				Collectors.partitioningBy(s -> s.getGender() == Gender.MALE));
		System.out.println("男女學生清單:" + genders);

		// 分組
		Map<Grade, List<Student>> grades = students.stream()
				.collect(Collectors.groupingBy(Student::getGrade));
		System.out.println("學生班級清單:" + grades);

		// 得到所有班級學生的個數
		Map<Grade, Long> gradesCount = students.stream().collect(Collectors
				.groupingBy(Student::getGrade, Collectors.counting()));
		System.out.println("班級學生個數清單:" + gradesCount);
           

測試結果

所有學生的年齡:[6, 7, 8, 9, 10, 13, 14]
年齡彙總資訊:IntSummaryStatistics{count=12, sum=121, min=6, average=10.083333, max=14}
男女學生清單:{false=[[name=小白, age=8, gender=FEMALE, grade=TWO], [name=小黑, age=13, gender=FEMALE, grade=FOUR], [name=小紅, age=7, gender=FEMALE, grade=THREE], [name=小青, age=13, gender=FEMALE, grade=THREE], [name=小紫, age=9, gender=FEMALE, grade=TWO], [name=小馬, age=14, gender=FEMALE, grade=FOUR]], true=[[name=小明, age=10, gender=MALE, grade=ONE], [name=大明, age=9, gender=MALE, grade=THREE], [name=小黃, age=13, gender=MALE, grade=ONE], [name=小王, age=6, gender=MALE, grade=ONE], [name=小李, age=6, gender=MALE, grade=ONE], [name=小劉, age=13, gender=MALE, grade=FOUR]]}
學生班級清單:{FOUR=[[name=小黑, age=13, gender=FEMALE, grade=FOUR], [name=小馬, age=14, gender=FEMALE, grade=FOUR], [name=小劉, age=13, gender=MALE, grade=FOUR]], ONE=[[name=小明, age=10, gender=MALE, grade=ONE], [name=小黃, age=13, gender=MALE, grade=ONE], [name=小王, age=6, gender=MALE, grade=ONE], [name=小李, age=6, gender=MALE, grade=ONE]], THREE=[[name=大明, age=9, gender=MALE, grade=THREE], [name=小紅, age=7, gender=FEMALE, grade=THREE], [name=小青, age=13, gender=FEMALE, grade=THREE]], TWO=[[name=小白, age=8, gender=FEMALE, grade=TWO], [name=小紫, age=9, gender=FEMALE, grade=TWO]]}
班級學生個數清單:{FOUR=3, ONE=4, THREE=3, TWO=2}
           
  1. 運作機制

    示範一個測試代碼

Random random = new Random();
		// 随機産生資料
		Stream<Integer> stream = Stream.generate(random::nextInt)
				// 産生300個 ( 無限流需要短路操作. )
				.limit(300)
				// 第1個無狀态操作,print(s)執行耗時操作5s
				.peek(s -> print("peek: " + s))
				// 第2個無狀态操作
				.filter(s -> {
					print("filter: " + s);
					return s > 1000000;
				})
				// 有狀态操作
				/*.sorted((i1, i2) -> {
					print("排序: " + i1 + ", " + i2);
					return i1.compareTo(i2);
				})*/
				// 又一個無狀态操作
				.peek(s -> {
					print("peek2: " + s);
				});

		// 終止操作
		stream.count();
           

分析以上代碼,發現Stream建立了一個256長度的數組

  1. 所有操作是鍊式調用, 一個元素隻疊代一次
  2. 每一個中間操作傳回一個新的流. 流裡面有一個屬性sourceStage

    指向同一個 地方,就是Head

  3. Head->nextStage->nextStage->… -> null
  4. 有狀态操作會把無狀态操作階段,單獨處理
  5. 并行環境下, 有狀态的中間操作不一定能并行操作.
  6. parallel/ sequetial 這2個操作也是中間操作(也是傳回stream)

    但是他們不建立流, 他們隻修改 Head的并行标志

三、Reactive stream 響應式流

  • Reactive stream是jdk9新特性,提供了一套API,就是一種訂閱釋出者模式
  • 被壓,背壓是指在異步場景中,釋出者發送事件速度遠快于訂閱者的處理速度的情況下,一種告訴上遊的釋出者降低發送速度的政策,簡而言之,背壓就是一種流速控制的政策。

    舉個例子:假設以前是沒有水龍頭的,隻能自來水廠主動的往使用者輸送水,但是不知道使用者需要多少水,有了Reactive stream,就相當于有了水龍頭,使用者可以主動的請求用水,而自來水廠也知道了使用者的需求

    示例代碼(需要jdk9以上版本的支援)

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

public class FlowDemo {
    public static void main(String[] args) throws Exception {
        // 1. 定義釋出者, 釋出的資料類型是 Integer
        // 直接使用jdk自帶的SubmissionPublisher, 它實作了 Publisher 接口
        SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
        
        // 2. 定義訂閱者
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                // 儲存訂閱關系, 需要用它來給釋出者響應
                this.subscription = subscription;
                // 請求一個資料
                this.subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                // 接受到一個資料, 處理
                System.out.println("接受到資料: " + item);
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                // 處理完調用request再請求一個資料
                this.subscription.request(1);
                // 或者 已經達到了目标, 調用cancel告訴釋出者不再接受資料了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出現了異常(例如處理資料的時候産生了異常)
                throwable.printStackTrace();
                // 我們可以告訴釋出者, 後面不接受資料了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部資料處理完了(釋出者關閉了)
                System.out.println("處理完了!");
            }
        };

        // 3. 釋出者和訂閱者 建立訂閱關系
        publiser.subscribe(subscriber);

        // 4. 生産資料, 并釋出
        // 這裡忽略資料生産過程
        for (int i = 0; i < 1000; i++) {
            System.out.println("生成資料:" + i);
            // submit是個block方法
            publiser.submit(i);
        }

        publiser.submit(111);
        publiser.submit(222);
        publiser.submit(333);

        // 5. 結束後 關閉釋出者
        // 正式環境 應該放 finally 或者使用 try-resouce 確定關閉
        publiser.close();

        // 主線程延遲停止, 否則資料沒有消費就退出
        Thread.currentThread().join(1000);
    }
}

           

四、Webflux響應式程式設計

先來一張圖,這是spring文檔的一張截圖,介紹了spring如今的兩種開發模式,MVC和webflux兩種開發模式,可見webflux的重要性

Webflux響應式程式設計(SpringBoot 2.0新特性)——完整版

1. 初識SpringWebFlux

webflux 是spring5推出的一種響應式Web架構,它是一種非阻塞的開發模式,可以在一個線程裡處理多個請求(非阻塞),運作在netty環境,也可以可以運作在servlet3.1之後的容器,支援異步servlet, 可以支援更高的并發量

2. 異步servlet

  • 我們知道同步servlet阻塞了Tomcat容器的線程,當一個網絡請求到我們的Tomcat容器之後,容器會給每個請求啟動一個線程去處理,線程裡面會調用一個servlet去處理,當使用同步servlet時,業務代碼花多長時間,你的線程就要等待多長時間,這就是堵塞(同步和異步是伺服器背景才有異步這個概念,對于浏覽器來說所有的請求都是異步,前台都要花費業務邏輯時間)
  • 異步servlet的主要作用是它不會堵塞Tomcat容器的servlet線程,它可以把一些耗時的操作放在一個獨立的線程池,那麼我們的servlet就可以立馬傳回,處理下一個請求,以此就可以達到高并發。

    通過代碼比較一下同步servlet與異步servlet

同步servlet

@WebServlet(urlPatterns = "/SyncServlet")
public class SyncServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    public SyncServlet() {
        super();
    }

    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        doGet(request, response);
    }

    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        long t1 = System.currentTimeMillis();
        // 執行業務代碼
        doSomeThing(request, response);
        System.out.println("sync use:" + (System.currentTimeMillis() - t1));
    }
    private void doSomeThing(HttpServletRequest request,
                             HttpServletResponse response) throws IOException {
        // 模拟耗時操作
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        response.getWriter().append("done");
    }
}

           

異步servlet

@WebServlet(asyncSupported = true, urlPatterns = { "/AsyncServlet" })
public class AsyncServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    public AsyncServlet() {
        super();
    }

    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        doGet(request, response);
    }

    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        long t1 = System.currentTimeMillis();

        // 開啟異步
        AsyncContext asyncContext = request.startAsync();

        // 執行業務代碼,放入一個線程池裡
        CompletableFuture.runAsync(() -> doSomeThing(asyncContext,
                asyncContext.getRequest(), asyncContext.getResponse()));

        System.out.println("async use:" + (System.currentTimeMillis() - t1));
    }
    private void doSomeThing(AsyncContext asyncContext,
                             ServletRequest servletRequest, ServletResponse servletResponse) {

        // 模拟耗時操作
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            servletResponse.getWriter().append("done");
        } catch (IOException e) {
            e.printStackTrace();
        }

        // 業務代碼處理完畢, 通知結束
        asyncContext.complete();
    }
}
           
Webflux響應式程式設計(SpringBoot 2.0新特性)——完整版
  • 通過以上兩段代碼控制台的列印結果可以看出,異步servlet把耗時操作放在一個獨立的線程池,那麼我們的servlet就可以立馬傳回,處理下一個請求。

3. CRUD完整示例

  • 通過下圖可以看出MVC和wenflux的差別
    Webflux響應式程式設計(SpringBoot 2.0新特性)——完整版
  • 以下通過一個例子了解一下webflux開發
  1. 實體類
@Document(collection = "user")
@Data
public class User {

	@Id
	private String id;

	@NotBlank
	private String name;

	@Range(min=10, max=100)
	private int age;

}
           
  1. Controller層
@RestController
@RequestMapping("/user")
public class UserController {

	private final UserRepository repository;

	public UserController(UserRepository repository) {
		this.repository = repository;
	}

	/**
	 * 以數組形式一次性傳回資料
	 */
	@GetMapping("/")
	public Flux<User> getAll() {
		return repository.findAll();
	}

	/**
	 * 以SSE形式多次傳回資料
	 */
	@GetMapping(value = "/stream/all", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
	public Flux<User> streamGetAll() {
		return repository.findAll();
	}

	/**
	 * 新增資料
	 */
	@PostMapping("/")
	public Mono<User> createUser(@Valid @RequestBody User user) {
		// spring data jpa 裡面, 新增和修改都是save. 有id是修改, id為空是新增
		// 根據實際情況是否置空id
		user.setId(null);
		CheckUtil.checkName(user.getName());
		return this.repository.save(user);
	}

	/**
	 * 根據id删除使用者 存在的時候傳回200, 不存在傳回404
	 */
	@DeleteMapping("/{id}")
	public Mono<ResponseEntity<Void>> deleteUser(
			@PathVariable("id") String id) {
		// deletebyID 沒有傳回值, 不能判斷資料是否存在
		// this.repository.deleteById(id)
		return this.repository.findById(id)
				// 當你要操作資料, 并傳回一個Mono 這個時候使用flatMap
				// 如果不操作資料, 隻是轉換資料, 使用map
				.flatMap(user -> this.repository.delete(user).then(
						Mono.just(new ResponseEntity<Void>(HttpStatus.OK))))
				.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
	}

	/**
	 * 修改資料 存在的時候傳回200 和修改後的資料, 不存在的時候傳回404
	 */
	@PutMapping("/{id}")
	public Mono<ResponseEntity<User>> updateUser(@PathVariable("id") String id,
			@Valid @RequestBody User user) {
		CheckUtil.checkName(user.getName());
		return this.repository.findById(id)
				// flatMap 操作資料
				.flatMap(u -> {
					u.setAge(user.getAge());
					u.setName(user.getName());
					return this.repository.save(u);
				})
				// map: 轉換資料
				.map(u -> new ResponseEntity<User>(u, HttpStatus.OK))
				.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
	}

	/**
	 * 根據ID查找使用者 存在傳回使用者資訊, 不存在傳回404
	 */
	@GetMapping("/{id}")
	public Mono<ResponseEntity<User>> findUserById(
			@PathVariable("id") String id) {
		return this.repository.findById(id)
				.map(u -> new ResponseEntity<User>(u, HttpStatus.OK))
				.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
	}

	/**
	 * 根據年齡查找使用者
	 */
	@GetMapping("/age/{start}/{end}")
	public Flux<User> findByAge(@PathVariable("start") int start,
			@PathVariable("end") int end) {
		return this.repository.findByAgeBetween(start, end);
	}

	/**
	 * 根據年齡查找使用者
	 */
	@GetMapping(value = "/stream/age/{start}/{end}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
	public Flux<User> streamFindByAge(@PathVariable("start") int start,
			@PathVariable("end") int end) {
		return this.repository.findByAgeBetween(start, end);
	}
	
	/**
	 *  得到20-30使用者
	 */
	@GetMapping("/old")

	public Flux<User> oldUser() {
		return this.repository.oldUser();
	}

	/**
	 * 得到20-30使用者
	 */
	@GetMapping(value = "/stream/old", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
	public Flux<User> streamOldUser() {
		return this.repository.oldUser();
	}

}
           
  1. Repository層
@Repository
public interface UserRepository extends ReactiveMongoRepository<User, String> {

	/**
	 * 根據年齡查找使用者
	 */
	Flux<User> findByAgeBetween(int start, int end);
	
	@Query("{'age':{ '$gte': 20, '$lte' : 30}}")
	Flux<User> oldUser();
}
           
  • 以上代碼沒有進行校驗,當然沒有校驗的代碼是不能用的,校驗代碼我就不放了,想了解的GitHub上有完整代碼。