天天看點

Spring中事件監聽(通知)機制詳解與實踐

Spring中事件監聽(也有說法叫事件通知)機制,其實本質是觀察者模式的應用。當某個事件發生時,其會被廣播出去,監聽該實踐的listener就會被觸發然後執行響應的動作。該模式可良好應用于程式解耦,類似消息的釋出訂閱。

【1】事件、釋出與監聽

這個模式有三元素:事件、釋出與監聽。

① 事件

如下圖所示,事件繼承自​

​EventObject​

​​類,該類維護了事件最初發生在其上的對象-source。而我們通常自定義的事件實際應繼承自抽象類​

​ApplicationEvent​

​​。比如常見的上下文重新整理事件​

​ContextRefreshedEvent​

​​。

Spring中事件監聽(通知)機制詳解與實踐

比如我們可以自定義事件類如下:

// 定義一個事件
public class MyEvent extends ApplicationEvent {
    private String message;
    public EventDemo(Object source, String message) {
        super(source);
        this.message = message;
    }
    public String getMessage() {
        return message;
    }
}      

② 監聽

有了事件後,就需要有對應的監聽對象–其實就是觀察者。接口EventListener是一個頂級接口提供給其他監聽器接口繼承,如下所示其是一個空接口。

public interface EventListener {
}      

針對ApplicationEvent事件而言,Spring提供了ApplicationListener功能接口供使用者實作,如下所示:

@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {

   // 處理監聽到的事件
  void onApplicationEvent(E event);

  /**
   * Create a new {@code ApplicationListener} for the given payload consumer.
   */  
  static <T> ApplicationListener<PayloadApplicationEvent<T>> forPayload(Consumer<T> consumer) {
    return event -> consumer.accept(event.getPayload());
  }

}      

那麼我們自定義監聽該如何實作呢?有兩種方式:繼承自​

​ApplicationListener​

​​或者使用注解​

​@EventListener​

​。

如下所示繼承ApplicationListener:

// 定義一個事件監聽者
@Component
public class MyEventListener implements ApplicationListener<MyEvent> {
    @Override
    public void onApplicationEvent(MyEvent event) {
        // 這裡寫事件處理邏輯
    }
}      

如下所示使用注解​

​@EventListener​

​:

// 定義一個事件監聽者
@Component
public class MyEventListener  {
    @EventListener
    public void onApplicationEvent(MyEvent event) {
        // 這裡寫事件處理邏輯
    }
}      

該種方式将會被包裝為一個​

​ApplicationListenerMethodAdapter​

​​,類似如下:

Spring中事件監聽(通知)機制詳解與實踐

這是因為​

​EventListenerMethodProcessor​

​​ 處理器會解析​

​@EventListener​

​​ 注解,将其所在類封裝為一個​

​ApplicationListener​

​​(比如這裡的​

​ApplicationListenerMethodAdapter​

​),然後放入容器中。

常見的ApplicationListener樹結構示意圖

Spring中事件監聽(通知)機制詳解與實踐

③ 事件釋出

有了事件與監聽 ,那麼還需要在某個時刻将事件廣播出去觸發監聽動作。如何釋出事件呢?Spring提供了ApplicationEventPublisher接口。

@FunctionalInterface
public interface ApplicationEventPublisher {
  default void publishEvent(ApplicationEvent event) {
    publishEvent((Object) event);
  }
  void publishEvent(Object event);
}      

publishEvent方法解釋如下:

  • 将此事件通知給所有注冊到容器的并且比對的監聽器,事件可以是架構事件(例如ContextRefreshedEvent)或特定于應用程式的事件。
  • 這樣的事件釋出步驟實際是一種對于multicaster 的協調手段,其并不意味着同步、異步或者立即執行。
  • 對于那些需要長時間處理或者可能阻塞的操作,事件監聽器被鼓勵使用異步處理

該接口不需要我們去實作,實際上如下圖所示所有的容器都實作了該接口:

