天天看點

Guava學習筆記:EventBus(轉)

EventBus與MQ的關系:

兩者沒有關系,隻是應用場景有些類似.

一個是觀察者模式,一個是消息中間件

再具體點就是eventbus的消息是不能持久化的,拓機就丢掉,是不能叢集的

eventbus可同步可異步,MQ隻有異步

eventbus可以定義多個subscriber,然後指定順序執行

eventbus類似spring的applicationevent

EventBus是Guava的事件處理機制,是設計模式中的觀察者模式(生産/消費者程式設計模型)的優雅實作。

對于事件監聽和釋出訂閱模式,EventBus是一個非常優雅和簡單解決方案,我們不用建立複雜的類和接口層次結構。

  Observer模式是比較常用的設計模式之一,雖然有時候在具體代碼裡,它不一定叫這個名字,比如改頭換面叫個Listener,但模式就是這個模式。手工實作一個Observer也不是多複雜的一件事,隻是因為這個設計模式實在太常用了,Java就把它放到了JDK裡面:Observable和Observer,從JDK 1.0裡,它們就一直在那裡。從某種程度上說,它簡化了Observer模式的開發,至少我們不用再手工維護自己的Observer清單了。不過,如前所述,JDK裡的Observer從1.0就在那裡了,直到Java 7,它都沒有什麼改變,就連通知的參數還是Object類型。要知道,Java 5就已經泛型了。Java 5是一次大規模的文法調整,許多程式庫從那開始重新設計了API,使其更簡潔易用。當然,那些不做應對的程式庫,多半也就過時了。這也就是這裡要讨論知識更新的原因所在。今天,對于普通的應用,如果要使用Observer模式該如何做呢?答案是Guava的EventBus。

  EventBus基本用法:

  使用Guava之後, 如果要訂閱消息, 就不用再繼承指定的接口, 隻需要在指定的方法上加上@Subscribe注解即可。代碼如下:

  消息封裝類:

public class TestEvent {
    private final int message;
    public TestEvent(int message) {        
        this.message = message;
        System.out.println("event message:"+message);
    }
    public int getMessage() {
        return message;
    }
}      

 消息接受類:

public class EventListener {
    public int lastMessage = 0;

    @Subscribe
    public void listen(TestEvent event) {
        lastMessage = event.getMessage();
        System.out.println("Message:"+lastMessage);
    }

    public int getLastMessage() {      
        return lastMessage;
    }
}      

測試類及輸出結果:

public class TestEventBus {
    @Test
    public void testReceiveEvent() throws Exception {

        EventBus eventBus = new EventBus("test");
        EventListener listener = new EventListener();

        eventBus.register(listener);

        eventBus.post(new TestEvent(200));
        eventBus.post(new TestEvent(300));
        eventBus.post(new TestEvent(400));

        System.out.println("LastMessage:"+listener.getLastMessage());
        ;
    }
}

//輸出資訊
event message:200
Message:200
event message:300
Message:300
event message:400
Message:400
LastMessage:400      

 MultiListener的使用:

  隻需要在要訂閱消息的方法上加上@Subscribe注解即可實作對多個消息的訂閱,代碼如下:

public class MultipleListener {
    public Integer lastInteger;  
    public Long lastLong;  
   
    @Subscribe  
    public void listenInteger(Integer event) {  
        lastInteger = event; 
        System.out.println("event Integer:"+lastInteger);
    }  
   
    @Subscribe  
    public void listenLong(Long event) {  
        lastLong = event; 
        System.out.println("event Long:"+lastLong);
    }  
   
    public Integer getLastInteger() {  
        return lastInteger;  
    }  
   
    public Long getLastLong() {  
        return lastLong;  
    }  
}      

測試類:

public class TestMultipleEvents {
    @Test  
    public void testMultipleEvents() throws Exception {  
       
        EventBus eventBus = new EventBus("test");  
        MultipleListener multiListener = new MultipleListener();  
       
        eventBus.register(multiListener);  
       
        eventBus.post(new Integer(100));
        eventBus.post(new Integer(200));  
        eventBus.post(new Integer(300));  
        eventBus.post(new Long(800)); 
        eventBus.post(new Long(800990));  
        eventBus.post(new Long(800882934));  
       
        System.out.println("LastInteger:"+multiListener.getLastInteger());
        System.out.println("LastLong:"+multiListener.getLastLong());
    }   
}

//輸出資訊
event Integer:100
event Integer:200
event Integer:300
event Long:800
event Long:800990
event Long:800882934
LastInteger:300
LastLong:800882934      

