天天看點

SpringBoot項目實作釋出訂閱模式,真的很簡單

作者:IT網際網路新資訊

大家好,我是老三,在項目裡,經常會有一些主線業務之外的其它業務,比如,下單之後,發送通知、監控埋點、記錄日志……

這些非核心業務,如果全部一梭子寫下去,有兩個問題,一個是業務耦合,一個是串行耗時。

SpringBoot項目實作釋出訂閱模式,真的很簡單

下單之後的邏輯

是以,一般在開發的時候,都會把這些操作å抽象成觀察者模式,也就是釋出/訂閱模式(這裡就不讨論觀察者模式和釋出/訂閱模式的不同),而且一般會采用多線程的方式來異步執行這些觀察者方法。

SpringBoot項目實作釋出訂閱模式,真的很簡單

觀察者模式

一開始,我們都是自己去寫觀察者模式。

自己實作觀察者模式

觀察者簡圖

觀察者

  • 觀察者定義接口
/**
 * @Author: fighter3
 * @Description: 觀察者接口
 * @Date: 2022/11/7 11:40 下午
 */
public interface OrderObserver {

    void afterPlaceOrder(PlaceOrderMessage placeOrderMessage);
}
           
  • 具體觀察者@Slf4j

    public class OrderMetricsObserver implements OrderObserver {

    @Override

    public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {

    log.info("[afterPlaceOrder] metrics");

    }

    }

    @Slf4j

    public class OrderLogObserver implements OrderObserver{

    @Override

    public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {

    log.info("[afterPlaceOrder] log.");

    }

    }

    @Slf4j

    public class OrderNotifyObserver implements OrderObserver{

    @Override

    public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {

    log.info("[afterPlaceOrder] notify.");

    }

    }

    • 業務通知觀察者
    • 日志記錄觀察者
    • 監控埋點觀察者

被觀察者

  • 消息實體定義
@Data
public class PlaceOrderMessage implements Serializable {
    /**
     * 訂單号
     */
    private String orderId;
    /**
     * 訂單狀态
     */
    private Integer orderStatus;
    /**
     * 下單使用者ID
     */
    private String userId;
    //……
}
           
  • 被觀察者抽象類
public abstract class OrderSubject {
    //定義一個觀察者清單
    private List<OrderObserver> orderObserverList = new ArrayList<>();
    //定義一個線程池,這裡參數随便寫的
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(6, 12, 6, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));

    //增加一個觀察者
    public void addObserver(OrderObserver o) {
        this.orderObserverList.add(o);
    }

    //删除一個觀察者
    public void delObserver(OrderObserver o) {
        this.orderObserverList.remove(o);
    }

    //通知所有觀察者
    public void notifyObservers(PlaceOrderMessage placeOrderMessage) {
        for (OrderObserver orderObserver : orderObserverList) {
            //利用多線程異步執行
            threadPoolExecutor.execute(() -> {
                orderObserver.afterPlaceOrder(placeOrderMessage);
            });
        }
    }
}
           

這裡利用了多線程,來異步執行觀察者。

  • 被觀察者實作類
/**
 * @Author: fighter3
 * @Description: 訂單實作類-被觀察者實作類
 * @Date: 2022/11/7 11:52 下午
 */
@Service
@Slf4j
public class OrderServiceImpl extends OrderSubject implements OrderService {

    /**
     * 下單
     */
    @Override
    public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {
        PlaceOrderResVO resVO = new PlaceOrderResVO();
        //添加觀察者
        this.addObserver(new OrderMetricsObserver());
        this.addObserver(new OrderLogObserver());
        this.addObserver(new OrderNotifyObserver());
        //通知觀察者
        this.notifyObservers(new PlaceOrderMessage());
        log.info("[placeOrder] end.");
        return resVO;
    }
}
           

測試

@Test
    @DisplayName("下單")
    void placeOrder() {
        PlaceOrderReqVO placeOrderReqVO = new PlaceOrderReqVO();
        orderService.placeOrder(placeOrderReqVO);
    }
           
  • 測試執行結果