Spring中事件監聽(通知)機制詳解與實踐

那麼我們該如何做呢?隻需要注入​​

​ApplicationEventPublisher​

​ 然後釋出即可,如下所示:

@Autowired
ApplicationEventPublisher applicationEventPublisher;

public void publishEvent(){
    MyEvent event = new MyEvent(this);
    applicationEventPublisher.publishEvent(event);
}      

如何使用異步處理呢?使用@EnableAsync與@Async注解。

【2】事件廣播

​publishEvent​

​方法會将我們的事件通知給監聽器,這個場景叫做廣播。也就是說,将該事件廣播出去,但凡對該事件感興趣的監聽器均被通知到。spring是如何實作的呢?

這個邏輯是在​

​AbstractApplicationContext​

​的​

​publishEvent​

​中實作的。

protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
  Assert.notNull(event, "Event must not be null");

  // Decorate event as an ApplicationEvent if necessary
  ApplicationEvent applicationEvent;
  if (event instanceof ApplicationEvent) {
    applicationEvent = (ApplicationEvent) event;
  }
  else {
    applicationEvent = new PayloadApplicationEvent<>(this, event);
    if (eventType == null) {
      eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
    }
  }

  // Multicast right now if possible - or lazily once the multicaster is initialized
  if (this.earlyApplicationEvents != null) {
    this.earlyApplicationEvents.add(applicationEvent);
  }
  else {
  // 擷取廣播器進行事件廣播
    getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
  }

  // Publish event via parent context as well...
  if (this.parent != null) {
    if (this.parent instanceof AbstractApplicationContext) {
      ((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
    }
    else {
      this.parent.publishEvent(event);
    }
  }
}      

上述方法首先對event進行了處理,嘗試轉換為​

​ApplicationEvent​

​​或者​

​PayloadApplicationEvent​

​​,如果是​

​PayloadApplicationEvent​

​​則擷取​

​eventType​

​。

其次判斷​

​earlyApplicationEvents​

​​是否為空(也就是已經被釋出的事件),如果不為空則将目前事件放入否則擷取​

​ApplicationEventMulticaster​

​​調用其​

​multicastEvent​

​​将事件廣播出去。本文這裡擷取到的廣播器執行個體是​

​SimpleApplicationEventMulticaster​

​。

最後如果其parent不為null,則嘗試調用父類的​

​publishEvent​

​方法。

我們繼續看下​

​multicastEvent​

​方法的實作。這裡我們來到了​

​SimpleApplicationEventMulticaster​

​的​

​multicastEven​

​方法。

@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
// 解析事件類型
  ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
  // 嘗試擷取任務執行器
  Executor executor = getTaskExecutor();
  // 擷取合适的ApplicationListener,循環調用監聽器的onApplicationEvent方法
  for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
    // 判斷executor 是否不為null
    if (executor != null) {
      executor.execute(() -> invokeListener(listener, event));
    }
    // 判斷applicationStartup 
    else if (this.applicationStartup != null) {
      StartupStep invocationStep = this.applicationStartup.start("spring.event.invoke-listener");
      invokeListener(listener, event);
      invocationStep.tag("event", event::toString);
      if (eventType != null) {
        invocationStep.tag("eventType", eventType::toString);
      }
      invocationStep.tag("listener", listener::toString);
      invocationStep.end();
    }
    // 否則,直接調用listener.onApplicationEvent
    else {
      invokeListener(listener, event);
    }
  }
}      

代碼邏輯分析如下:

  • ① 擷取事件類型與TaskExecutor;
  • ② getApplicationListeners擷取合适的監聽器,也就是堆目前事件類型感興趣的;然後進行周遊
  • ③ 如果executor不為null,則交給executor去調用監聽器;
  • ④ 如果applicationStartup不為null,則會額外記錄處理步驟;
  • ⑤ 否則,使用目前主線程直接調用監聽器;

