天天看點

事件總線方案實踐liveData實作事件總線

liveData實作事件總線

目錄介紹

  • 01.EventBus使用原理
  • 02.RxBus使用原理
  • 03.為何使用liveData
  • 04.LiveDataBus的組成
  • 05.LiveDataBus原理圖
  • 06.簡單的實作案例代碼
  • 07.遇到的問題和分析思路
  • 08.使用反射解決遇到問題
  • 09.使用postValue的bug
  • 10.如何發送延遲事件消息
  • 11.如何發送輪訓延遲事件
  • 12.避免類型轉換異常問題
  • 13.如何實作生命周期感覺

00.事件開源庫

  • 架構的核心思想,就是消息的釋出和訂閱,使用訂閱者模式實作,其原理圖大概如下所示,摘自網絡。
    • 事件總線方案實踐liveData實作事件總線
  • 釋出和訂閱之間的依賴關系,其原理圖大概如下所示,摘自網絡。
    • 事件總線方案實踐liveData實作事件總線
  • 訂閱/釋出模式和觀察者模式之間有着微弱的差別,個人覺得訂閱/釋出模式是觀察者模式的一種增強版。兩者差別如下所示,摘自網絡。
    • 事件總線方案實踐liveData實作事件總線
  • 具體使用可以看demo代碼, demo開源位址

  • RxBus不是一個庫,而是一個檔案,實作隻有短短30行代碼。RxBus本身不需要過多分析,它的強大完全來自于它基于的RxJava技術。
  • 在RxJava中有個Subject類,它繼承Observable類,同時實作了Observer接口,是以Subject可以同時擔當訂閱者和被訂閱者的角色,我們使用Subject的子類PublishSubject來建立一個Subject對象(PublishSubject隻有被訂閱後才會把接收到的事件立刻發送給訂閱者),在需要接收事件的地方,訂閱該Subject對象,之後如果Subject對象接收到事件,則會發射給該訂閱者,此時Subject對象充當被訂閱者的角色。
  • 完成了訂閱,在需要發送事件的地方将事件發送給之前被訂閱的Subject對象,則此時Subject對象作為訂閱者接收事件,然後會立刻将事件轉發給訂閱該Subject對象的訂閱者,以便訂閱者處理相應事件,到這裡就完成了事件的發送與處理。
  • 最後就是取消訂閱的操作了,RxJava中,訂閱操作會傳回一個Subscription對象,以便在合适的時機取消訂閱,防止記憶體洩漏,如果一個類産生多個Subscription對象,我們可以用一個CompositeSubscription存儲起來,以進行批量的取消訂閱。

  • 為何使用liveData
    • LiveData具有的這種可觀察性和生命周期感覺的能力,使其非常适合作為Android通信總線的基礎構件。在一對多的場景中,釋出消息事件後,訂閱事件的頁面隻有在可見的時候才會處理事件邏輯。
    • 使用者不用顯示調用反注冊方法。LiveData具有生命周期感覺能力,是以LiveDataBus隻需要調用注冊回調方法,而不需要顯示的調用反注冊方法。這樣帶來的好處不僅可以編寫更少的代碼,而且可以完全杜絕其他通信總線類架構(如EventBus、RxBus)忘記調用反注冊所帶來的記憶體洩漏的風險。
  • 該liveDataBus優勢
    • 1.該LiveDataBus的實作比較簡單,支援發送普通事件,也支援發送粘性事件;
    • 2.該LiveDataBus支援發送延遲事件消息,也可以用作輪訓延遲事件(比如商城類項目某活動頁面5秒鐘刷一次接口資料),支援stop輪訓操作
    • 3.該LiveDataBus可以減小APK包的大小,由于LiveDataBus隻依賴Android官方Android Architecture Components元件的LiveData;
    • 4.該LiveDataBus具有生命周期感覺,這個是一個很大的優勢。不需要反注冊,避免了記憶體洩漏等問題;
  • 關于liveData深度解析,可以看我這篇部落格: 01.LiveData詳細分析

  • 消息: 消息可以是任何的 Object,可以定義不同類型的消息,如 Boolean、String。也可以定義自定義類型的消息。
  • 消息通道: LiveData 扮演了消息通道的角色,不同的消息通道用不同的名字區分,名字是 String 類型的,可以通過名字擷取到一個 LiveData 消息通道。
  • 消息總線: 消息總線通過單例實作,不同的消息通道存放在一個 HashMap 中。
  • 訂閱: 訂閱者通過 getChannel() 擷取消息通道,然後調用 observe() 訂閱這個通道的消息。
  • 釋出: 釋出者通過 getChannel() 擷取消息通道,然後調用 setValue() 或者 postValue() 釋出消息。

  • 為了友善了解,LiveDataBus原理圖如下所示
  • 訂閱和注冊的流程圖
    • 事件總線方案實踐liveData實作事件總線
  • 訂閱注冊原理圖
    • 事件總線方案實踐liveData實作事件總線
  • 為何用LiveDataBus替代EventBus和RxBus
    • LiveDataBus的實作極其簡單
    • LiveDataBus可以減小APK包的大小,由于LiveDataBus隻依賴Android官方Android Architecture Components元件的LiveData。
    • LiveDataBus具有生命周期感覺。

  • 我這裡先用最簡單的代碼實作liveDataBus,然後用一下,看一下會出現什麼問題,代碼如下所示:
    public final class LiveDataBus1 {
    
        private final Map<String, MutableLiveData<Object>> bus;
    
        private LiveDataBus1() {
            bus = new HashMap<>();
        }
    
        private static class SingletonHolder {
            private static final LiveDataBus1 DATA_BUS = new LiveDataBus1();
        }
    
        public static LiveDataBus1 get() {
            return SingletonHolder.DATA_BUS;
        }
    
        public <T> MutableLiveData<T> getChannel(String target, Class<T> type) {
            if (!bus.containsKey(target)) {
                bus.put(target, new MutableLiveData<>());
            }
            return (MutableLiveData<T>) bus.get(target);
        }
    
        public MutableLiveData<Object> getChannel(String target) {
            return getChannel(target, Object.class);
        }
    }           
  • 那麼如何發送消息和接收消息呢,注意兩者的key需要保持一緻,否則無法接收?具體代碼如下所示:
    //發送消息
    LiveDataBus1.get().getChannel("yc_bus").setValue(text);
    //接收消息
    LiveDataBus1.get().getChannel("yc_bus", String.class)
            .observe(this, new Observer<String>() {
                @Override
                public void onChanged(@Nullable String newText) {
                    // 更新資料
                    tvText.setText(newText);
                }
            });           

  • 遇到的問題:
    • 1.LiveData 一時使用一時爽,爽完了之後我們發現這個簡易的 LiveDataBus 存在一個問題,就是訂閱者會收到訂閱之前釋出的消息,類似于粘性消息。對于一個消息總線來說,這是不可接受的。
    • 2.多次調用了 postValue() 方法,隻有最後次調用的值會得到更新。也就是此方法是有可能會丢失事件!

