天天看點

Google EventBus源碼分析

1.訂閱者注冊中心

1.1 訂閱者存放結構

線程安全的Map,key訂閱者訂閱的事件class,value為接收此事件對應的方法

private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
      Maps.newConcurrentMap();
           

1.2 訂閱者注冊

/**注冊訂閱者 所有的訂閱方法*/
  void register(Object listener) {
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

    for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      /**事件的資料類型*/
      Class<?> eventType = entry.getKey();

      /**接收此事件的訂閱者*/
      Collection<Subscriber> eventMethodsInListener = entry.getValue();

      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

      if (eventSubscribers == null) {
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
        eventSubscribers =
            MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
      }

      eventSubscribers.addAll(eventMethodsInListener);
    }
  }
           

1.3訂閱者抽象類

class Subscriber {

  /**支援建立線程安全的方法*/
  static Subscriber create(EventBus bus, Object listener, Method method) {
    return isDeclaredThreadSafe(method)
        ? new Subscriber(bus, listener, method)
        : new SynchronizedSubscriber(bus, listener, method);
  }

  /** The event bus this subscriber belongs to. */
  /**訂閱者所屬的 事件總線*/
  @Weak private EventBus bus;

  /** The object with the subscriber method. */
  @VisibleForTesting final Object target;

  /** Subscriber method. */
  /**訂閱者的方法*/
  private final Method method;

  ...

  /**分發事件給自己進行執行相關方法*/
  //event為訂閱者訂閱方法參數
  final void dispatchEvent(final Object event) {
    executor.execute(
        new Runnable() {
          @Override
          public void run() {
            try {
               //調用訂閱者的方法
              invokeSubscriberMethod(event);
            } catch (InvocationTargetException e) {
              bus.handleSubscriberException(e.getCause(), context(event));
            }
          }
        });
  }
           
1.4 調用訂閱者的方法
/**
   * Invokes the subscriber method. This method can be overridden to make the invocation
   * synchronized.
   *
   * 調用調用者的方法
   */
  @VisibleForTesting
  void invokeSubscriberMethod(Object event) throws InvocationTargetException {
    try {
      /**java方法調用*/
      method.invoke(target, checkNotNull(event));
    } catch (IllegalArgumentException e) {
      throw new Error("Method rejected target/argument: " + event, e);
    } catch (IllegalAccessException e) {
      throw new Error("Method became inaccessible: " + event, e);
    } catch (InvocationTargetException e) {
      if (e.getCause() instanceof Error) {
        throw (Error) e.getCause();
      }
      throw e;
    }
  }
           

2.分發器(Dispather)

分發器的主要作用是把産生的事件分發給訂閱者;

2.1 沒有中間隊列的分發器

把事件直接交給分發器,不适用隊列

private static final class ImmediateDispatcher extends Dispatcher {
    private static final ImmediateDispatcher INSTANCE = new ImmediateDispatcher();

    @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      while (subscribers.hasNext()) {
        subscribers.next().dispatchEvent(event);
      }
    }
  }
}
           

2.2 使用全局隊列的分發器

不能保證事件的有序性

/** Implementation of a {@link #legacyAsync()} dispatcher. */
  /**
   * 傳統的異步分發實作
   * 所有的事件都放到一個全局的隊列裡
   */
  private static final class LegacyAsyncDispatcher extends Dispatcher {

    /** Global event queue. */
    /**全局的事件隊列*/
    private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
        Queues.newConcurrentLinkedQueue();

    @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      while (subscribers.hasNext()) {
        queue.add(new EventWithSubscriber(event, subscribers.next()));
      }

      EventWithSubscriber e;
      while ((e = queue.poll()) != null) {
        e.subscriber.dispatchEvent(e.event);
      }
    }

    private static final class EventWithSubscriber {
      private final Object event;
      private final Subscriber subscriber;

      private EventWithSubscriber(Object event, Subscriber subscriber) {
        this.event = event;
        this.subscriber = subscriber;
      }
    }
  }
           

2.3 每個線程隊列分發器實作

每個線程有一個自己的隊列,線程把自己的産生的事件存放到跟自己相關的隊列中。

同一個線程隻能進入dipatcher方法一次,保證消息的順序性。

/**
   * 每個線程有個對應的隊列的分發器
   * 同一個線程時互斥的,隻能同一個線程進入dispatcher進入。
   */
  private static final class PerThreadQueuedDispatcher extends Dispatcher {

    // This dispatcher matches the original dispatch behavior of EventBus.

    /** Per-thread queue of events to dispatch. */
    /**
     * 每個線程對應的事件隊列
     */
    private final ThreadLocal<Queue<Event>> queue =
        new ThreadLocal<Queue<Event>>() {
          @Override
          protected Queue<Event> initialValue() {
            return Queues.newArrayDeque();
          }
        };

    /** Per-thread dispatch state, used to avoid reentrant event dispatching. */
    /**
     * 每個線程分發狀态,使用它為了避免重新進入分發事件
     */
    private final ThreadLocal<Boolean> dispatching =
        new ThreadLocal<Boolean>() {
          @Override
          protected Boolean initialValue() {
            return false;
          }
        };

    @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      checkNotNull(subscribers);
      Queue<Event> queueForThread = queue.get();
      queueForThread.offer(new Event(event, subscribers));
      /**标記線程重複進入*/
      if (!dispatching.get()) {
        dispatching.set(true);
        try {
          Event nextEvent;
          while ((nextEvent = queueForThread.poll()) != null) {
            while (nextEvent.subscribers.hasNext()) {
              nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
            }
          }
        } finally {
          dispatching.remove();
          queue.remove();
        }
      }
    }

    private static final class Event {
      private final Object event;
      private final Iterator<Subscriber> subscribers;

      private Event(Object event, Iterator<Subscriber> subscribers) {
        this.event = event;
        this.subscribers = subscribers;
      }
    }
  }
           

3.事件線實作 EventBus

EventBus 用來注冊訂閱者、解除安裝訂閱者、送出訂閱事件等。

public class EventBus {

  private static final Logger logger = Logger.getLogger(EventBus.class.getName());

  /**唯一辨別符*/
  private final String identifier;

  /**線程執行器*/
  private final Executor executor;
  private final SubscriberExceptionHandler exceptionHandler;

  /**訂閱者注冊中心*/
  private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
  private final Dispatcher dispatcher;

  ....

}
           

4. 總結

  1. 事件靈活,可以是任何java類
  2. 使用注解的方式訂閱事件
  3. 父類也可以訂閱事件

繼續閱讀