morning
前面的文章介紹了LiveData的基本用法,本文來介紹一下LiveData的一個進階用法 — 基于LiveData實作消息總線
消息總線
在Android開發中,跨頁面傳遞資料(尤其是跨多個頁面傳遞資料)是一個很常見的操作,可以通過Handler、接口回調等方式進行傳遞,但這幾種方式都不太優雅,消息總線傳遞資料的方式相比更優雅。
消息總線最大的優勢就是解耦,避免了類與類之間強耦合,通常消息總線有以下幾種實作方式:
- EventBus:https://github.com/greenrobot/EventBus
- RxBus : 基于RxJava實作的消息總線
- LiveDataBus:基于Jetpack中的LiveData實作,也是本文主要介紹的實作方式。
EventBus
EventBus整體思想如下:
eventBus.png
EventBus基于釋出/訂閱模式,釋出者和訂閱者是一對多的關系,釋出者隻有一個,訂閱者可以有多個,他們之間都是通過EventBus這個排程中心來進行資料處理與傳遞。其中釋出者将資料傳遞到排程中心,然後排程中心會找到該釋出者對應的訂閱者,并将資料依次傳遞到訂閱者,進而完成了資料的傳遞;如果沒有訂閱者,那麼也就不會傳遞資料了。整個過程釋出者和訂閱者不需要知道彼此的存在,即資料傳遞過程是解耦的。
RxBus
RxBus本身是依賴RxJava的強大功能實作的。RxJava中有一個Subject,是一種特殊的存在,它既是Observable,又是Observer,可以将其看做一個橋梁或代理。Subject有以下四種:
- AsyncSubject: 無論訂閱發生在什麼時候,Observer隻會接收AsyncSubject發送的在onComplete()之前的最後一個資料,且onComplete()是必須要調用的。
- BehaviorSubject:Observer會先接收BehaviorSubject被訂閱之前的最後一個事件,然後接收訂閱之後發送的所有事件。
- PublishSubject: Observer隻接收PublishSubject被訂閱之後發送的事件。
- ReplaySubject:無論subscribe訂閱是何時開始的,Observer會接收ReplaySubject發送的所有事件。
具體使用方式可以參考:RxJava中關于Subject和Processor的使用:https://blog.csdn.net/u013700502/article/details/113821445
可以通過Subject來實作一個消息總線,因為不是本文的重點介紹,就不再貼代碼了,可以自行搜尋其實作方式。
LiveDataBus
LiveDataBus是基于LiveData實作的,上篇文章中詳細介紹了其用法及優點:
-
確定界面符合資料狀态
LiveData 遵循觀察者模式。當資料發生變化時,LiveData 會通知 Observer 對象,那麼Observer回調的方法中就可以進行UI更新,即資料驅動。
-
不會發生記憶體洩漏
觀察者會綁定到 Lifecycle 對象,并在其關聯的生命周期遭到銷毀(如Activity進入ONDESTROY狀态)後進行自我清理。
-
不會因 Activity 停止而導緻崩潰
如果觀察者的生命周期處于非活躍狀态(如傳回棧中的 Activity),則它不會接收任何 LiveData 事件。
-
不再需要手動處理生命周期
界面元件隻是觀察相關資料,不會停止或恢複觀察。LiveData 将自動管理所有這些操作,因為它在觀察時可以感覺相關的生命周期狀态變化。
-
資料始終保持最新狀态
如果生命周期變為非活躍狀态,它會在再次變為活躍狀态時接收最新的資料。例如,曾經在背景的 Activity 會在傳回前台後立即接收最新的資料。
-
配置更改時自動儲存資料
如果由于配置更改(如裝置旋轉)而重新建立了 Activity 或 Fragment,它會立即接收最新的可用資料。
-
共享資源
使用單例模式擴充 LiveData 對象以封裝系統服務,以便在應用中共享它們。LiveData 對象連接配接到系統服務一次,然後需要相應資源的任何觀察者隻需觀察 LiveData 對象。
原理
- 消息:釋出者發送,訂閱者接收。消息可以是基本類型,也可以是自定義類型的消息。
- 消息通道:LiveData 扮演了消息通道的角色,不同的消息通道用不同的名字區分,名字是 String 類型的,可以通過名字擷取到一個LiveData 消息通道。
- 消息總線: 消息總線通過單例實作,不同的消息通道存放在一個HashMap 中。
- 訂閱:訂閱者通過get() 擷取消息通道,然後調用 observe() 訂閱這個通道的消息。
- 釋出:釋出者通過 get() 擷取消息通道,然後調用 setValue()釋出消息。
LiveDataBus.png
圖檔來源:[LiveData實作消息總線]:https://tech.meituan.com/2018/07/26/android-livedatabus.html
LiveData實作消息總線的優勢
相比于EventBus、RxBus,使用LiveData實作消息總線有下面幾個優勢:
- EventBus、RxBus、LiveDataBus都需要對事件進行注冊、解注冊。不同于EventBus、RxBus手動解注冊,LiveData可以自動管理生命周期,是以也能實作自動解注冊,避免忘記解注冊而導緻記憶體洩漏。
- LiveData實作簡單,其為Jetpack中重要的一員,且為官方推出,支援更好
- LiveData相比于EventBus、RxBus,類更少,包更小。
LiveData實作消息總線存在的隐患
LiveData預設是粘性消息
LiveData發送的消息為粘性消息,即先釋出後訂閱也能收到消息,再把訂閱observe()的邏輯貼出來:
@MainThread
public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<? super T> observer) {
assertMainThread("observe");
if (owner.getLifecycle().getCurrentState() == DESTROYED) {
//如果目前觀察者處于DESTROYED狀态,直接傳回
return;
}
//将LifecycleOwner、Observer包裝成LifecycleBoundObserver
LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer);
//ObserverWrapper是LifecycleBoundObserver的父類
ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);
//如果mObservers中存在該Observer且跟傳進來的LifecycleOwner不同,直接抛異常,一個Observer隻能對應一個LifecycleOwner
if (existing != null && !existing.isAttachedTo(owner)) {
throw new IllegalArgumentException("Cannot add the same observer"
+ " with different lifecycles");
}
//如果已經存在Observer且跟傳進來的LifecycleOwner是同一個,直接傳回
if (existing != null) {
return;
}
//通過Lifecycle添加觀察者
owner.getLifecycle().addObserver(wrapper);
}
最後執行addObserver()後,内部通過LifecycleRegistry添加Observer,進而會執行到onStateChanged()方法,該方法輾轉又調用到dispatchingValue方法(setValue/postValue最終也會調用到該方法),接着會調用到我們最關心的considerNotify():
void dispatchingValue(@Nullable ObserverWrapper initiator) {
if (mDispatchingValue) {
mDispatchInvalidated = true;
return;
}
mDispatchingValue = true;
do {
mDispatchInvalidated = false;
if (initiator != null) {
//2、通過observe()的方式會調用這裡
considerNotify(initiator);
initiator = null;
} else {
//1、通過setValue/postValue的方式會調用這裡,周遊所有觀察者并進行分發
for (Iterator<Map.Entry<Observer<? super T>, ObserverWrapper>> iterator =
mObservers.iteratorWithAdditions(); iterator.hasNext(); ) {
considerNotify(iterator.next().getValue());
if (mDispatchInvalidated) {
break;
}
}
}
} while (mDispatchInvalidated);
mDispatchingValue = false;
}
private void considerNotify(ObserverWrapper observer) {
if (!observer.mActive) {
//觀察者不在活躍狀态 直接傳回
return;
}
//如果是observe(),則是在STARTED、RESUMED狀态時活躍;如果是ObserveForever(),則認為一直是活躍狀态
if (!observer.shouldBeActive()) {
observer.activeStateChanged(false);
return;
}
//Observer中的Version必須小于LiveData中的Version,防止重複發送
if (observer.mLastVersion >= mVersion) {
return;
}
observer.mLastVersion = mVersion;
//回調Observer的onChange方法并接收資料
observer.mObserver.onChanged((T) mData);
}
可以看到considerNotify()裡有這麼一個邏輯:
if (observer.mLastVersion >= mVersion) {
return;
}
mVersion代表版本号,發送方、訂閱方都有這個變量,預設是-1。發送方每發送一個消息,mVersion都會進行+1操作;而Observer中的mVersion每成功接收一次消息,都會将發送方最新的version指派給自己的mLastVersion,當Observer中的mLastVersion>=發送方mVersion時,Observer會拒絕接收消息,防止重複發送消息。
是以,如果當發送方之前的mVersion不是預設值-1,說明LiveData發送過消息。如果此時執行LiveData.observe(),因為Observer中的mLastVersion為預設值-1,小于發送方的mVersion,是以該消息不會被攔截,Observer一定可以拿到之前發送的消息,即粘性消息。
LiveData.postValue可能會丢失消息
當頻繁使用LiveData.postValue發送多個消息時,LiveData.observe()接收消息時可能會發生丢失,為什麼會這樣呢?來看postValue()的内部實作
//LiveData.java
//postValue發送資料,可以在子線程中使用
protected void postValue(T value) {
boolean postTask;
synchronized (mDataLock) {
//mPendingData預設值是NOT_SET,第一次發送時postTask是true
postTask = mPendingData == NOT_SET;
//将發送的值指派給mPendingData
mPendingData = value;
}
//第一次發送時postTask是true,當第一個消息還未處理時,後面再發送消息時postTask會變成false,是以後面的消息都會被攔截,但是發送的值可以更新到第一次發送裡裡面
if (!postTask) {
return;
}
ArchTaskExecutor.getInstance().postToMainThread(mPostValueRunnable);
}
//setValue發送資料,隻能在主線程中使用
protected void setValue(T value) {
assertMainThread("setValue");
mVersion++;
mData = value;
dispatchingValue(null);
}
private final Runnable mPostValueRunnable = new Runnable() {
@SuppressWarnings("unchecked")
@Override
public void run() {
Object newValue;
synchronized (mDataLock) {
//将mPendingData中的值通過setValue傳給Observer,并将自身格式化為NOT_SET
newValue = mPendingData;
mPendingData = NOT_SET;
}
setValue((T) newValue);
}
};
詳細的過程寫在注釋中了,主要的原因就是postValue發送消息時,會判斷之前的消息是否已經處理,如果還未處理,會将目前發送的最新值更新到之前的消息中去(之前的消息存在mPendingData中,直接更新之),是以當多次頻繁使用postValue發送消息時,Observer收到的為最後一次發送的最新值。個人猜測官方這麼實作的目的主要是LiveData在MVVM架構中使用,既主要為了更新UI的最新資料即可,但是當用LiveData實作的消息總線時,可能就會出現丢失消息的隐患了,這是我們不想看到的,那麼怎麼解決呢?放棄使用postValue,都通過setValue去發送消息,如果是在子線程中發送消息,自行建構Handler發送到主線程中即可,後續貼代碼。
解決方案
支援粘性、非粘性消息
因為LiveData預設即是粘性消息,我們隻需要添加非粘性消息支援即可,LiveData的mVersion預設是private的,如果想在其他類中使用,可以通過反射擷取,但是效率相對低;還可以通過androidx.lifecycle包名來避免反射擷取LiveData.mVersion,代碼如下:
//package androidx.lifecycle
open class ExternalLiveData<T> : MutableLiveData<T>() {
companion object {
//通過androidx.lifecycle包名來避免反射擷取LiveData.START_VERSION
const val START_VERSION = LiveData.START_VERSION
}
override fun observe(owner: LifecycleOwner, observer: Observer<in T>) {
if (owner.lifecycle.currentState == Lifecycle.State.DESTROYED) {
// ignore
return
}
try {
val wrapper = ExternalLifecycleBoundObserver(owner, observer)
val existing =
callMethodPutIfAbsent(observer, wrapper) as? LiveData<*>.LifecycleBoundObserver
require(!(existing != null && !existing.isAttachedTo(owner))) {
("Cannot add the same observer" + " with different lifecycles")
}
if (existing != null) return
owner.lifecycle.addObserver(wrapper)
} catch (e: Exception) {
//ignore
}
}
//繼承父類并将修飾符改為public,可以對外暴露
public override fun getVersion(): Int {
return super.getVersion()
}
internal inner class ExternalLifecycleBoundObserver(
owner: LifecycleOwner,
observer: Observer<in T>?
) : LifecycleBoundObserver(owner, observer) {
override fun shouldBeActive(): Boolean {
return mOwner.lifecycle.currentState.isAtLeast(observerActiveLevel())
}
}
/**
* @return Lifecycle.State
*/
protected open fun observerActiveLevel(): Lifecycle.State {
return Lifecycle.State.STARTED
}
//反射擷取LiveData.mObservers
private val fieldObservers: Any
get() {
val fieldObservers = LiveData::class.java.getDeclaredField("mObservers")
fieldObservers.isAccessible = true
return fieldObservers
}
/**
* 反射調用LiveData的putIfAbsent方法
*/
private fun callMethodPutIfAbsent(observer: Any, wrapper: Any): Any? {
val mObservers = fieldObservers.javaClass
val putIfAbsent =
mObservers.getDeclaredMethod("putIfAbsent", Any::class.java, Any::class.java)
putIfAbsent.isAccessible = true
return putIfAbsent.invoke(mObservers, observer, wrapper)
}
}
這樣外面就可以使用mVersion了,整體思路是通過裝飾者模式對Observer進行控制,如:
/**
* Observer裝飾者模式
*/
class ObserverWrapper<T>(
private val observer: Observer<T>,
var preventNextEvent: Boolean = false
) : Observer<T> {
override fun onChanged(t: T) {
if (preventNextEvent) {
preventNextEvent = false
return
}
observer.onChanged(t)
}
}
非粘性消息:
val observerWrapper = ObserverWrapper(observer)
observerWrapper.preventNextEvent = liveData.version > ExternalLiveData.START_VERSION
liveData.observe(owner, observerWrapper)
liveData.version > ExternalLiveData.START_VERSION說明liveData裡發送過消息,version值已經不是初始值,如果是後注冊的觀察者,observerWrapper.preventNextEvent傳回的是true,即會屏蔽目前消息,觀察者不執行;如果是先注冊的觀察者,則不受影響,這樣就是實作了非粘性消息。
粘性消息:
val observerWrapper = ObserverWrapper(observer)
liveData.observe(owner, observerWrapper)
沒什麼可說的,預設就是粘性的,無需特殊處理。
支援子線程發送消息
判斷是否在主線程:
object ThreadUtils {
/**
* 是否是在主線程
*/
fun isMainThread(): Boolean {
return Looper.myLooper() == Looper.getMainLooper()
}
}
發送消息時判斷目前所線上程:
private val mainHandler = Handler(Looper.getMainLooper())
override fun post(value: T) {
if (ThreadUtils.isMainThread()) {
postInternal(value)
} else {
mainHandler.post(PostValueTask(value))
}
}
@MainThread
private fun postInternal(value: T) {
liveData.value = value
}
inner class PostValueTask(val newValue: T) : Runnable {
override fun run() {
postInternal(newValue)
}
}
當post消息時,先判斷目前所線上程,主線程的話直接發送,在子線程的話通過MainHandler将消息發送到主線程再發送,進而支援了在子線程發送消息。
參考
【1】https://tech.meituan.com/2018/07/26/android-livedatabus.html
【2】文中代碼主要來自 https://github.com/JeremyLiao/LiveEventBus