invokeListener做了什麼呢?我們繼續往下看。

// 該方法增加了錯誤處理邏輯,然後調用doInvokeListener
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
  ErrorHandler errorHandler = getErrorHandler();
  if (errorHandler != null) {
    try {
      doInvokeListener(listener, event);
    }
    catch (Throwable err) {
      errorHandler.handleError(err);
    }
  }
  else {
    doInvokeListener(listener, event);
  }
}

// doInvokeListener 直接調用listener.onApplicationEvent
@SuppressWarnings({"rawtypes", "unchecked"})
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
  try {
    listener.onApplicationEvent(event);
  }
  catch (ClassCastException ex) {
  //...
  }
}      

兩個方法流程邏輯很清晰,關鍵的問題是​

​listener.onApplicationEvent(event);​

​​是直接調用你的監聽器方法嗎?非也,要看此時的listener是什麼。比如當你使用注解​

​@EventListener​

​​聲明監聽器的時候,這裡的listener就是​

​ApplicationListenerMethodAdapter​

​執行個體。

以​

​ApplicationListenerMethodAdapter​

​​為例,其onApplicationEvent方法會調用processEvent方法最終調用​

​Object result = doInvoke(args);​

​而doInvoke最終是反射調用你的監聽器(方法)。

@Override
public void onApplicationEvent(ApplicationEvent event) {
  processEvent(event);
}
public void processEvent(ApplicationEvent event) {
  Object[] args = resolveArguments(event);
  if (shouldHandle(event, args)) {
    Object result = doInvoke(args);
    if (result != null) {
      handleResult(result);
    }
    else {
      logger.trace("No result object given - no result to handle");
    }
  }
}

@Nullable
protected Object doInvoke(Object... args) {
// 擷取包裝的bean,也就是目标bean,也就是你的監聽器
  Object bean = getTargetBean();
  // Detect package-protected NullBean instance through equals(null) check
  if (bean.equals(null)) {
    return null;
  }
  // 設定方法可以通路
  ReflectionUtils.makeAccessible(this.method);
  try {
  // 反射調用目标方法
    return this.method.invoke(bean, args);
  }
  //...一堆catch
}      
Spring中事件監聽(通知)機制詳解與實踐

【3】廣播器ApplicationEventMulticaster

這裡我們主要分析​

​ApplicationEventMulticaster​

​​接口、抽象類​

​AbstractApplicationEventMulticaster​

​​以及具體實作​

​SimpleApplicationEventMulticaster​

​。

① ApplicationEventMulticaster

該接口實作将會管理一系列ApplicationListener并釋出事件給監聽器。

我們看下該接口源碼,如下所示該接口提供了添加/移除監聽器以及廣播事件給監聽器的行為。

public interface ApplicationEventMulticaster {

   // 添加監聽器
  void addApplicationListener(ApplicationListener<?> listener);

   // 添加一個監聽器 beanName
  void addApplicationListenerBean(String listenerBeanName);

   // 從通知清單移除掉一個監聽器
  void removeApplicationListener(ApplicationListener<?> listener);


   // 從通知清單移除掉一個 監聽器 bean name
  void removeApplicationListenerBean(String listenerBeanName);

   // 移除掉該廣播器管理的所有監聽器
  void removeAllListeners();

  /**
   * Multicast the given application event to appropriate listeners.
   * <p>Consider using {@link #multicastEvent(ApplicationEvent, ResolvableType)}
   * if possible as it provides better support for generics-based events.
   * @param event the event to multicast
   */
   // 廣播事件給合适的監聽器 建議使用末尾方法其對泛型提供了更好支援
  void multicastEvent(ApplicationEvent event);

// 廣播事件給合适的監聽器,如果eventType為null,将會根據event 執行個體建構一個預設的type
  void multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType);

}      

② AbstractApplicationEventMulticaster

該抽象類實作了​

​ApplicationEventMulticaster​

