RxJava背壓基礎講解
https://blog.csdn.net/weixin_37730482/article/details/78085190
本章節繼續講述RxJava的背壓問題
一.控制 觀察者 接收事件的速度
<1> 前言
當被觀察者發送消息的速度和觀察者接收消息的速度不比對時,可以控制觀察者接收消息的速度。
原理是 不管被觀察者發送了多少條資料,觀察者僅根據自身情況選擇接收幾條消息。哪怕被觀察者發送量100條資料,若觀察者僅需要兩條消息,則隻接收兩條消息。忽略另外98條消息。
<2> 代碼
package com.example.mydemo.rxjava;
import android.os.Bundle;
import android.util.Log;
import androidx.appcompat.app.AppCompatActivity;
import com.example.mydemo.R;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.schedulers.Schedulers;
public class RxJavaActivity extends AppCompatActivity {
private Subscription mSubscription;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rxjava);
method();
}
/**
* RxJava Flowable建立被觀察者 背壓
*/
public void method() {
Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> e) throws Exception {
if (null == e) {
return;
}
/**
* 測試 被觀察者發送五條消息
* */
if (!e.isCancelled()) {
for (int i = 0; i < 5; i++) {
e.onNext("Flowable建立被觀察者 被觀察者發送第 " + i + " 波資料");
}
e.onComplete();
}
}
}, BackpressureStrategy.LATEST)
.subscribeOn(Schedulers.io())//被觀察者在io線程中進行
.observeOn(AndroidSchedulers.mainThread())//觀察者在主線程中進行
.subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
if (null == s) {
return;
}
mSubscription = s;
mSubscription.request(4);//測試 被觀察者發送五條消息 觀察者隻接收四條
}
@Override
public void onNext(Object o) {
if (null == o) {
return;
}
Log.d("TAG", "Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:" + o.toString());
}
@Override
public void onError(Throwable t) {
if (null == t) {
return;
}
Log.d("TAG", "Flowable建立被觀察者 觀察者 onError方法 t.toString()----:" + t.toString());
}
@Override
public void onComplete() {
Log.d("TAG", "Flowable建立被觀察者 觀察者 onComplete方法 ");
}
});
}
/**
* onDestroy方法 取消訂閱
*/
@Override
protected void onDestroy() {
super.onDestroy();
if (null != mSubscription) {
mSubscription.cancel();
Log.d("TAG", "退出頁面 取消訂閱");
}
}
}
<3> 結果
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 0 波資料
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 1 波資料
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 2 波資料
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 3 波資料
<4> 說明
被觀察者發送五條消息。觀察者通過onSubscribe方法中參數Subscription的request(number)方法設定隻接收number即四條消息。
<5> 重要說明&異步訂閱
Flowable抽象類建立被觀察者方式 如果被觀察者和觀察者是異步訂閱的 如果不設定
mSubscription.request(N);//設定 觀察者隻接收N條資料
則說明 觀察者不接收消息。此時 被觀察者仍然可以發送消息。并将消息存在緩存區。等待觀察者需要時再取出。即調用
mSubscription.request(N);//設定 觀察者隻接收N條資料
方法。
(1) 代碼
package com.example.mydemo.rxjava;
import android.os.Bundle;
import android.util.Log;
import android.view.View;
import android.widget.TextView;
import androidx.appcompat.app.AppCompatActivity;
import com.example.mydemo.R;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.schedulers.Schedulers;
public class RxJavaActivity extends AppCompatActivity {
private Subscription mSubscription;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rxjava);
findView();
method();
}
/**
* 初始化各種View
*/
private void findView() {
TextView textView = findViewById(R.id.activity_rxjava_textview);
textView.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
if (null != mSubscription) {
mSubscription.request(4);//觀察者隻接收四條
}
}
});
}
/**
* RxJava Flowable建立被觀察者 背壓
*/
public void method() {
Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> e) throws Exception {
if (null == e) {
return;
}
/**
* 測試 被觀察者發送五條消息
* */
if (!e.isCancelled()) {
for (int i = 0; i < 5; i++) {
e.onNext("Flowable建立被觀察者 被觀察者發送第 " + i + " 波資料");
}
e.onComplete();
}
}
}, BackpressureStrategy.LATEST)
.subscribeOn(Schedulers.io())//被觀察者在io線程中進行
.observeOn(AndroidSchedulers.mainThread())//觀察者在主線程中進行
.subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
if (null == s) {
return;
}
mSubscription = s;
}
@Override
public void onNext(Object o) {
if (null == o) {
return;
}
Log.d("TAG", "Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:" + o.toString());
}
@Override
public void onError(Throwable t) {
if (null == t) {
return;
}
Log.d("TAG", "Flowable建立被觀察者 觀察者 onError方法 t.toString()----:" + t.toString());
}
@Override
public void onComplete() {
Log.d("TAG", "Flowable建立被觀察者 觀察者 onComplete方法 ");
}
});
}
/**
* onDestroy方法 取消訂閱
*/
@Override
protected void onDestroy() {
super.onDestroy();
if (null != mSubscription) {
mSubscription.cancel();
Log.d("TAG", "退出頁面 取消訂閱");
}
}
}
(2) 結果
預設
無列印 即無消息接收
點選按鈕 點選第一次 取出四條消息。
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 0 波資料
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 1 波資料
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 2 波資料
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 3 波資料
點選按鈕 點選第二次 取出剩餘消息并執行onComplete方法。
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 4 波資料
D/TAG: Flowable建立被觀察者 觀察者 onComplete方法
點選按鈕 點選第三四五...次
無列印 即無消息接收。
<6> 重要說明&同步訂閱
同步訂閱,即被觀察者和觀察者處在同一線程中。同步訂閱關系中沒有緩存區。
同步訂閱&異步訂閱差別
訂閱關系 | 說明 |
同步訂閱 | 觀察者和被觀察者處在同一線程。被觀察者每發送一條消息。必須等到觀察者接收後才會發送下一條消息。 |
異步訂閱 | 觀察者和被觀察者處在不同的線程。被觀察者不需要等待觀察者處理消息後再發送下一條消息,而是不斷發送直到發送消息完畢。 |
代碼
[1] 被觀察者發送的消息數和觀察者設定接收的消息數一緻時。
(1) 代碼
package com.example.mydemo.rxjava;
import android.os.Bundle;
import android.util.Log;
import androidx.appcompat.app.AppCompatActivity;
import com.example.mydemo.R;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
public class RxJavaActivity extends AppCompatActivity {
private Subscription mSubscription;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rxjava);
method();
}
/**
* RxJava Flowable建立被觀察者 背壓
*/
public void method() {
Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> e) throws Exception {
if (null == e) {
return;
}
/**
* 測試 被觀察者發送五條消息
* */
if (!e.isCancelled()) {
for (int i = 0; i < 5; i++) {
e.onNext("Flowable建立被觀察者 被觀察者發送第 " + i + " 波資料");
Log.d("TAG", "Flowable建立被觀察者 被觀察者發送資料 i----:" + i);
}
e.onComplete();
}
}
}, BackpressureStrategy.LATEST)
.subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
if (null == s) {
return;
}
mSubscription = s;
mSubscription.request(5);//觀察者接收五條
}
@Override
public void onNext(Object o) {
if (null == o) {
return;
}
Log.d("TAG", "Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:" + o.toString());
}
@Override
public void onError(Throwable t) {
if (null == t) {
return;
}
Log.d("TAG", "Flowable建立被觀察者 觀察者 onError方法 t.toString()----:" + t.toString());
}
@Override
public void onComplete() {
Log.d("TAG", "Flowable建立被觀察者 觀察者 onComplete方法 ");
}
});
}
/**
* onDestroy方法 取消訂閱
*/
@Override
protected void onDestroy() {
super.onDestroy();
if (null != mSubscription) {
mSubscription.cancel();
Log.d("TAG", "退出頁面 取消訂閱");
}
}
}
(2) 結果
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 0 波資料
D/TAG: Flowable建立被觀察者 被觀察者發送資料 i----:0
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 1 波資料
D/TAG: Flowable建立被觀察者 被觀察者發送資料 i----:1
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 2 波資料
D/TAG: Flowable建立被觀察者 被觀察者發送資料 i----:2
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 3 波資料
D/TAG: Flowable建立被觀察者 被觀察者發送資料 i----:3
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 4 波資料
D/TAG: Flowable建立被觀察者 被觀察者發送資料 i----:4
D/TAG: Flowable建立被觀察者 觀察者 onComplete方法
(3) 綜上所述,同步訂閱情況下不會出現被觀察者發送事件速度 > 觀察者接收事件速度的情況。可是,卻會出現被觀察者發送事件數量 > 觀察者接收事件數量的問題。
[2] 被觀察者發送的消息數和觀察者設定接收的消息數不一緻時。
(1) 代碼
package com.example.mydemo.rxjava;
import android.os.Bundle;
import android.util.Log;
import androidx.appcompat.app.AppCompatActivity;
import com.example.mydemo.R;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
public class RxJavaActivity extends AppCompatActivity {
private Subscription mSubscription;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rxjava);
method();
}
/**
* RxJava Flowable建立被觀察者 背壓
*/
public void method() {
Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> e) throws Exception {
if (null == e) {
return;
}
/**
* 測試 被觀察者發送五條消息
* */
if (!e.isCancelled()) {
for (int i = 0; i < 5; i++) {
e.onNext("Flowable建立被觀察者 被觀察者發送第 " + i + " 波資料");
Log.d("TAG", "Flowable建立被觀察者 被觀察者發送資料 i----:" + i);
}
e.onComplete();
}
}
}, BackpressureStrategy.LATEST)
.subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
if (null == s) {
return;
}
mSubscription = s;
mSubscription.request(3);//觀察者接收三條
}
@Override
public void onNext(Object o) {
if (null == o) {
return;
}
Log.d("TAG", "Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:" + o.toString());
}
@Override
public void onError(Throwable t) {
if (null == t) {
return;
}
Log.d("TAG", "Flowable建立被觀察者 觀察者 onError方法 t.toString()----:" + t.toString());
}
@Override
public void onComplete() {
Log.d("TAG", "Flowable建立被觀察者 觀察者 onComplete方法 ");
}
});
}
/**
* onDestroy方法 取消訂閱
*/
@Override
protected void onDestroy() {
super.onDestroy();
if (null != mSubscription) {
mSubscription.cancel();
Log.d("TAG", "退出頁面 取消訂閱");
}
}
}
即 被觀察者發送五條資料 觀察者隻接收三條消息。
(2) 結果
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 0 波資料
D/TAG: Flowable建立被觀察者 被觀察者發送資料 i----:0
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 1 波資料
D/TAG: Flowable建立被觀察者 被觀察者發送資料 i----:1
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 2 波資料
D/TAG: Flowable建立被觀察者 被觀察者發送資料 i----:2
D/TAG: Flowable建立被觀察者 被觀察者發送資料 i----:3
D/TAG: Flowable建立被觀察者 被觀察者發送資料 i----:4
(3) 綜上所述 對于沒有緩存區概念的同步訂閱關系來說,單純采用控制觀察者的接收事件數量(響應式拉取)還是會出現問題。
[3] 點選時再執行觀察者的取方法
(1) 代碼
package com.example.mydemo.rxjava;
import android.os.Bundle;
import android.util.Log;
import android.view.View;
import android.widget.TextView;
import androidx.appcompat.app.AppCompatActivity;
import com.example.mydemo.R;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
public class RxJavaActivity extends AppCompatActivity {
private Subscription mSubscription;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rxjava);
findView();
method();
}
/**
* 初始化各種View
*/
private void findView() {
TextView textView = findViewById(R.id.activity_rxjava_textview);
textView.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
if (null != mSubscription) {
mSubscription.request(4);//觀察者隻接收四條
}
}
});
}
/**
* RxJava Flowable建立被觀察者 背壓
*/
public void method() {
Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> e) throws Exception {
if (null == e) {
return;
}
/**
* 測試 被觀察者發送五條消息
* */
if (!e.isCancelled()) {
for (int i = 0; i < 5; i++) {
e.onNext("Flowable建立被觀察者 被觀察者發送第 " + i + " 波資料");
}
e.onComplete();
}
}
}, BackpressureStrategy.LATEST)
.subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
if (null == s) {
return;
}
mSubscription = s;
}
@Override
public void onNext(Object o) {
if (null == o) {
return;
}
Log.d("TAG", "Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:" + o.toString());
}
@Override
public void onError(Throwable t) {
if (null == t) {
return;
}
Log.d("TAG", "Flowable建立被觀察者 觀察者 onError方法 t.toString()----:" + t.toString());
}
@Override
public void onComplete() {
Log.d("TAG", "Flowable建立被觀察者 觀察者 onComplete方法 ");
}
});
}
/**
* onDestroy方法 取消訂閱
*/
@Override
protected void onDestroy() {
super.onDestroy();
if (null != mSubscription) {
mSubscription.cancel();
Log.d("TAG", "退出頁面 取消訂閱");
}
}
}
(2) 結果
預設
無列印 即無消息接收
點選按鈕 第一次點選
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 4 波資料
D/TAG: Flowable建立被觀察者 觀察者 onComplete方法
點選按鈕第二三四五...次點選
無列印 即無消息接收
二.控制 被觀察者 發送事件的速度
<1> 前言
上述可知可以通過觀察者的onSubscribe方法中參數Subscription的request(N)方法控制觀察者可接收的消息數,繼而控制觀察者接收事件的速度。那麼被觀察者的發送事件的速度怎麼控制呢?
源碼分析
create操作符建立被觀察者 重寫 subscribe方法 該方法參數FlowableEmitter<Object> e 點選
public interface FlowableEmitter<T> extends Emitter<T> {
/**
* Sets a Disposable on this emitter; any previous Disposable
* or Cancellation will be unsubscribed/cancelled.
* @param s the disposable, null is allowed
*/
void setDisposable(Disposable s);
/**
* Sets a Cancellable on this emitter; any previous Disposable
* or Cancellation will be unsubscribed/cancelled.
* @param c the cancellable resource, null is allowed
*/
void setCancellable(Cancellable c);
/**
* The current outstanding request amount.
* <p>This method is thread-safe.
* @return the current outstanding request amount
*/
long requested();
/**
* Returns true if the downstream cancelled the sequence.
* <p>This method is thread-safe.
* @return true if the downstream cancelled the sequence
*/
boolean isCancelled();
/**
* Ensures that calls to onNext, onError and onComplete are properly serialized.
* @return the serialized FlowableEmitter
*/
FlowableEmitter<T> serialize();
}
可以看到 此參數可以判斷是否取消訂閱 isCancelled()方法。而request()方法可以擷取被觀察者中的消息數量。
<2> 同步訂閱 控制被觀察者發送消息速度
(1) 簡介
同步訂閱中,被觀察者和觀察者處在同一線程。被觀察者中request方法得到的數等于觀察者中Subscription.request()方法中設定的數。
(2) 代碼
package com.example.mydemo.rxjava;
import android.os.Bundle;
import android.util.Log;
import androidx.appcompat.app.AppCompatActivity;
import com.example.mydemo.R;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
public class RxJavaActivity extends AppCompatActivity {
private Subscription mSubscription;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rxjava);
method();
}
/**
* RxJava Flowable建立被觀察者 背壓
*/
public void method() {
Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> e) throws Exception {
if (null == e) {
return;
}
long msgNumber = e.requested();
Log.d("TAG", "Flowable建立被觀察者 被觀察者 msgNumber----:" + msgNumber);
if (!e.isCancelled()) {
for (int i = 0; i < msgNumber; i++) {
e.onNext("Flowable建立被觀察者 被觀察者發送第 " + i + " 波資料");
}
e.onComplete();
}
}
}, BackpressureStrategy.LATEST)
.subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
if (null == s) {
return;
}
mSubscription = s;
mSubscription.request(5);//觀察者設定可接收五條消息
}
@Override
public void onNext(Object o) {
if (null == o) {
return;
}
Log.d("TAG", "Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:" + o.toString());
}
@Override
public void onError(Throwable t) {
if (null == t) {
return;
}
Log.d("TAG", "Flowable建立被觀察者 觀察者 onError方法 t.toString()----:" + t.toString());
}
@Override
public void onComplete() {
Log.d("TAG", "Flowable建立被觀察者 觀察者 onComplete方法 ");
}
});
}
/**
* onDestroy方法 取消訂閱
*/
@Override
protected void onDestroy() {
super.onDestroy();
if (null != mSubscription) {
mSubscription.cancel();
Log.d("TAG", "退出頁面 取消訂閱");
}
}
}
(3) 結果
D/TAG: Flowable建立被觀察者 被觀察者 msgNumber----:5
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 0 波資料
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 1 波資料
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 2 波資料
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 3 波資料
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 4 波資料
D/TAG: Flowable建立被觀察者 觀察者 onComplete方法
(4) 說明
{1} 被觀察者的 e.requested();方法具有實時性。即被觀察者發送了一條資料 e.requested();方法就會減一。
代碼 在上基礎上改動
long msgNumber = e.requested();
Log.d("TAG", "Flowable建立被觀察者 被觀察者 msgNumber----:" + msgNumber);
if (!e.isCancelled()) {
for (int i = 0; i < msgNumber; i++) {
e.onNext("Flowable建立被觀察者 被觀察者發送第 " + i + " 波資料");
Log.d("TAG", "Flowable建立被觀察者 被觀察者 發送了i後e.requested()----:" + e.requested());
}
e.onComplete();
}
結果
D/TAG: Flowable建立被觀察者 被觀察者 msgNumber----:5
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 0 波資料
D/TAG: Flowable建立被觀察者 被觀察者 發送了i後e.requested()----:4
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 1 波資料
D/TAG: Flowable建立被觀察者 被觀察者 發送了i後e.requested()----:3
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 2 波資料
D/TAG: Flowable建立被觀察者 被觀察者 發送了i後e.requested()----:2
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 3 波資料
D/TAG: Flowable建立被觀察者 被觀察者 發送了i後e.requested()----:1
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 4 波資料
D/TAG: Flowable建立被觀察者 被觀察者 發送了i後e.requested()----:0
D/TAG: Flowable建立被觀察者 觀察者 onComplete方法
{2} 觀察者沒有設定可接收事件數量,即無調用Subscription.request()。那麼被觀察者預設觀察者可接收事件數量 = 0,即FlowableEmitter.requested()的傳回值 = 0。
{3} 可疊加性
代碼
mSubscription.request(5);//第一次設定 觀察者設定可接收五條消息
mSubscription.request(10);//第二條設定 觀察者設定可接收十條消息
結果 5+10=15
D/TAG: Flowable建立被觀察者 被觀察者 msgNumber----:15
<3> 異步訂閱 控制被觀察者發送消息速度
(1) 說明
異步訂閱中,被觀察者和觀察者處在不同的線程。是以被觀察者 無法通過 FlowableEmitter.requested()知道觀察者自身接收事件能力。即 被觀察者不能根據 觀察者自身接收事件的能力 控制發送事件的速度。
被觀察者中request方法得到的數==128,96,0。 和觀察者中Subscription.request()方法中設定的數不相等。
(2) 代碼
package com.example.mydemo.rxjava;
import android.os.Bundle;
import android.util.Log;
import androidx.appcompat.app.AppCompatActivity;
import com.example.mydemo.R;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.schedulers.Schedulers;
public class RxJavaActivity extends AppCompatActivity {
private Subscription mSubscription;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rxjava);
method();
}
/**
* RxJava Flowable建立被觀察者 背壓
*/
public void method() {
Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> e) throws Exception {
if (null == e) {
return;
}
long msgNumber = e.requested();
Log.d("TAG", "Flowable建立被觀察者 被觀察者 msgNumber----:" + msgNumber);
if (!e.isCancelled()) {
for (int i = 0; i < msgNumber; i++) {
e.onNext("Flowable建立被觀察者 被觀察者發送第 " + i + " 波資料");
}
e.onComplete();
}
}
}, BackpressureStrategy.LATEST)
.subscribeOn(Schedulers.io())//被觀察者 IO線程
.observeOn(AndroidSchedulers.mainThread())//觀察者 UI線程
.subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
if (null == s) {
return;
}
mSubscription = s;
mSubscription.request(5);//觀察者設定可接收五條消息
}
@Override
public void onNext(Object o) {
if (null == o) {
return;
}
Log.d("TAG", "Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:" + o.toString());
}
@Override
public void onError(Throwable t) {
if (null == t) {
return;
}
Log.d("TAG", "Flowable建立被觀察者 觀察者 onError方法 t.toString()----:" + t.toString());
}
@Override
public void onComplete() {
Log.d("TAG", "Flowable建立被觀察者 觀察者 onComplete方法 ");
}
});
}
/**
* onDestroy方法 取消訂閱
*/
@Override
protected void onDestroy() {
super.onDestroy();
if (null != mSubscription) {
mSubscription.cancel();
Log.d("TAG", "退出頁面 取消訂閱");
}
}
}
(3) 結果
D/TAG: Flowable建立被觀察者 被觀察者 msgNumber----:128
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 0 波資料
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 1 波資料
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 2 波資料
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 3 波資料
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第 4 波資料
即 觀察者設定可以接收的消息數是五條,但是被觀察者中請求的消息數是128。
三.Flowable其他操作符使用背壓政策
<1> 前言
上述的講解都是使用的Flowable的create操作符建立的被觀察者,那麼背壓政策可以在第二個參數傳入。如下。
Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> e) throws Exception {
if (null == e) {
return;
}
if (!e.isCancelled()) {
e.onNext("Flowable建立被觀察者 被觀察者發送第1波資料");
e.onNext("Flowable建立被觀察者 被觀察者發送第2波資料");
e.onNext("Flowable建立被觀察者 被觀察者發送第3波資料");
e.onComplete();
}
Log.d("TAG", "Flowable建立被觀察者 被觀察者 subscribe方法 線程----:" + Thread.currentThread().getName());
}
}, BackpressureStrategy.LATEST);
那麼使用其他的操作符呢,比如前節講的interval操作符。下面講解。
<2> 代碼
package com.wjn.rxdemo.rxjava;
import android.os.Bundle;
import android.util.Log;
import androidx.appcompat.app.AppCompatActivity;
import com.wjn.rxdemo.R;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.concurrent.TimeUnit;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
public class RxJavaActivity extends AppCompatActivity {
private Subscription mSubscription;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rxjava);
method();
}
/**
* RxJava Flowable建立被觀察者 背壓
*/
private void method() {
Flowable.interval(1, TimeUnit.MILLISECONDS)
.onBackpressureBuffer() // 添加背壓政策封裝好的方法,此處選擇Buffer模式,即緩存區大小無限制
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onSubscribe(Subscription s) {
if (null == s) {
return;
}
mSubscription = s;
s.request(Long.MAX_VALUE);//觀察者設定可接收最多條數的消息
}
@Override
public void onNext(Long aLong) {
if (null == aLong) {
return;
}
Log.d("TAG", "Flowable建立被觀察者 觀察者 onNext方法 aLong----:" + aLong);
//模拟1秒接收
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
if (null == t) {
return;
}
Log.d("TAG", "Flowable建立被觀察者 觀察者 onError方法 t.toString()----:" + t.toString());
}
@Override
public void onComplete() {
Log.d("TAG", "Flowable建立被觀察者 觀察者 onComplete方法 ");
}
});
}
/**
* onDestroy方法 取消訂閱
*/
@Override
protected void onDestroy() {
super.onDestroy();
if (null != mSubscription) {
mSubscription.cancel();
Log.d("TAG", "退出頁面 取消訂閱");
}
}
}
<3> 結果
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 aLong----:0
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 aLong----:1
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 aLong----:2
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 aLong----:3
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 aLong----:4
D/TAG: Flowable建立被觀察者 觀察者 onNext方法 aLong----:5
<4> 關閉頁面
D/TAG: 退出頁面 取消訂閱
<5> 說明
上述代碼中有這樣的設定
.onBackpressureBuffer() // 添加背壓政策封裝好的方法,此處選擇Buffer模式,即緩存區大小無限制
那麼還有沒有其他的設定呢,以及每個設定對應的含義是什麼呢。
舉例 | 含義 | 對create操作符 |
.onBackpressureBuffer() | 緩存區滿時 将緩存區設定無限大 注意記憶體溢出 | BUFFER |
.onBackpressureDrop() | 緩存區滿時 事件丢失 保留[0,127]條資料 即 前 128條資料 | DROP |
onBackpressureLatest() | 緩存區滿時 事件丢失 保留[0,127]條資料+超出的最後一條資料 | LATEST |
四.總結
<1> RxJava的背壓可以使用Flowable建立被觀察者來實作。背壓的使用大前提是 異步訂閱中,觀察者和被觀察者發送消息的速度或者數量不比對,一般是被觀察者發送的速度太快或者消息太多,觀察者來不及處理。
<2> 背壓一般應用在被觀察者和觀察者異步訂閱時,因為同步訂閱正式項目很少使用。(RxJava核心就是異步嘛)。
<3> 其實一般使用RxJava實作異步,隻要注意在适當的時機取消訂閱,防止記憶體洩漏即可。沒必要使用背壓。
<4> 其實處理被觀察者和觀察者比對問題也可以有其他的操作辦法,比如使用delay()操作符建立一個延時的被觀察者。再比如使用interval操作符建立指定時間的被觀察者等等。