7.1 先看第一個問題

  • 然後看一下LiveData的訂閱方法observe源碼
    • 看下面代碼可知道,LiveData 内部會将傳入參數包裝成 wrapper ,然後存在一個 Map 中,最後通過 LifeCycle 元件添加觀察者。
    // 注釋隻能在主線程中調用該方法
    @MainThread
    public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) {
        // 目前綁定的元件(activity or fragment)狀态為DESTROYED的時候, 則會忽視目前的訂閱請求
        if (owner.getLifecycle().getCurrentState() == DESTROYED) {
            // ignore
            return;
        }
        // 轉為帶生命周期感覺的觀察者包裝類
        LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer);
        ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);
        // 對應觀察者隻能與一個owner綁定,否則抛出異常
        if (existing != null && !existing.isAttachedTo(owner)) {
            throw new IllegalArgumentException("Cannot add the same observer"
                    + " with different lifecycles");
        }
        if (existing != null) {
            return;
        }
        // lifecycle注冊
        owner.getLifecycle().addObserver(wrapper);
    }           
  • 緊接着,來看一下LiveData的更新資料方法
    • LiveData 更新資料方式有兩個,一個是 setValue() 另一個是 postValue(),這兩個方法的差別是,postValue() 在内部會抛到主線程去執行更新資料,是以适合在子線程中使用;而 setValue() 則是直接更新資料。
    @MainThread
    protected void setValue(T value) {
        assertMainThread("setValue");
        // 這裡的 mVersion,它本問題關鍵,每次更新資料都會自增,預設值是 -1。
        mVersion++;
        mData = value;
        dispatchingValue(null);
    }           
    • 跟進下 dispatchingValue() 方法,注意,這裡需要重點看considerNotify代碼:
    private void dispatchingValue(@Nullable ObserverWrapper initiator) {
        // mDispatchingValue的判斷主要是為了解決并發調用dispatchingValue的情況
        // 當對應資料的觀察者在執行的過程中, 如有新的資料變更, 則不會再次通知到觀察者。是以觀察者内的執行不應進行耗時工作
        if (mDispatchingValue) {
            mDispatchInvalidated = true;
            return;
        }
        mDispatchingValue = true;
        do {
            mDispatchInvalidated = false;
            if (initiator != null) {
                // 等下重點看這裡的代碼
                considerNotify(initiator);
                initiator = null;
            } else {
                for (Iterator<Map.Entry<Observer<T>, ObserverWrapper>> iterator =
                        mObservers.iteratorWithAdditions(); iterator.hasNext(); ) {
                    // 等下重點看這裡的代碼
                    considerNotify(iterator.next().getValue());
                    if (mDispatchInvalidated) {
                        break;
                    }
                }
            }
        } while (mDispatchInvalidated);
        mDispatchingValue = false;
    }           
    • 然後看一下considerNotify() 方法做了什麼,代碼如下所示,這裡有道詞典翻譯下注釋:
    private void considerNotify(ObserverWrapper observer) {
        if (!observer.mActive) {
            return;
        }
        // 檢查最新的狀态b4排程。也許它改變了狀态,但我們還沒有得到事件。
        // 我們還是先檢查觀察者。活動,以保持它作為活動的入口。
        // 是以,即使觀察者移動到一個活動狀态,如果我們沒有收到那個事件,我們最好不要通知一個更可預測的通知順序。
        if (!observer.shouldBeActive()) {
            observer.activeStateChanged(false);
            return;
        }
        if (observer.mLastVersion >= mVersion) {
            return;
        }
        observer.mLastVersion = mVersion;
        //noinspection unchecked
        observer.mObserver.onChanged((T) mData);
    }           
  • 為何訂閱者會馬上收到訂閱之前釋出的最新消息?
    • 如果 ObserverWrapper 的 mLastVersion 小于 LiveData 的 mVersion,那麼就會執行的 onChange() 方法去通知觀察者資料已更新。而 ObserverWrapper.mLastVersion 的預設值是 -1, LiveData 隻要更新過資料,mVersion 就肯定會大于 -1,是以訂閱者會馬上收到訂閱之前釋出的最新消息!!