2022-11-08 00:11:13.617  INFO 20235 --- [pool-1-thread-1] c.f.obverser.OrderMetricsObserver        : [afterPlaceOrder] metrics
2022-11-08 00:11:13.618  INFO 20235 --- [           main] cn.fighter3.obverser.OrderServiceImpl    : [placeOrder] end.
2022-11-08 00:11:13.618  INFO 20235 --- [pool-1-thread-3] c.fighter3.obverser.OrderNotifyObserver  : [afterPlaceOrder] notify.
2022-11-08 00:11:13.617  INFO 20235 --- [pool-1-thread-2] cn.fighter3.obverser.OrderLogObserver    : [afterPlaceOrder] log.
           

可以看到,觀察者是異步執行的。

利用Spring精簡

可以看到,觀察者模式寫起來還是比較簡單的,但是既然都用到了Spring來管理Bean的生命周期,代碼還可以更精簡一些。

SpringBoot項目實作釋出訂閱模式,真的很簡單

Spring精簡觀察者模式

觀察者實作類:定義成Bean

  • OrderLogObserver@Slf4j

    @Service

    public class OrderLogObserver implements OrderObserver {

    @Override

    public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {

    log.info("[afterPlaceOrder] log.");

    }

    }

  • OrderMetricsObserver
@Slf4j
@Service
public class OrderMetricsObserver implements OrderObserver {

    @Override
    public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
        log.info("[afterPlaceOrder] metrics");
    }
}
           
  • OrderNotifyObserver
@Slf4j
@Service
public class OrderNotifyObserver implements OrderObserver {

    @Override
    public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
        log.info("[afterPlaceOrder] notify.");
    }
}
           

被觀察者:自動注入Bean

  • OrderSubjectpublic abstract class OrderSubject {

    /**

    * 利用Spring的特性直接注入觀察者

    */

    @Autowired

    protected List<OrderObserver> orderObserverList;

    //定義一個線程池,這裡參數随便寫的

    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(6, 12, 6, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));

    //通知所有觀察者

    public void notifyObservers(PlaceOrderMessage placeOrderMessage) {

    for (OrderObserver orderObserver : orderObserverList) {

    //利用多線程異步執行

    threadPoolExecutor.execute(() -> {

    orderObserver.afterPlaceOrder(placeOrderMessage);

    });

    }

    }

    }

  • OrderServiceImpl
@Service
@Slf4j
public class OrderServiceImpl extends OrderSubject implements OrderService {

    /**
     * 實作類裡也要注入一下
     */
    @Autowired
    private List<OrderObserver> orderObserverList;

    /**
     * 下單
     */
    @Override
    public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {
        PlaceOrderResVO resVO = new PlaceOrderResVO();
        //通知觀察者
        this.notifyObservers(new PlaceOrderMessage());
        log.info("[placeOrder] end.");
        return resVO;
    }
}
           

這樣一來,發現被觀察者又簡潔了很多,但是後來我發現,在SpringBoot項目裡,利用Spring事件驅動驅動模型(event)模型來實作,更加地簡練。

Spring Event實作釋出/訂閱模式

Spring Event對釋出/訂閱模式進行了封裝,使用起來更加簡單,還是以我們這個場景為例,看看怎麼來實作吧。

自定義事件

  • PlaceOrderEvent:繼承ApplicationEvent,并重寫構造函數。ApplicationEvent是Spring提供的所有應用程式事件擴充類。
public class PlaceOrderEvent extends ApplicationEvent {

    public PlaceOrderEvent(PlaceOrderEventMessage source) {
        super(source);
    }
}
           
  • PlaceOrderEventMessage:事件消息,定義了事件的消息體。
@Data
public class PlaceOrderEventMessage implements Serializable {
    /**
     * 訂單号
     */
    private String orderId;
    /**
     * 訂單狀态
     */
    private Integer orderStatus;
    /**
     * 下單使用者ID
     */
    private String userId;
    //……
}
           

事件監聽者

事件監聽者,有兩種實作方式,一種是實作ApplicationListener接口,另一種是使用@EventListener注解。

SpringBoot項目實作釋出訂閱模式,真的很簡單

事件監聽者實作

實作ApplicationListener接口

實作ApplicationListener接口,重寫onApplicationEvent方法,将類定義為Bean,這樣,一個監聽者就完成了。

  • OrderLogListener
