天天看點

Android 基于Jetpack LiveData實作消息總線

作者:mqcoder
Android 基于Jetpack LiveData實作消息總線

morning

前面的文章介紹了LiveData的基本用法,本文來介紹一下LiveData的一個進階用法 — 基于LiveData實作消息總線

消息總線

在Android開發中,跨頁面傳遞資料(尤其是跨多個頁面傳遞資料)是一個很常見的操作,可以通過Handler、接口回調等方式進行傳遞,但這幾種方式都不太優雅,消息總線傳遞資料的方式相比更優雅。

消息總線最大的優勢就是解耦,避免了類與類之間強耦合,通常消息總線有以下幾種實作方式:

  • EventBus:https://github.com/greenrobot/EventBus
  • RxBus : 基于RxJava實作的消息總線
  • LiveDataBus:基于Jetpack中的LiveData實作,也是本文主要介紹的實作方式。

EventBus

EventBus整體思想如下:

Android 基于Jetpack LiveData實作消息總線

eventBus.png

EventBus基于釋出/訂閱模式,釋出者和訂閱者是一對多的關系,釋出者隻有一個,訂閱者可以有多個,他們之間都是通過EventBus這個排程中心來進行資料處理與傳遞。其中釋出者将資料傳遞到排程中心,然後排程中心會找到該釋出者對應的訂閱者,并将資料依次傳遞到訂閱者,進而完成了資料的傳遞;如果沒有訂閱者,那麼也就不會傳遞資料了。整個過程釋出者和訂閱者不需要知道彼此的存在,即資料傳遞過程是解耦的。

RxBus

RxBus本身是依賴RxJava的強大功能實作的。RxJava中有一個Subject,是一種特殊的存在,它既是Observable,又是Observer,可以将其看做一個橋梁或代理。Subject有以下四種:

  • AsyncSubject: 無論訂閱發生在什麼時候,Observer隻會接收AsyncSubject發送的在onComplete()之前的最後一個資料,且onComplete()是必須要調用的。
  • BehaviorSubject:Observer會先接收BehaviorSubject被訂閱之前的最後一個事件,然後接收訂閱之後發送的所有事件。
  • PublishSubject: Observer隻接收PublishSubject被訂閱之後發送的事件。
  • ReplaySubject:無論subscribe訂閱是何時開始的,Observer會接收ReplaySubject發送的所有事件。

具體使用方式可以參考:RxJava中關于Subject和Processor的使用:https://blog.csdn.net/u013700502/article/details/113821445

可以通過Subject來實作一個消息總線,因為不是本文的重點介紹,就不再貼代碼了,可以自行搜尋其實作方式。

LiveDataBus

LiveDataBus是基于LiveData實作的,上篇文章中詳細介紹了其用法及優點:

  • 確定界面符合資料狀态

    LiveData 遵循觀察者模式。當資料發生變化時,LiveData 會通知 Observer 對象,那麼Observer回調的方法中就可以進行UI更新,即資料驅動。

  • 不會發生記憶體洩漏

    觀察者會綁定到 Lifecycle 對象,并在其關聯的生命周期遭到銷毀(如Activity進入ONDESTROY狀态)後進行自我清理。

  • 不會因 Activity 停止而導緻崩潰

    如果觀察者的生命周期處于非活躍狀态(如傳回棧中的 Activity),則它不會接收任何 LiveData 事件。

  • 不再需要手動處理生命周期

    界面元件隻是觀察相關資料,不會停止或恢複觀察。LiveData 将自動管理所有這些操作,因為它在觀察時可以感覺相關的生命周期狀态變化。

  • 資料始終保持最新狀态

    如果生命周期變為非活躍狀态,它會在再次變為活躍狀态時接收最新的資料。例如,曾經在背景的 Activity 會在傳回前台後立即接收最新的資料。

  • 配置更改時自動儲存資料

    如果由于配置更改(如裝置旋轉)而重新建立了 Activity 或 Fragment,它會立即接收最新的可用資料。

  • 共享資源

    使用單例模式擴充 LiveData 對象以封裝系統服務,以便在應用中共享它們。LiveData 對象連接配接到系統服務一次,然後需要相應資源的任何觀察者隻需觀察 LiveData 對象。