7.2 然後看一下第二個問題

  • 首先看一下postValue源代碼,如下所示:
    • 看代碼注釋中說,如果在多線程中同一個時刻,多次調用了 postValue() 方法,隻有最後次調用的值會得到更新。也就是此方法是有可能會丢失事件!
    • postValue 隻是把傳進來的資料先存到 mPendingData,ArchTaskExecutor.getInstance()擷取的是一個單利對象。然後往主線程抛一個 Runnable,在這個 Runnable 裡面再調用 setValue 來把存起來的值真正設定上去,并回調觀察者們。而如果在這個 Runnable 執行前多次 postValue,其實隻是改變暫存的值 mPendingData,并不會再次抛另一個 Runnable。
    protected void postValue(T value) {
        boolean postTask;
        synchronized (mDataLock) {
            postTask = mPendingData == NOT_SET;
            mPendingData = value;
        }
        if (!postTask) {
            return;
        }
        ArchTaskExecutor.getInstance().postToMainThread(mPostValueRunnable);
    }
    
    private final Runnable mPostValueRunnable = new Runnable() {
        @Override
        public void run() {
            Object newValue;
            synchronized (mDataLock) {
                newValue = mPendingData;
                mPendingData = NOT_SET;
            }
            //noinspection unchecked
            setValue((T) newValue);
        }
    };           

  • 根據之前的分析,隻需要在注冊一個新的訂閱者的時候把Wrapper的version設定成跟LiveData的version一緻即可。
  • 能不能從Map容器mObservers中取到LifecycleBoundObserver,然後再更改version呢?答案是肯定的,通過檢視SafeIterableMap的源碼我們發現有一個protected的get方法。是以,在調用observe的時候,我們可以通過反射拿到LifecycleBoundObserver,再把LifecycleBoundObserver的version設定成和LiveData一緻即可。
    @Override
    public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) {
        super.observe(owner, observer);
         hook(observer);
    }
    
    private void hook(@NonNull Observer<T> observer) {
        try {
            Class<LiveData> classLiveData = LiveData.class;
            Field fieldObservers = classLiveData.getDeclaredField("mObservers");
            fieldObservers.setAccessible(true);
            Object objectObservers = fieldObservers.get(this);
            Class<?> classObservers = objectObservers.getClass();
            Method methodGet = classObservers.getDeclaredMethod("get", Object.class);
            methodGet.setAccessible(true);
            Object objectWrapperEntry = methodGet.invoke(objectObservers, observer);
            Object objectWrapper = null;
            if (objectWrapperEntry instanceof Map.Entry) {
                objectWrapper = ((Map.Entry) objectWrapperEntry).getValue();
            }
            if (objectWrapper != null) {
                Class<?> classObserverWrapper = objectWrapper.getClass().getSuperclass();
                Field fieldLastVersion = null;
                if (classObserverWrapper != null) {
                    fieldLastVersion = classObserverWrapper.getDeclaredField("mLastVersion");
                    fieldLastVersion.setAccessible(true);
                    Field fieldVersion = classLiveData.getDeclaredField("mVersion");
                    fieldVersion.setAccessible(true);
                    Object objectVersion = fieldVersion.get(this);
                    fieldLastVersion.set(objectWrapper, objectVersion);
                }
            }
        } catch (Exception e){
            e.printStackTrace();
        }
    }           
  • 同時還需要注意,在實作MutableLiveData自定義類BusMutableLiveData中,需要重寫這幾個方法。代碼如下所示:
    /**
     * 在給定的觀察者的生命周期内将給定的觀察者添加到觀察者清單所有者。
     * 事件是在主線程上分派的。如果LiveData已經有資料集合,它将被傳遞給觀察者。
     * @param owner                                 owner
     * @param observer                              observer
     */
    public void observeSticky(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) {
        super.observe(owner, observer);
    }
    
    /**
     * 将給定的觀察者添加到觀察者清單中。這個調用類似于{@link LiveData#observe(LifecycleOwner, Observer)}
     * 和一個LifecycleOwner, which總是積極的。這意味着給定的觀察者将接收所有事件,并且永遠不會 被自動删除。
     * 您應該手動調用{@link #removeObserver(Observer)}來停止 觀察這LiveData。
     * @param observer                              observer
     */
    public void observeStickyForever(@NonNull Observer<T> observer) {
        super.observeForever(observer);
    }           