​接口,提供了基礎的監聽器注冊/移除以及查找能力。

其中defaultRetriever 用來管理監聽器并進行查找,而retrieverCache 則是為了更快進行查找。

private final DefaultListenerRetriever defaultRetriever = new DefaultListenerRetriever();

final Map<ListenerCacheKey, CachedListenerRetriever> retrieverCache = new ConcurrentHashMap<>(64);      

如下是其方法示意圖:

Spring中事件監聽(通知)機制詳解與實踐

其内部類​​

​DefaultListenerRetriever​

​維護了兩個常量集合用來儲存監聽器與監聽器 bean Name。Set表示集合内部元素不可重複。

public final Set<ApplicationListener<?>> applicationListeners = new LinkedHashSet<>();

public final Set<String> applicationListenerBeans = new LinkedHashSet<>();      

抽象類并未提供廣播事件的功能,留給子類SimpleApplicationEventMulticaster實作。

③ SimpleApplicationEventMulticaster

預設情況下,該廣播器将會把事件廣播給所有的監聽器,讓監聽器自己忽略他們不感興趣的事件。預設情況下,所有監聽器将會在目前線程中會調用。這允許惡意偵聽器阻塞整個應用程式的危險,但會增加最小的開銷。指定額外的任務執行器TaskExecutor,使監聽器在不同線程中執行,例如從線程池執行,将是一個良好的方案。

​setTaskExecutor​

​​方法允許你執行個體化​

​SimpleApplicationEventMulticaster​

​時,指定額外的任務執行器。這樣監聽器将不會在目前被調用的線程中執行。

public void setTaskExecutor(@Nullable Executor taskExecutor) {
  this.taskExecutor = taskExecutor;
}      

Spring提供了兩個任務執行器供你使用:

  • 同步執行器​

    ​org.springframework.core.task.SyncTaskExecutor​

  • 異步執行器​

    ​org.springframework.core.task.SimpleAsyncTaskExecutor​

核心屬性

// 任務執行器,可以使監聽器不在主線程中執行
@Nullable
private Executor taskExecutor;

// 錯誤處理器
@Nullable
private ErrorHandler errorHandler;

// 應用啟動記錄
@Nullable
private ApplicationStartup applicationStartup;      

ApplicationStartup 這個玩意很有一起,主要是用來标記/記錄程式處理步驟。核心容器及其基礎結構元件可以使用ApplicationStartup 标記應用程式啟動期間的步驟,并收集有關執行上下文或其處理時間的資料。

事件廣播

這個其實在上面我們已經分析過了,這裡可以再看下源碼。簡單來說該方法就是解析事件類型、嘗試擷取任務執行器,然後調用父類的​

​getApplicationListeners​

​​方法擷取監聽器進行周遊循環調用​

​invokeListener(listener, event);​

​。

@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
  ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
  Executor executor = getTaskExecutor();
  for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
    if (executor != null) {
      executor.execute(() -> invokeListener(listener, event));
    }
    else if (this.applicationStartup != null) {
      StartupStep invocationStep = this.applicationStartup.start("spring.event.invoke-listener");
      invokeListener(listener, event);
      invocationStep.tag("event", event::toString);
      if (eventType != null) {
        invocationStep.tag("eventType", eventType::toString);
      }
      invocationStep.tag("listener", listener::toString);
      invocationStep.end();
    }
    else {
      invokeListener(listener, event);
    }
  }
}      

【4】廣播器與監聽器的注冊

其實這裡要說的是什麼時候廣播器被初始化?什麼時候監聽器被注冊到了容器。這個發生在spring容器的初始化過程中,确切地說是SpringMVC容器還是IOC容器要看你目前項目環境是什麼。這裡我們重點不在于這裡,我們看如下​

​AbstractApplicationContext.refresh​

​方法。

