大家好,我是老三,在項目裡,經常會有一些主線業務之外的其它業務,比如,下單之後,發送通知、監控埋點、記錄日志……
這些非核心業務,如果全部一梭子寫下去,有兩個問題,一個是業務耦合,一個是串行耗時。
下單之後的邏輯
是以,一般在開發的時候,都會把這些操作å抽象成觀察者模式,也就是釋出/訂閱模式(這裡就不讨論觀察者模式和釋出/訂閱模式的不同),而且一般會采用多線程的方式來異步執行這些觀察者方法。
觀察者模式
一開始,我們都是自己去寫觀察者模式。
自己實作觀察者模式
觀察者簡圖
觀察者
- 觀察者定義接口
/**
* @Author: fighter3
* @Description: 觀察者接口
* @Date: 2022/11/7 11:40 下午
*/
public interface OrderObserver {
void afterPlaceOrder(PlaceOrderMessage placeOrderMessage);
}
-
具體觀察者@Slf4j
public class OrderMetricsObserver implements OrderObserver {
@Override
public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
log.info("[afterPlaceOrder] metrics");
}
}
@Slf4j
public class OrderLogObserver implements OrderObserver{
@Override
public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
log.info("[afterPlaceOrder] log.");
}
}
@Slf4j
public class OrderNotifyObserver implements OrderObserver{
@Override
public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
log.info("[afterPlaceOrder] notify.");
}
}
- 業務通知觀察者
- 日志記錄觀察者
- 監控埋點觀察者
被觀察者
- 消息實體定義
@Data
public class PlaceOrderMessage implements Serializable {
/**
* 訂單号
*/
private String orderId;
/**
* 訂單狀态
*/
private Integer orderStatus;
/**
* 下單使用者ID
*/
private String userId;
//……
}
- 被觀察者抽象類
public abstract class OrderSubject {
//定義一個觀察者清單
private List<OrderObserver> orderObserverList = new ArrayList<>();
//定義一個線程池,這裡參數随便寫的
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(6, 12, 6, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));
//增加一個觀察者
public void addObserver(OrderObserver o) {
this.orderObserverList.add(o);
}
//删除一個觀察者
public void delObserver(OrderObserver o) {
this.orderObserverList.remove(o);
}
//通知所有觀察者
public void notifyObservers(PlaceOrderMessage placeOrderMessage) {
for (OrderObserver orderObserver : orderObserverList) {
//利用多線程異步執行
threadPoolExecutor.execute(() -> {
orderObserver.afterPlaceOrder(placeOrderMessage);
});
}
}
}
這裡利用了多線程,來異步執行觀察者。
- 被觀察者實作類
/**
* @Author: fighter3
* @Description: 訂單實作類-被觀察者實作類
* @Date: 2022/11/7 11:52 下午
*/
@Service
@Slf4j
public class OrderServiceImpl extends OrderSubject implements OrderService {
/**
* 下單
*/
@Override
public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {
PlaceOrderResVO resVO = new PlaceOrderResVO();
//添加觀察者
this.addObserver(new OrderMetricsObserver());
this.addObserver(new OrderLogObserver());
this.addObserver(new OrderNotifyObserver());
//通知觀察者
this.notifyObservers(new PlaceOrderMessage());
log.info("[placeOrder] end.");
return resVO;
}
}
測試
@Test
@DisplayName("下單")
void placeOrder() {
PlaceOrderReqVO placeOrderReqVO = new PlaceOrderReqVO();
orderService.placeOrder(placeOrderReqVO);
}
- 測試執行結果
2022-11-08 00:11:13.617 INFO 20235 --- [pool-1-thread-1] c.f.obverser.OrderMetricsObserver : [afterPlaceOrder] metrics
2022-11-08 00:11:13.618 INFO 20235 --- [ main] cn.fighter3.obverser.OrderServiceImpl : [placeOrder] end.
2022-11-08 00:11:13.618 INFO 20235 --- [pool-1-thread-3] c.fighter3.obverser.OrderNotifyObserver : [afterPlaceOrder] notify.
2022-11-08 00:11:13.617 INFO 20235 --- [pool-1-thread-2] cn.fighter3.obverser.OrderLogObserver : [afterPlaceOrder] log.
可以看到,觀察者是異步執行的。
利用Spring精簡
可以看到,觀察者模式寫起來還是比較簡單的,但是既然都用到了Spring來管理Bean的生命周期,代碼還可以更精簡一些。
Spring精簡觀察者模式
觀察者實作類:定義成Bean
-
OrderLogObserver@Slf4j
@Service
public class OrderLogObserver implements OrderObserver {
@Override
public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
log.info("[afterPlaceOrder] log.");
}
}
- OrderMetricsObserver
@Slf4j
@Service
public class OrderMetricsObserver implements OrderObserver {
@Override
public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
log.info("[afterPlaceOrder] metrics");
}
}
- OrderNotifyObserver
@Slf4j
@Service
public class OrderNotifyObserver implements OrderObserver {
@Override
public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
log.info("[afterPlaceOrder] notify.");
}
}
被觀察者:自動注入Bean
-
OrderSubjectpublic abstract class OrderSubject {
/**
* 利用Spring的特性直接注入觀察者
*/
@Autowired
protected List<OrderObserver> orderObserverList;
//定義一個線程池,這裡參數随便寫的
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(6, 12, 6, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));
//通知所有觀察者
public void notifyObservers(PlaceOrderMessage placeOrderMessage) {
for (OrderObserver orderObserver : orderObserverList) {
//利用多線程異步執行
threadPoolExecutor.execute(() -> {
orderObserver.afterPlaceOrder(placeOrderMessage);
});
}
}
}
- OrderServiceImpl
@Service
@Slf4j
public class OrderServiceImpl extends OrderSubject implements OrderService {
/**
* 實作類裡也要注入一下
*/
@Autowired
private List<OrderObserver> orderObserverList;
/**
* 下單
*/
@Override
public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {
PlaceOrderResVO resVO = new PlaceOrderResVO();
//通知觀察者
this.notifyObservers(new PlaceOrderMessage());
log.info("[placeOrder] end.");
return resVO;
}
}
這樣一來,發現被觀察者又簡潔了很多,但是後來我發現,在SpringBoot項目裡,利用Spring事件驅動驅動模型(event)模型來實作,更加地簡練。
Spring Event實作釋出/訂閱模式
Spring Event對釋出/訂閱模式進行了封裝,使用起來更加簡單,還是以我們這個場景為例,看看怎麼來實作吧。
自定義事件
- PlaceOrderEvent:繼承ApplicationEvent,并重寫構造函數。ApplicationEvent是Spring提供的所有應用程式事件擴充類。
public class PlaceOrderEvent extends ApplicationEvent {
public PlaceOrderEvent(PlaceOrderEventMessage source) {
super(source);
}
}
- PlaceOrderEventMessage:事件消息,定義了事件的消息體。
@Data
public class PlaceOrderEventMessage implements Serializable {
/**
* 訂單号
*/
private String orderId;
/**
* 訂單狀态
*/
private Integer orderStatus;
/**
* 下單使用者ID
*/
private String userId;
//……
}
事件監聽者
事件監聽者,有兩種實作方式,一種是實作ApplicationListener接口,另一種是使用@EventListener注解。
事件監聽者實作
實作ApplicationListener接口
實作ApplicationListener接口,重寫onApplicationEvent方法,将類定義為Bean,這樣,一個監聽者就完成了。
- OrderLogListener
@Slf4j
@Service
public class OrderLogListener implements ApplicationListener<PlaceOrderEvent> {
@Override
public void onApplicationEvent(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] log.");
}
}
- OrderMetricsListener
@Slf4j
@Service
public class OrderMetricsListener implements ApplicationListener<PlaceOrderEvent> {
@Override
public void onApplicationEvent(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] metrics");
}
}
- OrderNotifyListener
@Slf4j
@Service
public class OrderNotifyListener implements ApplicationListener<PlaceOrderEvent> {
@Override
public void onApplicationEvent(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] notify.");
}
}
使用@EventListener注解
使用@EventListener注解就更簡單了,直接在方法上,加上@EventListener注解就行了。
-
OrderLogListener@Slf4j
@Service
public class OrderLogListener {
@EventListener
public void orderLog(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] log.");
}
}
-
OrderMetricsListener@Slf4j
@Service
public class OrderMetricsListener {
@EventListener
public void metrics(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] metrics");
}
}
-
OrderNotifyListener@Slf4j
@Service
public class OrderNotifyListener{
@EventListener
public void notify(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] notify.");
}
}
異步和自定義線程池
異步執行
異步執行也非常簡單,使用Spring的異步注解@Async就可以了。例如:
- OrderLogListener
@Slf4j
@Service
public class OrderLogListener {
@EventListener
@Async
public void orderLog(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] log.");
}
}
當然,還需要開啟異步,SpringBoot項目預設是沒有開啟異步的,我們需要手動配置開啟異步功能,很簡單,隻需要在配置類上加上@EnableAsync注解就行了,這個注解用于聲明啟用Spring的異步方法執行功能,需要和@Configuration注解一起使用,也可以直接加在啟動類上。
@SpringBootApplication
@EnableAsync
public class DailyApplication {
public static void main(String[] args) {
SpringApplication.run(DairlyLearnApplication.class, args);
}
}
自定義線程池
使用@Async的時候,一般都會自定義線程池,因為@Async的預設線程池為SimpleAsyncTaskExecutor,不是真的線程池,這個類不重用線程,預設每次調用都會建立一個新的線程。
自定義線程池有三種方式:
@Async自定義線程池
- 實作接口AsyncConfigurer
- 繼承AsyncConfigurerSupport
- 配置由自定義的TaskExecutor替代内置的任務執行器
我們來看看三種寫法:
- 實作接口AsyncConfigurer
@Configuration
@Slf4j
public class AsyncConfiguration implements AsyncConfigurer {
@Bean("fighter3AsyncExecutor")
public ThreadPoolTaskExecutor executor() {
//Spring封裝的一個線程池
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//随便寫的一些配置
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(30);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("fighter3AsyncExecutor-");
executor.initialize();
return executor;
}
@Override
public Executor getAsyncExecutor() {
return executor();
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> log.error(String.format("[async] task{} error:", method), ex);
}
}
- 繼承AsyncConfigurerSupport
@Configuration
@Slf4j
public class SpringAsyncConfigurer extends AsyncConfigurerSupport {
@Bean
public ThreadPoolTaskExecutor asyncExecutor() {
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
//随便寫的一些配置
threadPool.setCorePoolSize(10);
threadPool.setMaxPoolSize(30);
threadPool.setWaitForTasksToCompleteOnShutdown(true);
threadPool.setAwaitTerminationSeconds(60 * 15);
return threadPool;
}
@Override
public Executor getAsyncExecutor() {
return asyncExecutor();
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> log.error(String.format("[async] task{} error:", method), ex);
}
}
-
配置自定義的TaskExecutor@Slf4j
@Service
public class OrderLogListener {
@EventListener
@Async("asyncExecutor")
public void orderLog(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] log.");
}
}
-
配置線程池@Configuration
public class TaskPoolConfig {
@Bean(name = "asyncExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//随便寫的一些配置
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("asyncExecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
- 使用@Async注解的時候,指定線程池,推薦使用這種方式,因為在項目裡,盡量做到線程池隔離,不同的任務使用不同的線程池
異步和自定義線程池這一部分隻是一些擴充,稍微占了一些篇幅,大家可不要覺得Spring Event用起來很繁瑣。
釋出事件
釋出事件也非常簡單,隻需要使用Spring 提供的ApplicationEventPublisher來釋出自定義事件。
-
OrderServiceImpl@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
/**
* 下單
*/
@Override
public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {
log.info("[placeOrder] start.");
PlaceOrderResVO resVO = new PlaceOrderResVO();
//消息
PlaceOrderEventMessage eventMessage = new PlaceOrderEventMessage();
//釋出事件
applicationEventPublisher.publishEvent(new PlaceOrderEvent(eventMessage));
log.info("[placeOrder] end.");
return resVO;
}
}
在Idea裡檢視事件的監聽者也比較友善,點選下面圖中的圖示,就可以檢視監聽者。
檢視監聽者
監聽者
測試
最後,我們還是測試一下。
@Test
void placeOrder() {
PlaceOrderReqVO placeOrderReqVO = new PlaceOrderReqVO();
orderService.placeOrder(placeOrderReqVO);
}
- 執行結果
2022-11-08 10:05:14.415 INFO 22674 --- [ main] c.f.o.event.event.OrderServiceImpl : [placeOrder] start.
2022-11-08 10:05:14.424 INFO 22674 --- [ main] c.f.o.event.event.OrderServiceImpl : [placeOrder] end.
2022-11-08 10:05:14.434 INFO 22674 --- [sync-executor-3] c.f.o.event.event.OrderNotifyListener : [afterPlaceOrder] notify.
2022-11-08 10:05:14.435 INFO 22674 --- [sync-executor-2] c.f.o.event.event.OrderMetricsListener : [afterPlaceOrder] metrics
2022-11-08 10:05:14.436 INFO 22674 --- [sync-executor-1] c.f.o.event.event.OrderLogListener : [afterPlaceOrder] log.
可以看到,異步執行,而且用到了我們自定義的線程池。
小結
這篇文章裡,從最開始自己實作的觀察者模式,再到利用Spring簡化的觀察者模式,再到使用Spring Event實作釋出/訂閱模式,可以看到,Spring Event用起來還是比較簡單的。除此之外,還有Guava EventBus這樣的事件驅動實作,大家更習慣使用哪種呢?
小結
這篇文章裡,從最開始自己實作的觀察者模式,再到利用Spring簡化的觀察者模式,再到使用Spring Event實作釋出/訂閱模式,可以看到,Spring Event用起來還是比較簡單的。除此之外,還有Guava EventBus這樣的事件驅動實作,大家更習慣使用哪種呢?
原文:https://mp.weixin.qq.com/s/6l0LozzdPz4Pv7dGpckqNg
如果感覺本文對你有幫助,點贊關注支援一下