1.訂閱者注冊中心
1.1 訂閱者存放結構
線程安全的Map,key訂閱者訂閱的事件class,value為接收此事件對應的方法
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
Maps.newConcurrentMap();
1.2 訂閱者注冊
/**注冊訂閱者 所有的訂閱方法*/
void register(Object listener) {
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
/**事件的資料類型*/
Class<?> eventType = entry.getKey();
/**接收此事件的訂閱者*/
Collection<Subscriber> eventMethodsInListener = entry.getValue();
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
eventSubscribers =
MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
eventSubscribers.addAll(eventMethodsInListener);
}
}
1.3訂閱者抽象類
class Subscriber {
/**支援建立線程安全的方法*/
static Subscriber create(EventBus bus, Object listener, Method method) {
return isDeclaredThreadSafe(method)
? new Subscriber(bus, listener, method)
: new SynchronizedSubscriber(bus, listener, method);
}
/** The event bus this subscriber belongs to. */
/**訂閱者所屬的 事件總線*/
@Weak private EventBus bus;
/** The object with the subscriber method. */
@VisibleForTesting final Object target;
/** Subscriber method. */
/**訂閱者的方法*/
private final Method method;
...
/**分發事件給自己進行執行相關方法*/
//event為訂閱者訂閱方法參數
final void dispatchEvent(final Object event) {
executor.execute(
new Runnable() {
@Override
public void run() {
try {
//調用訂閱者的方法
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
1.4 調用訂閱者的方法
/**
* Invokes the subscriber method. This method can be overridden to make the invocation
* synchronized.
*
* 調用調用者的方法
*/
@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
try {
/**java方法調用*/
method.invoke(target, checkNotNull(event));
} catch (IllegalArgumentException e) {
throw new Error("Method rejected target/argument: " + event, e);
} catch (IllegalAccessException e) {
throw new Error("Method became inaccessible: " + event, e);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
}
throw e;
}
}
2.分發器(Dispather)
分發器的主要作用是把産生的事件分發給訂閱者;
2.1 沒有中間隊列的分發器
把事件直接交給分發器,不适用隊列
private static final class ImmediateDispatcher extends Dispatcher {
private static final ImmediateDispatcher INSTANCE = new ImmediateDispatcher();
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
while (subscribers.hasNext()) {
subscribers.next().dispatchEvent(event);
}
}
}
}
2.2 使用全局隊列的分發器
不能保證事件的有序性
/** Implementation of a {@link #legacyAsync()} dispatcher. */
/**
* 傳統的異步分發實作
* 所有的事件都放到一個全局的隊列裡
*/
private static final class LegacyAsyncDispatcher extends Dispatcher {
/** Global event queue. */
/**全局的事件隊列*/
private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
Queues.newConcurrentLinkedQueue();
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
while (subscribers.hasNext()) {
queue.add(new EventWithSubscriber(event, subscribers.next()));
}
EventWithSubscriber e;
while ((e = queue.poll()) != null) {
e.subscriber.dispatchEvent(e.event);
}
}
private static final class EventWithSubscriber {
private final Object event;
private final Subscriber subscriber;
private EventWithSubscriber(Object event, Subscriber subscriber) {
this.event = event;
this.subscriber = subscriber;
}
}
}
2.3 每個線程隊列分發器實作
每個線程有一個自己的隊列,線程把自己的産生的事件存放到跟自己相關的隊列中。
同一個線程隻能進入dipatcher方法一次,保證消息的順序性。
/**
* 每個線程有個對應的隊列的分發器
* 同一個線程時互斥的,隻能同一個線程進入dispatcher進入。
*/
private static final class PerThreadQueuedDispatcher extends Dispatcher {
// This dispatcher matches the original dispatch behavior of EventBus.
/** Per-thread queue of events to dispatch. */
/**
* 每個線程對應的事件隊列
*/
private final ThreadLocal<Queue<Event>> queue =
new ThreadLocal<Queue<Event>>() {
@Override
protected Queue<Event> initialValue() {
return Queues.newArrayDeque();
}
};
/** Per-thread dispatch state, used to avoid reentrant event dispatching. */
/**
* 每個線程分發狀态,使用它為了避免重新進入分發事件
*/
private final ThreadLocal<Boolean> dispatching =
new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return false;
}
};
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
/**标記線程重複進入*/
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
private static final class Event {
private final Object event;
private final Iterator<Subscriber> subscribers;
private Event(Object event, Iterator<Subscriber> subscribers) {
this.event = event;
this.subscribers = subscribers;
}
}
}
3.事件線實作 EventBus
EventBus 用來注冊訂閱者、解除安裝訂閱者、送出訂閱事件等。
public class EventBus {
private static final Logger logger = Logger.getLogger(EventBus.class.getName());
/**唯一辨別符*/
private final String identifier;
/**線程執行器*/
private final Executor executor;
private final SubscriberExceptionHandler exceptionHandler;
/**訂閱者注冊中心*/
private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
private final Dispatcher dispatcher;
....
}
4. 總結
- 事件靈活,可以是任何java類
- 使用注解的方式訂閱事件
- 父類也可以訂閱事件