原理

  • 消息:釋出者發送,訂閱者接收。消息可以是基本類型,也可以是自定義類型的消息。
  • 消息通道:LiveData 扮演了消息通道的角色,不同的消息通道用不同的名字區分,名字是 String 類型的,可以通過名字擷取到一個LiveData 消息通道。
  • 消息總線: 消息總線通過單例實作,不同的消息通道存放在一個HashMap 中。
  • 訂閱:訂閱者通過get() 擷取消息通道,然後調用 observe() 訂閱這個通道的消息。
  • 釋出:釋出者通過 get() 擷取消息通道,然後調用 setValue()釋出消息。
Android 基于Jetpack LiveData實作消息總線

LiveDataBus.png

圖檔來源:[LiveData實作消息總線]:https://tech.meituan.com/2018/07/26/android-livedatabus.html

LiveData實作消息總線的優勢

相比于EventBus、RxBus,使用LiveData實作消息總線有下面幾個優勢:

  • EventBus、RxBus、LiveDataBus都需要對事件進行注冊、解注冊。不同于EventBus、RxBus手動解注冊,LiveData可以自動管理生命周期,是以也能實作自動解注冊,避免忘記解注冊而導緻記憶體洩漏。
  • LiveData實作簡單,其為Jetpack中重要的一員,且為官方推出,支援更好
  • LiveData相比于EventBus、RxBus,類更少,包更小。

LiveData實作消息總線存在的隐患

LiveData預設是粘性消息

LiveData發送的消息為粘性消息,即先釋出後訂閱也能收到消息,再把訂閱observe()的邏輯貼出來:

@MainThread
public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<? super T> observer) {
    assertMainThread("observe");
    if (owner.getLifecycle().getCurrentState() == DESTROYED) {
        //如果目前觀察者處于DESTROYED狀态,直接傳回
        return;
    }
    //将LifecycleOwner、Observer包裝成LifecycleBoundObserver
    LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer);
    //ObserverWrapper是LifecycleBoundObserver的父類
    ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);
    //如果mObservers中存在該Observer且跟傳進來的LifecycleOwner不同,直接抛異常,一個Observer隻能對應一個LifecycleOwner
    if (existing != null && !existing.isAttachedTo(owner)) {
        throw new IllegalArgumentException("Cannot add the same observer"
                + " with different lifecycles");
    }
    //如果已經存在Observer且跟傳進來的LifecycleOwner是同一個,直接傳回
    if (existing != null) {
        return;
    }
    //通過Lifecycle添加觀察者
    owner.getLifecycle().addObserver(wrapper);
}
           

最後執行addObserver()後,内部通過LifecycleRegistry添加Observer,進而會執行到onStateChanged()方法,該方法輾轉又調用到dispatchingValue方法(setValue/postValue最終也會調用到該方法),接着會調用到我們最關心的considerNotify():

void dispatchingValue(@Nullable ObserverWrapper initiator) {
        if (mDispatchingValue) {
            mDispatchInvalidated = true;
            return;
        }
        mDispatchingValue = true;
        do {
            mDispatchInvalidated = false;
            if (initiator != null) {
                //2、通過observe()的方式會調用這裡
                considerNotify(initiator);
                initiator = null;
            } else {
                //1、通過setValue/postValue的方式會調用這裡,周遊所有觀察者并進行分發
                for (Iterator<Map.Entry<Observer<? super T>, ObserverWrapper>> iterator =
                        mObservers.iteratorWithAdditions(); iterator.hasNext(); ) {
                    considerNotify(iterator.next().getValue());
                    if (mDispatchInvalidated) {
                        break;
                    }
                }
            }
        } while (mDispatchInvalidated);
        mDispatchingValue = false;
    }

    private void considerNotify(ObserverWrapper observer) {
        if (!observer.mActive) {
            //觀察者不在活躍狀态 直接傳回
            return;
        }
        //如果是observe(),則是在STARTED、RESUMED狀态時活躍;如果是ObserveForever(),則認為一直是活躍狀态
        if (!observer.shouldBeActive()) {
            observer.activeStateChanged(false);
            return;
        }
        //Observer中的Version必須小于LiveData中的Version,防止重複發送
        if (observer.mLastVersion >= mVersion) {
            return;
        }
        observer.mLastVersion = mVersion;
        //回調Observer的onChange方法并接收資料
        observer.mObserver.onChanged((T) mData);
    }
           