@Slf4j
@Service
public class OrderLogListener implements ApplicationListener<PlaceOrderEvent> {

    @Override
    public void onApplicationEvent(PlaceOrderEvent event) {
        log.info("[afterPlaceOrder] log.");
    }
}
           
  • OrderMetricsListener
@Slf4j
@Service
public class OrderMetricsListener implements ApplicationListener<PlaceOrderEvent> {

    @Override
    public void onApplicationEvent(PlaceOrderEvent event) {
        log.info("[afterPlaceOrder] metrics");
    }
}
           
  • OrderNotifyListener
@Slf4j
@Service
public class OrderNotifyListener implements ApplicationListener<PlaceOrderEvent> {

    @Override
    public void onApplicationEvent(PlaceOrderEvent event) {
        log.info("[afterPlaceOrder] notify.");
    }
}
           

使用@EventListener注解

使用@EventListener注解就更簡單了,直接在方法上,加上@EventListener注解就行了。

  • OrderLogListener@Slf4j

    @Service

    public class OrderLogListener {

    @EventListener

    public void orderLog(PlaceOrderEvent event) {

    log.info("[afterPlaceOrder] log.");

    }

    }

  • OrderMetricsListener@Slf4j

    @Service

    public class OrderMetricsListener {

    @EventListener

    public void metrics(PlaceOrderEvent event) {

    log.info("[afterPlaceOrder] metrics");

    }

    }

  • OrderNotifyListener@Slf4j

    @Service

    public class OrderNotifyListener{

    @EventListener

    public void notify(PlaceOrderEvent event) {

    log.info("[afterPlaceOrder] notify.");

    }

    }

異步和自定義線程池

異步執行

異步執行也非常簡單,使用Spring的異步注解@Async就可以了。例如:

  • OrderLogListener
@Slf4j
@Service
public class OrderLogListener  {

    @EventListener
    @Async
    public void orderLog(PlaceOrderEvent event) {
        log.info("[afterPlaceOrder] log.");
    }
}
           

當然,還需要開啟異步,SpringBoot項目預設是沒有開啟異步的,我們需要手動配置開啟異步功能,很簡單,隻需要在配置類上加上@EnableAsync注解就行了,這個注解用于聲明啟用Spring的異步方法執行功能,需要和@Configuration注解一起使用,也可以直接加在啟動類上。

@SpringBootApplication
@EnableAsync
public class DailyApplication {

    public static void main(String[] args) {
        SpringApplication.run(DairlyLearnApplication.class, args);
    }

}
           

自定義線程池

使用@Async的時候,一般都會自定義線程池,因為@Async的預設線程池為SimpleAsyncTaskExecutor,不是真的線程池,這個類不重用線程,預設每次調用都會建立一個新的線程。

自定義線程池有三種方式:

SpringBoot項目實作釋出訂閱模式,真的很簡單

@Async自定義線程池

  • 實作接口AsyncConfigurer
  • 繼承AsyncConfigurerSupport
  • 配置由自定義的TaskExecutor替代内置的任務執行器

我們來看看三種寫法:

  • 實作接口AsyncConfigurer
@Configuration
@Slf4j
public class AsyncConfiguration implements AsyncConfigurer {

    @Bean("fighter3AsyncExecutor")
    public ThreadPoolTaskExecutor executor() {
        //Spring封裝的一個線程池
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //随便寫的一些配置
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(30);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("fighter3AsyncExecutor-");
        executor.initialize();
        return executor;
    }

    @Override
    public Executor getAsyncExecutor() {
        return executor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> log.error(String.format("[async] task{} error:", method), ex);
    }
}

           
  • 繼承AsyncConfigurerSupport
@Configuration
@Slf4j
public class SpringAsyncConfigurer extends AsyncConfigurerSupport {

    @Bean
    public ThreadPoolTaskExecutor asyncExecutor() {
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        //随便寫的一些配置
        threadPool.setCorePoolSize(10);
        threadPool.setMaxPoolSize(30);
        threadPool.setWaitForTasksToCompleteOnShutdown(true);
        threadPool.setAwaitTerminationSeconds(60 * 15);
        return threadPool;
    }