@Override
public void refresh() throws BeansException, IllegalStateException {
  synchronized (this.startupShutdownMonitor) {
    StartupStep contextRefresh = this.applicationStartup.start("spring.context.refresh");

    // Prepare this context for refreshing.
    prepareRefresh();

    // Tell the subclass to refresh the internal bean factory.
    ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();

    // Prepare the bean factory for use in this context.
    prepareBeanFactory(beanFactory);

    try {
      // Allows post-processing of the bean factory in context subclasses.
      postProcessBeanFactory(beanFactory);

      StartupStep beanPostProcess = this.applicationStartup.start("spring.context.beans.post-process");
      // Invoke factory processors registered as beans in the context.
      invokeBeanFactoryPostProcessors(beanFactory);

      // Register bean processors that intercept bean creation.
      registerBeanPostProcessors(beanFactory);
      beanPostProcess.end();

      // Initialize message source for this context.
      initMessageSource();

      // Initialize event multicaster for this context.
      initApplicationEventMulticaster();

      // Initialize other special beans in specific context subclasses.
      onRefresh();

      // Check for listener beans and register them.
      registerListeners();

      // Instantiate all remaining (non-lazy-init) singletons.
      finishBeanFactoryInitialization(beanFactory);

      // Last step: publish corresponding event.
      finishRefresh();
    }

    catch (BeansException ex) {
      if (logger.isWarnEnabled()) {
        logger.warn("Exception encountered during context initialization - " +
            "cancelling refresh attempt: " + ex);
      }

      // Destroy already created singletons to avoid dangling resources.
      destroyBeans();

      // Reset 'active' flag.
      cancelRefresh(ex);

      // Propagate exception to caller.
      throw ex;
    }

    finally {
      // Reset common introspection caches in Spring's core, since we
      // might not ever need metadata for singleton beans anymore...
      resetCommonCaches();
      contextRefresh.end();
    }
  }
}      

這裡我們主要看​

​initApplicationEventMulticaster​

​​和​

​registerListeners​

​();方法。前者就是初始化事件廣播器,後置則是注冊監聽。

① initApplicationEventMulticaster

public static final String APPLICATION_EVENT_MULTICASTER_BEAN_NAME 
= "applicationEventMulticaster";

protected void initApplicationEventMulticaster() {
  ConfigurableListableBeanFactory beanFactory = getBeanFactory();
  if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
    this.applicationEventMulticaster =
        beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
    if (logger.isTraceEnabled()) {
      logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
    }
  }
  else {
    SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
    simpleApplicationEventMulticaster.setApplicationStartup(getApplicationStartup());
    this.applicationEventMulticaster = simpleApplicationEventMulticaster;
    beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
    if (logger.isTraceEnabled()) {
      logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
          "[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
    }
  }
}      

② registerListeners

protected void registerListeners() {
  // Register statically specified listeners first.
  for (ApplicationListener<?> listener : getApplicationListeners()) {
    getApplicationEventMulticaster().addApplicationListener(listener);
  }

  // Do not initialize FactoryBeans here: We need to leave all regular beans
  // uninitialized to let post-processors apply to them!
  String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
  for (String listenerBeanName : listenerBeanNames) {
    getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
  }

  // Publish early application events now that we finally have a multicaster...
  Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
  this.earlyApplicationEvents = null;
  if (!CollectionUtils.isEmpty(earlyEventsToProcess)) {
    for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
      getApplicationEventMulticaster().multicastEvent(earlyEvent);
    }
  }
}      
  • ①​

    ​getApplicationListeners​

    ​擷取監聽器然後注冊到廣播器中;
  • ②​

    ​getBeanNamesForType​

    ​​擷取bean Name數組​

    ​String[] listenerBeanNames​

    ​ 然後注冊到廣播器中;
  • ③ 處理以前的事件,先将​

    ​earlyApplicationEvents​

    ​​ 賦予null,然後判斷​

    ​earlyEventsToProcess​

    ​ 如果不為空就廣播出去