可以看到considerNotify()裡有這麼一個邏輯:

if (observer.mLastVersion >= mVersion) {
   return;
   }
           

mVersion代表版本号,發送方、訂閱方都有這個變量,預設是-1。發送方每發送一個消息,mVersion都會進行+1操作;而Observer中的mVersion每成功接收一次消息,都會将發送方最新的version指派給自己的mLastVersion,當Observer中的mLastVersion>=發送方mVersion時,Observer會拒絕接收消息,防止重複發送消息。

是以,如果當發送方之前的mVersion不是預設值-1,說明LiveData發送過消息。如果此時執行LiveData.observe(),因為Observer中的mLastVersion為預設值-1,小于發送方的mVersion,是以該消息不會被攔截,Observer一定可以拿到之前發送的消息,即粘性消息。

LiveData.postValue可能會丢失消息

當頻繁使用LiveData.postValue發送多個消息時,LiveData.observe()接收消息時可能會發生丢失,為什麼會這樣呢?來看postValue()的内部實作

//LiveData.java

//postValue發送資料,可以在子線程中使用
protected void postValue(T value) {
    boolean postTask;
    synchronized (mDataLock) {
       //mPendingData預設值是NOT_SET,第一次發送時postTask是true
       postTask = mPendingData == NOT_SET;
       //将發送的值指派給mPendingData
       mPendingData = value;
    }
    //第一次發送時postTask是true,當第一個消息還未處理時,後面再發送消息時postTask會變成false,是以後面的消息都會被攔截,但是發送的值可以更新到第一次發送裡裡面
    if (!postTask) {
       return;
    }
    ArchTaskExecutor.getInstance().postToMainThread(mPostValueRunnable);
 }

//setValue發送資料,隻能在主線程中使用
protected void setValue(T value) {
    assertMainThread("setValue");
    mVersion++;
    mData = value;
    dispatchingValue(null); 
 }

private final Runnable mPostValueRunnable = new Runnable() {
        @SuppressWarnings("unchecked")
        @Override
        public void run() {
            Object newValue;
            synchronized (mDataLock) {
                //将mPendingData中的值通過setValue傳給Observer,并将自身格式化為NOT_SET
                newValue = mPendingData;
                mPendingData = NOT_SET;
            }
            setValue((T) newValue);
     }
 };
           

詳細的過程寫在注釋中了,主要的原因就是postValue發送消息時,會判斷之前的消息是否已經處理,如果還未處理,會将目前發送的最新值更新到之前的消息中去(之前的消息存在mPendingData中,直接更新之),是以當多次頻繁使用postValue發送消息時,Observer收到的為最後一次發送的最新值。個人猜測官方這麼實作的目的主要是LiveData在MVVM架構中使用,既主要為了更新UI的最新資料即可,但是當用LiveData實作的消息總線時,可能就會出現丢失消息的隐患了,這是我們不想看到的,那麼怎麼解決呢?放棄使用postValue,都通過setValue去發送消息,如果是在子線程中發送消息,自行建構Handler發送到主線程中即可,後續貼代碼。

解決方案

支援粘性、非粘性消息

因為LiveData預設即是粘性消息,我們隻需要添加非粘性消息支援即可,LiveData的mVersion預設是private的,如果想在其他類中使用,可以通過反射擷取,但是效率相對低;還可以通過androidx.lifecycle包名來避免反射擷取LiveData.mVersion,代碼如下:

//package androidx.lifecycle
open class ExternalLiveData<T> : MutableLiveData<T>() {

    companion object {
        //通過androidx.lifecycle包名來避免反射擷取LiveData.START_VERSION
        const val START_VERSION = LiveData.START_VERSION
    }