9.1 模拟通過發送多個postValue消息出現丢失問題

  • 首先看看MutableLiveData源代碼,如下所示,這裡重點展示測試資料案例
    public void postValue(T value) {
        super.postValue(value);
    }           
  • 然後使用for循環,使用postValue發送100條消息事件,代碼如下所示:
    public void postValueCountTest() {
        sendCount = 100;
        receiveCount = 0;
        ExecutorService threadPool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < sendCount; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    LiveDataBus2.get().getChannel(Constant.LIVE_BUS3).postValue("test_1_data"+sendCount);
                }
            });
        }
        new Handler().postDelayed(new Runnable() {
            @Override
            public void run() {
                BusLogUtils.d("sendCount: " + sendCount + " | receiveCount: " + receiveCount);
                Toast.makeText(ThirdActivity4.this, "sendCount: " + sendCount +
                        " | receiveCount: " + receiveCount, Toast.LENGTH_LONG).show();
            }
        }, 1000);
    }
    //接收消息
    LiveDataBus2.get()
            .getChannel(Constant.LIVE_BUS3, String.class)
            .observe(this, new Observer<String>() {
                @Override
                public void onChanged(@Nullable String s) {
                    receiveCount++;
                    BusLogUtils.d("接收消息--ThirdActivity4------yc_bus---1-"+s+"----"+receiveCount);
                }
            });           
  • 然後看一下列印日志,是不是發現了什麼問題?發現根本沒有100條資料……
    2020-03-03 10:25:51.397 4745-4745/com.ycbjie.yclivedatabus D/BusLogUtils: 接收消息--ThirdActivity4------yc_bus---1-test_1_data100----1
    2020-03-03 10:25:51.397 4745-4745/com.ycbjie.yclivedatabus D/BusLogUtils: 接收消息--ThirdActivity4------yc_bus---1-test_1_data100----2
    2020-03-03 10:25:51.397 4745-4745/com.ycbjie.yclivedatabus D/BusLogUtils: 接收消息--ThirdActivity4------yc_bus---1-test_1_data100----3
    2020-03-03 10:25:51.403 4745-4745/com.ycbjie.yclivedatabus D/BusLogUtils: 接收消息--ThirdActivity4------yc_bus---1-test_1_data100----4           