Dead Event:

  如果EventBus發送的消息都不是訂閱者關心的稱之為Dead Event。執行個體如下:

public class DeadEventListener {
    boolean notDelivered = false;  
       
    @Subscribe  
    public void listen(DeadEvent event) {  
        
        notDelivered = true;  
    }  
   
    public boolean isNotDelivered() {  
        return notDelivered;  
    }  
}      

  測試類:

public class TestDeadEventListeners {
    @Test  
    public void testDeadEventListeners() throws Exception {  
       
        EventBus eventBus = new EventBus("test");               
        DeadEventListener deadEventListener = new DeadEventListener();  
        eventBus.register(deadEventListener);  

        eventBus.post(new TestEvent(200));         
        eventBus.post(new TestEvent(300));        
       
        System.out.println("deadEvent:"+deadEventListener.isNotDelivered());

    }  
}

//輸出資訊
event message:200
event message:300
deadEvent:true      

 說明:如果沒有消息訂閱者監聽消息, EventBus将發送DeadEvent消息,這時我們可以通過log的方式來記錄這種狀态。

  Event的繼承:

  如果Listener A監聽Event A, 而Event A有一個子類Event B, 此時Listener A将同時接收Event A和B消息,執行個體如下:

  Listener 類:

public class NumberListener {  
       
    private Number lastMessage;  
   
    @Subscribe  
    public void listen(Number integer) {  
        lastMessage = integer; 
        System.out.println("Message:"+lastMessage);
    }  
   
    public Number getLastMessage() {  
        return lastMessage;  
    }  
}  

public class IntegerListener {  
       
    private Integer lastMessage;  
   
    @Subscribe  
    public void listen(Integer integer) {  
        lastMessage = integer; 
        System.out.println("Message:"+lastMessage);
    }  
   
    public Integer getLastMessage() {  
        return lastMessage;  
    }  
}      

 測試類:

public class TestEventsFromSubclass {
    @Test  
    public void testEventsFromSubclass() throws Exception {  
       
        EventBus eventBus = new EventBus("test");  
        IntegerListener integerListener = new IntegerListener();  
        NumberListener numberListener = new NumberListener();  
        eventBus.register(integerListener);  
        eventBus.register(numberListener);  
       
        eventBus.post(new Integer(100));  
       
        System.out.println("integerListener message:"+integerListener.getLastMessage());
        System.out.println("numberListener message:"+numberListener.getLastMessage());
              
        eventBus.post(new Long(200L));  
       
        System.out.println("integerListener message:"+integerListener.getLastMessage());
        System.out.println("numberListener message:"+numberListener.getLastMessage());        
    }  
}

//輸出類
Message:100
Message:100
integerListener message:100
numberListener message:100
Message:200
integerListener message:100
numberListener message:200      

 說明:在這個方法中,我們看到第一個事件(新的整數(100))是收到兩個聽衆,但第二個(新長(200 l))隻能到達NumberListener作為整數一不是建立這種類型的事件。可以使用此功能來建立更通用的監聽器監聽一個廣泛的事件和更詳細的具體的特殊的事件。

  一個綜合執行個體:

public class UserThread extends Thread {
    private Socket connection;
    private EventBus channel;
    private BufferedReader in;
    private PrintWriter out;