    @Override
    public Executor getAsyncExecutor() {
        return asyncExecutor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> log.error(String.format("[async] task{} error:", method), ex);
    }
}
           
  • 配置自定義的TaskExecutor@Slf4j

    @Service

    public class OrderLogListener {

    @EventListener

    @Async("asyncExecutor")

    public void orderLog(PlaceOrderEvent event) {

    log.info("[afterPlaceOrder] log.");

    }

    }

    • 配置線程池@Configuration

      public class TaskPoolConfig {

      @Bean(name = "asyncExecutor")

      public Executor taskExecutor() {

      ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

      //随便寫的一些配置

      executor.setCorePoolSize(10);

      executor.setMaxPoolSize(20);

      executor.setQueueCapacity(200);

      executor.setKeepAliveSeconds(60);

      executor.setThreadNamePrefix("asyncExecutor-");

      executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

      return executor;

      }

      }

    • 使用@Async注解的時候,指定線程池,推薦使用這種方式,因為在項目裡,盡量做到線程池隔離,不同的任務使用不同的線程池
異步和自定義線程池這一部分隻是一些擴充,稍微占了一些篇幅,大家可不要覺得Spring Event用起來很繁瑣。

釋出事件

釋出事件也非常簡單,隻需要使用Spring 提供的ApplicationEventPublisher來釋出自定義事件。

  • OrderServiceImpl@Service

    @Slf4j

    public class OrderServiceImpl implements OrderService {

    @Autowired

    private ApplicationEventPublisher applicationEventPublisher;

    /**

    * 下單

    */

    @Override

    public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {

    log.info("[placeOrder] start.");

    PlaceOrderResVO resVO = new PlaceOrderResVO();

    //消息

    PlaceOrderEventMessage eventMessage = new PlaceOrderEventMessage();

    //釋出事件

    applicationEventPublisher.publishEvent(new PlaceOrderEvent(eventMessage));

    log.info("[placeOrder] end.");

    return resVO;

    }

    }

在Idea裡檢視事件的監聽者也比較友善,點選下面圖中的圖示,就可以檢視監聽者。

SpringBoot項目實作釋出訂閱模式,真的很簡單

檢視監聽者

SpringBoot項目實作釋出訂閱模式,真的很簡單

監聽者

測試

最後,我們還是測試一下。

@Test
    void placeOrder() {
        PlaceOrderReqVO placeOrderReqVO = new PlaceOrderReqVO();
        orderService.placeOrder(placeOrderReqVO);
    }
           
  • 執行結果
2022-11-08 10:05:14.415  INFO 22674 --- [           main] c.f.o.event.event.OrderServiceImpl       : [placeOrder] start.
2022-11-08 10:05:14.424  INFO 22674 --- [           main] c.f.o.event.event.OrderServiceImpl       : [placeOrder] end.
2022-11-08 10:05:14.434  INFO 22674 --- [sync-executor-3] c.f.o.event.event.OrderNotifyListener    : [afterPlaceOrder] notify.
2022-11-08 10:05:14.435  INFO 22674 --- [sync-executor-2] c.f.o.event.event.OrderMetricsListener   : [afterPlaceOrder] metrics
2022-11-08 10:05:14.436  INFO 22674 --- [sync-executor-1] c.f.o.event.event.OrderLogListener       : [afterPlaceOrder] log.
           

可以看到,異步執行,而且用到了我們自定義的線程池。

小結

這篇文章裡,從最開始自己實作的觀察者模式,再到利用Spring簡化的觀察者模式,再到使用Spring Event實作釋出/訂閱模式,可以看到,Spring Event用起來還是比較簡單的。除此之外,還有Guava EventBus這樣的事件驅動實作,大家更習慣使用哪種呢?

小結

這篇文章裡,從最開始自己實作的觀察者模式,再到利用Spring簡化的觀察者模式,再到使用Spring Event實作釋出/訂閱模式,可以看到,Spring Event用起來還是比較簡單的。除此之外,還有Guava EventBus這樣的事件驅動實作,大家更習慣使用哪種呢?

原文:https://mp.weixin.qq.com/s/6l0LozzdPz4Pv7dGpckqNg

如果感覺本文對你有幫助,點贊關注支援一下