    override fun observe(owner: LifecycleOwner, observer: Observer<in T>) {
        if (owner.lifecycle.currentState == Lifecycle.State.DESTROYED) {
            // ignore
            return
        }
        try {
            val wrapper = ExternalLifecycleBoundObserver(owner, observer)
            val existing =
                callMethodPutIfAbsent(observer, wrapper) as? LiveData<*>.LifecycleBoundObserver
            require(!(existing != null && !existing.isAttachedTo(owner))) {
                ("Cannot add the same observer" + " with different lifecycles")
            }
            if (existing != null) return
            owner.lifecycle.addObserver(wrapper)
        } catch (e: Exception) {
            //ignore
        }
    }

    //繼承父類并将修飾符改為public,可以對外暴露
    public override fun getVersion(): Int {
        return super.getVersion()
    }

    internal inner class ExternalLifecycleBoundObserver(
        owner: LifecycleOwner,
        observer: Observer<in T>?
    ) : LifecycleBoundObserver(owner, observer) {
        override fun shouldBeActive(): Boolean {
            return mOwner.lifecycle.currentState.isAtLeast(observerActiveLevel())
        }
    }

    /**
     * @return Lifecycle.State
     */
    protected open fun observerActiveLevel(): Lifecycle.State {
        return Lifecycle.State.STARTED
    }

    //反射擷取LiveData.mObservers
    private val fieldObservers: Any
        get() {
            val fieldObservers = LiveData::class.java.getDeclaredField("mObservers")
            fieldObservers.isAccessible = true
            return fieldObservers
        }

    /**
     * 反射調用LiveData的putIfAbsent方法
     */
    private fun callMethodPutIfAbsent(observer: Any, wrapper: Any): Any? {
        val mObservers = fieldObservers.javaClass
        val putIfAbsent =
            mObservers.getDeclaredMethod("putIfAbsent", Any::class.java, Any::class.java)
        putIfAbsent.isAccessible = true
        return putIfAbsent.invoke(mObservers, observer, wrapper)
    }
}
           

這樣外面就可以使用mVersion了,整體思路是通過裝飾者模式對Observer進行控制,如:

/**
 * Observer裝飾者模式
 */
class ObserverWrapper<T>(
        private val observer: Observer<T>,
        var preventNextEvent: Boolean = false
) : Observer<T> {
    override fun onChanged(t: T) {
        if (preventNextEvent) {
            preventNextEvent = false
            return
        }
        observer.onChanged(t)
    }
}
           

非粘性消息:

val observerWrapper = ObserverWrapper(observer)
observerWrapper.preventNextEvent = liveData.version > ExternalLiveData.START_VERSION
liveData.observe(owner, observerWrapper)
           

liveData.version > ExternalLiveData.START_VERSION說明liveData裡發送過消息,version值已經不是初始值,如果是後注冊的觀察者,observerWrapper.preventNextEvent傳回的是true,即會屏蔽目前消息,觀察者不執行;如果是先注冊的觀察者,則不受影響,這樣就是實作了非粘性消息。

粘性消息:

val observerWrapper = ObserverWrapper(observer)
liveData.observe(owner, observerWrapper)
           

沒什麼可說的,預設就是粘性的,無需特殊處理。

支援子線程發送消息

判斷是否在主線程:

object ThreadUtils {

    /**
     * 是否是在主線程
     */
    fun isMainThread(): Boolean {
        return Looper.myLooper() == Looper.getMainLooper()
    }
}
           

發送消息時判斷目前所線上程:

private val mainHandler = Handler(Looper.getMainLooper())

override fun post(value: T) {
    if (ThreadUtils.isMainThread()) {
        postInternal(value)
    } else {
        mainHandler.post(PostValueTask(value))
    }
}

@MainThread
private fun postInternal(value: T) {
    liveData.value = value
}

inner class PostValueTask(val newValue: T) : Runnable {
    override fun run() {
        postInternal(newValue)
    }
}
           

當post消息時,先判斷目前所線上程,主線程的話直接發送,在子線程的話通過MainHandler将消息發送到主線程再發送,進而支援了在子線程發送消息。

參考

【1】https://tech.meituan.com/2018/07/26/android-livedatabus.html

【2】文中代碼主要來自 https://github.com/JeremyLiao/LiveEventBus

繼續閱讀