    public UserThread(Socket connection, EventBus channel) {
        this.connection = connection;
        this.channel = channel;
        try {
            in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
            out = new PrintWriter(connection.getOutputStream(), true);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Subscribe
    public void recieveMessage(String message) {//接收Publish事件的地方,就是收和發在一個class中,自己和自己玩了
        if (out != null) {
            out.println(message);
            System.out.println("recieveMessage:"+message);
        }
    }

    @Override
    public void run() {
        try {
            String input;
            while ((input = in.readLine()) != null) {
                channel.post(input);//Publish事件的地方
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        
        //reached eof
        channel.unregister(this);
        try {
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        in = null;
        out = null;
    }
}      
mport java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

import com.google.common.eventbus.EventBus;

public class EventBusChat {
    public static void main(String[] args) {
        EventBus channel = new EventBus();
        ServerSocket socket;
        try {
            socket = new ServerSocket(4444);
            while (true) {
                Socket connection = socket.accept();
                UserThread newUser = new UserThread(connection, channel);
                channel.register(newUser);
                newUser.start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}      

說明:用telnet指令登入:telnet 127.0.0.1 4444 ,如果你連接配接多個執行個體你會看到任何消息發送被傳送到其他執行個體。

EventBusExplained

Colin Decker edited this page on 25 Jul 2015 · ​​5 revisions​​

55

  • ​​Introduction​​
  • Basic Utilities
  • ​​Using/avoiding null​​
  • ​​Optional​​
  • ​​Preconditions​​
  • ​​Ordering​​
  • ​​Creation​​
  • ​​Chaining​​
  • ​​Application​​
  • ​​Object methods​​
  • ​​equals​​
  • ​​hashCode​​
  • ​​toString​​
  • ​​compare/compareTo​​
  • ​​Throwables​​
  • Collections
  • ​​Immutable collections​​
  • ​​New collection types​​
  • ​​Multiset​​
  • ​​Multimap​​
  • ​​BiMap​​
  • ​​Table​​
  • ​​ClassToInstanceMap​​
  • ​​RangeSet​​
  • ​​Utility Classes​​
  • ​​Iterables​​
  • ​​Lists​​
  • ​​Sets​​
  • ​​Maps​​
  • ​​Multisets​​
  • ​​Multimaps​​
  • ​​Tables​​
  • ​​Extension Utilities​​
  • ​​Forwarding Decorators​​
  • ​​PeekingIterator​​
  • ​​AbstractIterator​​
  • ​​Caches​​
  • ​​Applicability​​
  • ​​Population​​
  • ​​Eviction​​
  • ​​Removal Listeners​​
  • ​​Refresh​​
  • ​​Timed Eviction​​
  • ​​Size Caps​​
  • ​​Garbage Collection​​
  • ​​Explicit Removals​​
  • ​​Features​​
  • ​​Statistics​​
  • ​​Interruption​​
  • ​​Functional Idioms​​
  • ​​Obtaining​​
  • ​​Using Predicates​​
  • ​​Using Functions​​
  • Concurrency
  • ​​ListenableFuture​​
  • ​​Service​​
  • ​​Using​​
  • ​​Implementations​​
  • ​​Strings​​
  • ​​Joiner​​
  • ​​Splitter​​
  • ​​CharMatcher​​
  • ​​Charsets​​
  • Networking
  • ​​InternetDomainName​​
  • ​​Primitives​​
  • ​​Primitive arrays​​
  • ​​General utilities​​
  • ​​Byte conversion​​
  • ​​Unsigned support​​
  • ​​Ranges​​
  • ​​Building​​
  • ​​Operations​​
  • ​​Discrete Domains​​
  • ​​I/O​​
  • ​​Closing Resources​​
  • ​​Hashing​​
  • ​​BloomFilter​​
  • ​​EventBus​​
  • ​​Math​​
  • ​​Integral​​
  • ​​Overflow Checking​​
  • ​​Floating Point​​
  • ​​Reflection​​
  • ​​TypeToken​​
  • ​​Invokable​​
  • ​​Dynamic Proxies​​
  • ​​ClassPath​​
  • ​​Releases​​
  • ​​Release 19​​
  • ​​Release 18​​
  • ​​Release 17​​
  • ​​Release 16​​
  • ​​Release 15​​
  • ​​Release 14​​
  • ​​Release 13​​
  • ​​Release 12​​
  • ​​Release 11​​
  • ​​Release 10​​
  • Tips
  • ​​Philosophy​​
  • ​​Building with Guava​​
  • ​​Shrinking JARs with ProGuard​​
  • ​​Translating from Apache Commons​​
  • ​​Guava and Compatibility​​
  • ​​Idea Graveyard​​
  • ​​Friends of Guava​​
  • ​​How to Contribute​​
  • ​​Glossary​​
  • ​​Mailing List​​
  • ​​Stack Overflow​​
  • ​​Footprint of JDK/Guava data structures​​

Clone this wiki locally

​​ Clone in Desktop​​

​EventBus​

​ allows publish-subscribe-style communication between components without requiring the components to explicitly register with one another (and thus be aware of each other). It is designed exclusively to replace traditional Java in-process event distribution using explicit registration. It is not a general-purpose publish-subscribe system, nor is it intended for interprocess communication.

Example

// Class is typically registered by the container.
class EventBusChangeRecorder {
  @Subscribe public void recordCustomerChange(ChangeEvent e) {
    recordChange(e.getChange());
  }
}
// somewhere during initialization
eventBus.register(new EventBusChangeRecorder());
// much later
public void changeCustomer() {
  ChangeEvent event = getChangeEvent();
  eventBus.post(event);
}      

One-Minute Guide

Converting an existing ​

​EventListener​

​​-based system to use the ​

​EventBus​

​ is easy.

For Listeners

To listen for a specific flavor of event (say, a ​

​CustomerChangeEvent​

​)...

  • ...in traditional Java events: implement an interface defined with the event -- such as​

    ​CustomerChangeEventListener​

    ​.
  • ...with​

    ​EventBus​

    ​​: create a method that accepts​

    ​CustomerChangeEvent​

    ​​ as its sole argument, and mark it with the​​@Subscribe​​ annotation.

To register your listener methods with the event producers...

  • ...in traditional Java events: pass your object to each producer's​

    ​registerCustomerChangeEventListener​

    ​ method. These methods are rarely defined in common interfaces, so in addition to knowing every possible producer, you must also know its type.
  • ...with​

    ​EventBus​

    ​​: pass your object to the​​EventBus.register(Object)​​​ method on an​

    ​EventBus​

    ​​. You'll need to make sure that your object shares an​

    ​EventBus​

    ​ instance with the event producers.

To listen for a common event supertype (such as ​

​EventObject​

​​ or ​

​Object​

​)...

  • ...in traditional Java events: not easy.
  • ...with​

    ​EventBus​

    ​​: events are automatically dispatched to listeners of any supertype, allowing listeners for interface types or "wildcard listeners" for​

    ​Object​

    ​.

To listen for and detect events that were dispatched without listeners...

  • ...in traditional Java events: add code to each event-dispatching method (perhaps using AOP).
  • ...with​

    ​EventBus​

    ​​: subscribe to​​DeadEvent​​​. The​

    ​EventBus​

    ​ will notify you of any events that were posted but not delivered. (Handy for debugging.)

For Producers

To keep track of listeners to your events...

  • ...in traditional Java events: write code to manage a list of listeners to your object, including synchronization, or use a utility class like​

    ​EventListenerList​

    ​.
  • ...with​

    ​EventBus​

    ​​:​

    ​EventBus​

    ​ does this for you.

To dispatch an event to listeners...

  • ...in traditional Java events: write a method to dispatch events to each event listener, including error isolation and (if desired) asynchronicity.
  • ...with​

    ​EventBus​

    ​​: pass the event object to an​

    ​EventBus​

    ​​'s​​EventBus.post(Object)​​ method.

Glossary

The ​

​EventBus​

​ system and code use the following terms to discuss event distribution:

Event Any object that may be posted to a bus.
Subscribing The act of registering a listener with an ​

​EventBus​

​, so that its handler methodswill receive events.
Listener An object that wishes to receive events, by exposing handler methods.
Handler method A public method that the ​

​EventBus​

​ should use to deliver posted events. Handler methods are marked by the ​​@Subscribe​​ annotation.
Posting an event Making the event available to any listeners through the ​

​EventBus​

​.

FAQ

Why must I create my own Event Bus, rather than using a singleton?

​EventBus​

​​ doesn't specify how you use it; there's nothing stopping your application from having separate ​

​EventBus​

​​ instances for each component, or using separate instances to separate events by context or topic. This also makes it trivial to set up and tear down ​

​EventBus​

​ objects in your tests.

Of course, if you'd like to have a process-wide ​

​EventBus​

​​ singleton, there's nothing stopping you from doing it that way. Simply have your container (such as Guice) create the ​

​EventBus​

​ as a singleton at global scope (or stash it in a static field, if you're into that sort of thing).

In short, ​

​EventBus​

​ is not a singleton because we'd rather not make that decision for you. Use it how you like.

Can I unregister a listener from the Event Bus?

Yes, using ​

​EventBus.unregister​

​, but we find this is needed only rarely:

  • Most listeners are registered on startup or lazy initialization, and persist for the life of the application.
  • Scope-specific​

    ​EventBus​

    ​ instances can handle temporary event distribution (e.g. distributing events among request-scoped objects)
  • For testing,​

    ​EventBus​

    ​ instances can be easily created and thrown away, removing the need for explicit unregistration.

Why use an annotation to mark handler methods, rather than requiring the listener to implement an interface?

We feel that the Event Bus's ​

​@Subscribe​

​ annotation conveys your intentions just as explicitly as implementing an interface (or perhaps more so), while leaving you free to place event handler methods wherever you wish and give them intention-revealing names.

Traditional Java Events use a listener interface which typically sports only a handful of methods -- typically one. This has a number of disadvantages:

  • Any one class can only implement a single response to a given event.
  • Listener interface methods may conflict.
  • The method must be named after the event (e.g.​

    ​handleChangeEvent​

    ​​), rather than its purpose (e.g.​

    ​recordChangeInJournal​

    ​).
  • Each event usually has its own interface, without a common parent interface for a family of events (e.g. all UI events).

The difficulties in implementing this cleanly has given rise to a pattern, particularly common in Swing apps, of using tiny anonymous classes to implement event listener interfaces.

Compare these two cases:

class ChangeRecorder {
     void setCustomer(Customer cust) {
       cust.addChangeListener(new ChangeListener() {
         public void customerChanged(ChangeEvent e) {
           recordChange(e.getChange());
         }
       };
     }
   }      

versus

// Class is typically registered by the container.
   class EventBusChangeRecorder {
     @Subscribe public void recordCustomerChange(ChangeEvent e) {
       recordChange(e.getChange());
     }
   }      
The intent is actually clearer in the second case: there's less noise code, and the event handler has a clear and meaningful name.

What about a generic ​

​Handler<T>​

​ interface?

Some have proposed a generic ​

​Handler<T>​

​​ interface for ​

​EventBus​

​ listeners. This runs into issues with Java's use of type erasure, not to mention problems in usability.

Let's say the interface looked something like the following:

interface Handler<T> {
  void handleEvent(T event);
}      
Due to erasure, no single class can implement a generic interface more than once with different type parameters. This is a giant step backwards from traditional Java Events, where even if​

​actionPerformed​

​​ and ​

​keyPressed​

​ aren't very meaningful names, at least you can implement both methods!

Doesn't ​

​EventBus​

​ destroy static typing and eliminate automated refactoring support?

Some have freaked out about ​

​EventBus​

​​'s ​

​register(Object)​

​​ and ​

​post(Object)​

​​ methods' use of the ​

​Object​

​ type.

​Object​

​​ is used here for a good reason: the Event Bus library places no restrictions on the types of either your event listeners (as in ​

​register(Object)​

​​) or the events themselves (in ​

​post(Object)​

​).

Event handler methods, on the other hand, must explicitly declare their argument type -- the type of event desired (or one of its supertypes). Thus, searching for references to an event class will instantly find all handler methods for that event, and renaming the type will affect all handler methods within view of your IDE (and any code that creates the event).

It's true that you can rename your ​

​@Subscribed​

​ event handler methods at will; Event Bus will not stop this or do anything to propagate the rename because, to Event Bus, the names of your handler methods are irrelevant. Test code that calls the methods directly, of course, will be affected by your renaming -- but that's what your refactoring tools are for. We see this as a feature, not a bug: being able to rename your handler methods at will lets you make their meaning clearer.

What happens if I ​

​register​

​ a listener without any handler methods?

Nothing at all.

The Event Bus was designed to integrate with containers and module systems, with Guice as the prototypical example. In these cases, it's convenient to have the container/factory/environment passevery created object to an ​

​EventBus​

​​'s ​

​register(Object)​

​ method.

This way, any object created by the container/factory/environment can hook into the system's event model simply by exposing handler methods.

What Event Bus problems can be detected at compile time?

Any problem that can be unambiguously detected by Java's type system. For example, defining a handler method for a nonexistent event type.

What Event Bus problems can be detected immediately at registration?

Immediately upon invoking ​

​register(Object)​

​ , the listener being registered is checked for the well-formedness of its handler methods. Specifically, any methods marked with ​

​@Subscribe​

​ must take only a single argument.

Any violations of this rule will cause an ​

​IllegalArgumentException​

​ to be thrown.

(This check could be moved to compile-time using APT, a solution we're researching.)

What ​

​EventBus​

​ problems may only be detected later, at runtime?

If a component posts events with no registered listeners, it may indicate an error (typically an indication that you missed a ​

​@Subscribe​

​ annotation, or that the listening component is not loaded).

(Note that this is not necessarily indicative of a problem. There are many cases where an application will deliberately ignore a posted event, particularly if the event is coming from code you don't control.)

To handle such events, register a handler method for the ​

​DeadEvent​

​​ class. Whenever ​

​EventBus​

​​receives an event with no registered handlers, it will turn it into a ​

​DeadEvent​

​ and pass it your way -- allowing you to log it or otherwise recover.

How do I test event listeners and their handler methods?

Because handler methods on your listener classes are normal methods, you can simply call them from your test code to simulate the ​

​EventBus​

​.

Why can't I do with ​

​EventBus​

​?

​EventBus​

​ is designed to deal with a large class of use cases really, really well. We prefer hitting the nail on the head for most use cases to doing decently on all use cases.

Additionally, making ​

​EventBus​

​ extensible -- and making it useful and productive to extend, while stillallowing ourselves to make additions to the core ​

​EventBus​

​ API that don't conflict with any of your extensions -- is an extremely difficult problem.

If you really, really need magic thing X, that ​

​EventBus​

​ can't currently provide, you should file an issue, and then design your own alternative.

​​https://github.com/google/guava/wiki/EventBusExplained#for-listeners​​