9.2 修改後使用handler處理postValue消息

  • 既然post是在子線程中發送消息事件,那麼可不可以使用handler将它放到主線程中處理事件了,是可以的,代碼如下所示
    /**
     * 子線程發送事件
     * @param value                                 value
     */
    @Override
    public void postValue(T value) {
        //注意,去掉super方法,
        //super.postValue(value);
        mainHandler.post(new PostValueTask(value));
    }
    
    private BusWeakHandler mainHandler = new BusWeakHandler(Looper.getMainLooper());
    
    private class PostValueTask implements Runnable {
    
        private T newValue;
    
        public PostValueTask(@NonNull T newValue) {
            this.newValue = newValue;
        }
    
        @Override
        public void run() {
            setValue(newValue);
        }
    }           
  • 然後再次使用for循環,發送100條消息事件,檢視日志。發現就會剛好有100條資料。代碼這裡就不展示了,跟上面測試代碼類似。

  • 可以知道,通過postValue可以在子線程發送消息,那麼發送延遲消息也十分簡單,代碼如下所示:
    /**
     * 子線程發送事件
     * @param value                                 value
     */
    @Override
    public void postValue(T value) {
        //注意,去掉super方法,
        //super.postValue(value);
        mainHandler.post(new PostValueTask(value));
    }
    
    /**
     * 發送延遲事件
     * @param value                                 value
     * @param delay                                 延遲時間
     */
    @Override
    public void postValueDelay(T value, long delay) {
        mainHandler.postDelayed(new PostValueTask(value) , delay);
        //mainHandler.postAtTime(new PostValueTask(value) , delay);
    }           
  • 測試用例,延遲5秒鐘發送事件,代碼如下所示。具體可以看demo鐘的案例!
    LiveDataBus.get().with(Constant.LIVE_BUS4).postValueDelay("test_4_data",5000);           

  • 輪訓延遲事件,比如有的頁面需要實作,每間隔5秒鐘就重新整理一次頁面資料,常常用于活動頁面。在購物商城這類需求很常見
    @Override
    public void postValueInterval(final T value, final long interval) {
        mainHandler.postDelayed(new Runnable() {
            @Override
            public void run() {
                setValue(value);
                mainHandler.postDelayed(this,interval);
            }
        },interval);
    }           
  • 測試用例,輪訓延遲3秒鐘發送事件,代碼如下所示。具體可以看demo鐘的案例!
    LiveDataBus.get().with(Constant.LIVE_BUS5).postValueInterval("test_5_data",3000);           
  • 這裡遇到了一個問題,假如有多個頁面有這種輪訓發送事件的需求,顯然這個是實作不了的。那麼可不可以把每個輪訓runnable記錄一個名稱差別開來代碼更更改如下
    /**
     * 發送延遲事件,間隔輪訓
     * @param value                                 value
     * @param interval                              間隔
     */
    @Deprecated
    @Override
    public void postValueInterval(final T value, final long interval,@NonNull String taskName) {
        if(taskName.isEmpty()){
            return;
        }
        IntervalValueTask  intervalTask = new IntervalValueTask(value,interval);
        intervalTasks.put(taskName,intervalTask);
        mainHandler.postDelayed(intervalTask,interval);
    }
    
    private class IntervalValueTask implements Runnable {
    
        private T newValue;
        private long interval;
    
        public IntervalValueTask(T newValue, long interval) {
            this.newValue = newValue;
            this.interval = interval;
        }
    
        @Override
        public void run() {
            setValue(newValue);
            mainHandler.postDelayed(this,interval);
        }
    }           
  • 輪訓總不可以一直持續下去吧,這個時候可以添加一個手動關閉輪訓的方法。代碼如下所示:
    /**
     * 停止輪訓間隔發送事件
     */
    @Deprecated
    @Override
    public void stopPostInterval(@NonNull String taskName) {
        IntervalValueTask  intervalTask  = intervalTasks.get(taskName);
        if(intervalTask!= null){
            //移除callback
            mainHandler.removeCallbacks(intervalTask);
            intervalTasks.remove(taskName);
        }
    }           

  • 代碼如下所示
    public class SafeCastObserver<T> implements Observer<T> {
    
        @NonNull
        private final Observer<T> observer;
    
        public SafeCastObserver(@NonNull Observer<T> observer) {
            this.observer = observer;
        }
    
        @Override
        public void onChanged(@Nullable T t) {
            //捕獲異常,避免出現異常之後,收不到後續的消息事件
            try {
                //注意為了避免轉換出現的異常,try-catch捕獲
                observer.onChanged(t);
            } catch (ClassCastException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }           

  • 生命周期感覺能力就是當在Android平台的LifecycleOwner(如Activity)中使用的時候,隻需要訂閱消息,而不需要取消訂閱消息。LifecycleOwner的生命周期結束的時候,會自動取消訂閱。這帶來了兩個好處:
    • 可以在任何位置訂閱消息,而不是必須在onCreate方法中訂閱
    • 避免了忘記取消訂閱引起的記憶體洩漏
  • 具體已經對lifecycle源碼作出了分析,具體可以看我上一篇的部落格。 Lifecycle詳細分析

參考内容

繼續閱讀