天天看點

RxJava詳解(基于2.X版本的背壓詳解)(下)

RxJava背壓基礎講解

https://blog.csdn.net/weixin_37730482/article/details/78085190

本章節繼續講述RxJava的背壓問題

一.控制 觀察者 接收事件的速度

<1> 前言

當被觀察者發送消息的速度和觀察者接收消息的速度不比對時,可以控制觀察者接收消息的速度。

原理是 不管被觀察者發送了多少條資料,觀察者僅根據自身情況選擇接收幾條消息。哪怕被觀察者發送量100條資料,若觀察者僅需要兩條消息,則隻接收兩條消息。忽略另外98條消息。

<2> 代碼

package com.example.mydemo.rxjava;

import android.os.Bundle;
import android.util.Log;

import androidx.appcompat.app.AppCompatActivity;

import com.example.mydemo.R;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.schedulers.Schedulers;

public class RxJavaActivity extends AppCompatActivity {

    private Subscription mSubscription;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_rxjava);

        method();
    }

    /**
     * RxJava Flowable建立被觀察者 背壓
     */

    public void method() {

        Flowable.create(new FlowableOnSubscribe<Object>() {
            @Override
            public void subscribe(FlowableEmitter<Object> e) throws Exception {
                if (null == e) {
                    return;
                }

                /**
                 * 測試 被觀察者發送五條消息
                 * */

                if (!e.isCancelled()) {
                    for (int i = 0; i < 5; i++) {
                        e.onNext("Flowable建立被觀察者 被觀察者發送第   " + i + "   波資料");
                    }
                    e.onComplete();
                }
            }
        }, BackpressureStrategy.LATEST)
                .subscribeOn(Schedulers.io())//被觀察者在io線程中進行
                .observeOn(AndroidSchedulers.mainThread())//觀察者在主線程中進行
                .subscribe(new Subscriber<Object>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        if (null == s) {
                            return;
                        }
                        mSubscription = s;
                        mSubscription.request(4);//測試 被觀察者發送五條消息 觀察者隻接收四條
                    }

                    @Override
                    public void onNext(Object o) {
                        if (null == o) {
                            return;
                        }
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:" + o.toString());
                    }

                    @Override
                    public void onError(Throwable t) {
                        if (null == t) {
                            return;
                        }
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onError方法 t.toString()----:" + t.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onComplete方法 ");
                    }
                });
    }

    /**
     * onDestroy方法 取消訂閱
     */

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (null != mSubscription) {
            mSubscription.cancel();
            Log.d("TAG", "退出頁面 取消訂閱");
        }
    }

}
           

<3> 結果

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   0   波資料


D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   1   波資料

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   2   波資料

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   3   波資料
           

<4> 說明

被觀察者發送五條消息。觀察者通過onSubscribe方法中參數Subscription的request(number)方法設定隻接收number即四條消息。

<5> 重要說明&異步訂閱

Flowable抽象類建立被觀察者方式  如果被觀察者和觀察者是異步訂閱的 如果不設定

mSubscription.request(N);//設定 觀察者隻接收N條資料
           

則說明 觀察者不接收消息。此時 被觀察者仍然可以發送消息。并将消息存在緩存區。等待觀察者需要時再取出。即調用

mSubscription.request(N);//設定 觀察者隻接收N條資料
           

方法。

(1) 代碼

package com.example.mydemo.rxjava;

import android.os.Bundle;
import android.util.Log;
import android.view.View;
import android.widget.TextView;

import androidx.appcompat.app.AppCompatActivity;

import com.example.mydemo.R;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.schedulers.Schedulers;

public class RxJavaActivity extends AppCompatActivity {

    private Subscription mSubscription;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_rxjava);
        findView();
        method();
    }

    /**
     * 初始化各種View
     */

    private void findView() {
        TextView textView = findViewById(R.id.activity_rxjava_textview);
        textView.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                if (null != mSubscription) {
                    mSubscription.request(4);//觀察者隻接收四條
                }
            }
        });
    }

    /**
     * RxJava Flowable建立被觀察者 背壓
     */

    public void method() {

        Flowable.create(new FlowableOnSubscribe<Object>() {
            @Override
            public void subscribe(FlowableEmitter<Object> e) throws Exception {
                if (null == e) {
                    return;
                }

                /**
                 * 測試 被觀察者發送五條消息
                 * */

                if (!e.isCancelled()) {
                    for (int i = 0; i < 5; i++) {
                        e.onNext("Flowable建立被觀察者 被觀察者發送第   " + i + "   波資料");
                    }
                    e.onComplete();
                }
            }
        }, BackpressureStrategy.LATEST)
                .subscribeOn(Schedulers.io())//被觀察者在io線程中進行
                .observeOn(AndroidSchedulers.mainThread())//觀察者在主線程中進行
                .subscribe(new Subscriber<Object>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        if (null == s) {
                            return;
                        }
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Object o) {
                        if (null == o) {
                            return;
                        }
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:" + o.toString());
                    }

                    @Override
                    public void onError(Throwable t) {
                        if (null == t) {
                            return;
                        }
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onError方法 t.toString()----:" + t.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onComplete方法 ");
                    }
                });
    }

    /**
     * onDestroy方法 取消訂閱
     */

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (null != mSubscription) {
            mSubscription.cancel();
            Log.d("TAG", "退出頁面 取消訂閱");
        }
    }

}
           

(2) 結果

預設

無列印 即無消息接收

點選按鈕 點選第一次 取出四條消息。

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   0   波資料

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   1   波資料

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   2   波資料

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   3   波資料
           

點選按鈕 點選第二次 取出剩餘消息并執行onComplete方法。

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   4   波資料

D/TAG: Flowable建立被觀察者 觀察者 onComplete方法 
           

點選按鈕 點選第三四五...次 

無列印 即無消息接收。

<6> 重要說明&同步訂閱

同步訂閱,即被觀察者和觀察者處在同一線程中。同步訂閱關系中沒有緩存區。

同步訂閱&異步訂閱差別

訂閱關系 說明
同步訂閱 觀察者和被觀察者處在同一線程。被觀察者每發送一條消息。必須等到觀察者接收後才會發送下一條消息。
異步訂閱 觀察者和被觀察者處在不同的線程。被觀察者不需要等待觀察者處理消息後再發送下一條消息,而是不斷發送直到發送消息完畢。

 代碼

[1] 被觀察者發送的消息數和觀察者設定接收的消息數一緻時。

(1) 代碼

package com.example.mydemo.rxjava;

import android.os.Bundle;
import android.util.Log;

import androidx.appcompat.app.AppCompatActivity;

import com.example.mydemo.R;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;

public class RxJavaActivity extends AppCompatActivity {

    private Subscription mSubscription;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_rxjava);
        method();
    }

    /**
     * RxJava Flowable建立被觀察者 背壓
     */

    public void method() {

        Flowable.create(new FlowableOnSubscribe<Object>() {
            @Override
            public void subscribe(FlowableEmitter<Object> e) throws Exception {
                if (null == e) {
                    return;
                }

                /**
                 * 測試 被觀察者發送五條消息
                 * */

                if (!e.isCancelled()) {
                    for (int i = 0; i < 5; i++) {
                        e.onNext("Flowable建立被觀察者 被觀察者發送第   " + i + "   波資料");
                        Log.d("TAG", "Flowable建立被觀察者 被觀察者發送資料 i----:" + i);
                    }
                    e.onComplete();
                }
            }
        }, BackpressureStrategy.LATEST)
                .subscribe(new Subscriber<Object>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        if (null == s) {
                            return;
                        }
                        mSubscription = s;
                        mSubscription.request(5);//觀察者接收五條
                    }

                    @Override
                    public void onNext(Object o) {
                        if (null == o) {
                            return;
                        }
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:" + o.toString());
                    }

                    @Override
                    public void onError(Throwable t) {
                        if (null == t) {
                            return;
                        }
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onError方法 t.toString()----:" + t.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onComplete方法 ");
                    }
                });
    }

    /**
     * onDestroy方法 取消訂閱
     */

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (null != mSubscription) {
            mSubscription.cancel();
            Log.d("TAG", "退出頁面 取消訂閱");
        }
    }

}
           

(2) 結果

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   0   波資料

D/TAG: Flowable建立被觀察者 被觀察者發送資料 i----:0




D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   1   波資料

D/TAG: Flowable建立被觀察者 被觀察者發送資料 i----:1




D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   2   波資料

D/TAG: Flowable建立被觀察者 被觀察者發送資料 i----:2



D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   3   波資料

D/TAG: Flowable建立被觀察者 被觀察者發送資料 i----:3



D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   4   波資料

D/TAG: Flowable建立被觀察者 被觀察者發送資料 i----:4



D/TAG: Flowable建立被觀察者 觀察者 onComplete方法 
           

(3) 綜上所述,同步訂閱情況下不會出現被觀察者發送事件速度 > 觀察者接收事件速度的情況。可是,卻會出現被觀察者發送事件數量 > 觀察者接收事件數量的問題。

[2] 被觀察者發送的消息數和觀察者設定接收的消息數不一緻時。

(1) 代碼

package com.example.mydemo.rxjava;

import android.os.Bundle;
import android.util.Log;

import androidx.appcompat.app.AppCompatActivity;

import com.example.mydemo.R;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;

public class RxJavaActivity extends AppCompatActivity {

    private Subscription mSubscription;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_rxjava);
        method();
    }

    /**
     * RxJava Flowable建立被觀察者 背壓
     */

    public void method() {

        Flowable.create(new FlowableOnSubscribe<Object>() {
            @Override
            public void subscribe(FlowableEmitter<Object> e) throws Exception {
                if (null == e) {
                    return;
                }

                /**
                 * 測試 被觀察者發送五條消息
                 * */

                if (!e.isCancelled()) {
                    for (int i = 0; i < 5; i++) {
                        e.onNext("Flowable建立被觀察者 被觀察者發送第   " + i + "   波資料");
                        Log.d("TAG", "Flowable建立被觀察者 被觀察者發送資料 i----:" + i);
                    }
                    e.onComplete();
                }
            }
        }, BackpressureStrategy.LATEST)
                .subscribe(new Subscriber<Object>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        if (null == s) {
                            return;
                        }
                        mSubscription = s;
                        mSubscription.request(3);//觀察者接收三條
                    }

                    @Override
                    public void onNext(Object o) {
                        if (null == o) {
                            return;
                        }
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:" + o.toString());
                    }

                    @Override
                    public void onError(Throwable t) {
                        if (null == t) {
                            return;
                        }
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onError方法 t.toString()----:" + t.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onComplete方法 ");
                    }
                });
    }

    /**
     * onDestroy方法 取消訂閱
     */

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (null != mSubscription) {
            mSubscription.cancel();
            Log.d("TAG", "退出頁面 取消訂閱");
        }
    }

}
           

即 被觀察者發送五條資料 觀察者隻接收三條消息。

(2) 結果

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   0   波資料

D/TAG: Flowable建立被觀察者 被觀察者發送資料 i----:0



D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   1   波資料

D/TAG: Flowable建立被觀察者 被觀察者發送資料 i----:1



D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   2   波資料

D/TAG: Flowable建立被觀察者 被觀察者發送資料 i----:2



D/TAG: Flowable建立被觀察者 被觀察者發送資料 i----:3



D/TAG: Flowable建立被觀察者 被觀察者發送資料 i----:4
           

(3) 綜上所述 對于沒有緩存區概念的同步訂閱關系來說,單純采用控制觀察者的接收事件數量(響應式拉取)還是會出現問題。

[3] 點選時再執行觀察者的取方法

(1) 代碼

package com.example.mydemo.rxjava;

import android.os.Bundle;
import android.util.Log;
import android.view.View;
import android.widget.TextView;

import androidx.appcompat.app.AppCompatActivity;

import com.example.mydemo.R;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;

public class RxJavaActivity extends AppCompatActivity {

    private Subscription mSubscription;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_rxjava);
        findView();
        method();
    }

    /**
     * 初始化各種View
     */

    private void findView() {
        TextView textView = findViewById(R.id.activity_rxjava_textview);
        textView.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                if (null != mSubscription) {
                    mSubscription.request(4);//觀察者隻接收四條
                }
            }
        });
    }

    /**
     * RxJava Flowable建立被觀察者 背壓
     */

    public void method() {

        Flowable.create(new FlowableOnSubscribe<Object>() {
            @Override
            public void subscribe(FlowableEmitter<Object> e) throws Exception {
                if (null == e) {
                    return;
                }

                /**
                 * 測試 被觀察者發送五條消息
                 * */

                if (!e.isCancelled()) {
                    for (int i = 0; i < 5; i++) {
                        e.onNext("Flowable建立被觀察者 被觀察者發送第   " + i + "   波資料");
                    }
                    e.onComplete();
                }
            }
        }, BackpressureStrategy.LATEST)
                .subscribe(new Subscriber<Object>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        if (null == s) {
                            return;
                        }
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Object o) {
                        if (null == o) {
                            return;
                        }
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:" + o.toString());
                    }

                    @Override
                    public void onError(Throwable t) {
                        if (null == t) {
                            return;
                        }
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onError方法 t.toString()----:" + t.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onComplete方法 ");
                    }
                });
    }

    /**
     * onDestroy方法 取消訂閱
     */

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (null != mSubscription) {
            mSubscription.cancel();
            Log.d("TAG", "退出頁面 取消訂閱");
        }
    }

}
           

(2) 結果

預設

無列印 即無消息接收

點選按鈕 第一次點選

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   4   波資料

D/TAG: Flowable建立被觀察者 觀察者 onComplete方法 
           

點選按鈕第二三四五...次點選

無列印 即無消息接收

二.控制 被觀察者 發送事件的速度

<1> 前言

上述可知可以通過觀察者的onSubscribe方法中參數Subscription的request(N)方法控制觀察者可接收的消息數,繼而控制觀察者接收事件的速度。那麼被觀察者的發送事件的速度怎麼控制呢?

源碼分析

create操作符建立被觀察者 重寫 subscribe方法 該方法參數FlowableEmitter<Object> e 點選

public interface FlowableEmitter<T> extends Emitter<T> {

    /**
     * Sets a Disposable on this emitter; any previous Disposable
     * or Cancellation will be unsubscribed/cancelled.
     * @param s the disposable, null is allowed
     */
    void setDisposable(Disposable s);

    /**
     * Sets a Cancellable on this emitter; any previous Disposable
     * or Cancellation will be unsubscribed/cancelled.
     * @param c the cancellable resource, null is allowed
     */
    void setCancellable(Cancellable c);

    /**
     * The current outstanding request amount.
     * <p>This method is thread-safe.
     * @return the current outstanding request amount
     */
    long requested();

    /**
     * Returns true if the downstream cancelled the sequence.
     * <p>This method is thread-safe.
     * @return true if the downstream cancelled the sequence
     */
    boolean isCancelled();

    /**
     * Ensures that calls to onNext, onError and onComplete are properly serialized.
     * @return the serialized FlowableEmitter
     */
    FlowableEmitter<T> serialize();
}
           

可以看到 此參數可以判斷是否取消訂閱  isCancelled()方法。而request()方法可以擷取被觀察者中的消息數量。

<2> 同步訂閱 控制被觀察者發送消息速度

(1) 簡介

同步訂閱中,被觀察者和觀察者處在同一線程。被觀察者中request方法得到的數等于觀察者中Subscription.request()方法中設定的數。

(2) 代碼

package com.example.mydemo.rxjava;

import android.os.Bundle;
import android.util.Log;

import androidx.appcompat.app.AppCompatActivity;

import com.example.mydemo.R;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;

public class RxJavaActivity extends AppCompatActivity {

    private Subscription mSubscription;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_rxjava);
        method();
    }

    /**
     * RxJava Flowable建立被觀察者 背壓
     */

    public void method() {

        Flowable.create(new FlowableOnSubscribe<Object>() {
            @Override
            public void subscribe(FlowableEmitter<Object> e) throws Exception {
                if (null == e) {
                    return;
                }

                long msgNumber = e.requested();
                Log.d("TAG", "Flowable建立被觀察者 被觀察者 msgNumber----:" + msgNumber);

                if (!e.isCancelled()) {
                    for (int i = 0; i < msgNumber; i++) {
                        e.onNext("Flowable建立被觀察者 被觀察者發送第   " + i + "   波資料");
                    }
                    e.onComplete();
                }
            }
        }, BackpressureStrategy.LATEST)
                .subscribe(new Subscriber<Object>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        if (null == s) {
                            return;
                        }
                        mSubscription = s;
                        mSubscription.request(5);//觀察者設定可接收五條消息
                    }

                    @Override
                    public void onNext(Object o) {
                        if (null == o) {
                            return;
                        }
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:" + o.toString());
                    }

                    @Override
                    public void onError(Throwable t) {
                        if (null == t) {
                            return;
                        }
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onError方法 t.toString()----:" + t.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onComplete方法 ");
                    }
                });
    }

    /**
     * onDestroy方法 取消訂閱
     */

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (null != mSubscription) {
            mSubscription.cancel();
            Log.d("TAG", "退出頁面 取消訂閱");
        }
    }

}
           

(3) 結果

D/TAG: Flowable建立被觀察者 被觀察者 msgNumber----:5


D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   0   波資料

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   1   波資料

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   2   波資料

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   3   波資料

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   4   波資料

D/TAG: Flowable建立被觀察者 觀察者 onComplete方法 
           

(4) 說明

{1} 被觀察者的 e.requested();方法具有實時性。即被觀察者發送了一條資料 e.requested();方法就會減一。

代碼 在上基礎上改動

long msgNumber = e.requested();
Log.d("TAG", "Flowable建立被觀察者 被觀察者 msgNumber----:" + msgNumber);

if (!e.isCancelled()) {
    for (int i = 0; i < msgNumber; i++) {
        e.onNext("Flowable建立被觀察者 被觀察者發送第   " + i + "   波資料");
        Log.d("TAG", "Flowable建立被觀察者 被觀察者 發送了i後e.requested()----:" + e.requested());
    }
    e.onComplete();
}
           

結果

D/TAG: Flowable建立被觀察者 被觀察者 msgNumber----:5

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   0   波資料

D/TAG: Flowable建立被觀察者 被觀察者 發送了i後e.requested()----:4

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   1   波資料

D/TAG: Flowable建立被觀察者 被觀察者 發送了i後e.requested()----:3

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   2   波資料

D/TAG: Flowable建立被觀察者 被觀察者 發送了i後e.requested()----:2

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   3   波資料

D/TAG: Flowable建立被觀察者 被觀察者 發送了i後e.requested()----:1

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   4   波資料

D/TAG: Flowable建立被觀察者 被觀察者 發送了i後e.requested()----:0

D/TAG: Flowable建立被觀察者 觀察者 onComplete方法 
           

{2} 觀察者沒有設定可接收事件數量,即無調用Subscription.request()。那麼被觀察者預設觀察者可接收事件數量 = 0,即FlowableEmitter.requested()的傳回值 = 0。

{3} 可疊加性

代碼

mSubscription.request(5);//第一次設定 觀察者設定可接收五條消息 

mSubscription.request(10);//第二條設定 觀察者設定可接收十條消息
           

結果 5+10=15

D/TAG: Flowable建立被觀察者 被觀察者 msgNumber----:15
           

<3> 異步訂閱 控制被觀察者發送消息速度

(1) 說明

異步訂閱中,被觀察者和觀察者處在不同的線程。是以被觀察者 無法通過 FlowableEmitter.requested()知道觀察者自身接收事件能力。即 被觀察者不能根據 觀察者自身接收事件的能力 控制發送事件的速度。

被觀察者中request方法得到的數==128,96,0。 和觀察者中Subscription.request()方法中設定的數不相等。

(2) 代碼

package com.example.mydemo.rxjava;

import android.os.Bundle;
import android.util.Log;

import androidx.appcompat.app.AppCompatActivity;

import com.example.mydemo.R;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.schedulers.Schedulers;

public class RxJavaActivity extends AppCompatActivity {

    private Subscription mSubscription;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_rxjava);
        method();
    }

    /**
     * RxJava Flowable建立被觀察者 背壓
     */

    public void method() {

        Flowable.create(new FlowableOnSubscribe<Object>() {
            @Override
            public void subscribe(FlowableEmitter<Object> e) throws Exception {
                if (null == e) {
                    return;
                }

                long msgNumber = e.requested();
                Log.d("TAG", "Flowable建立被觀察者 被觀察者 msgNumber----:" + msgNumber);

                if (!e.isCancelled()) {
                    for (int i = 0; i < msgNumber; i++) {
                        e.onNext("Flowable建立被觀察者 被觀察者發送第   " + i + "   波資料");
                    }
                    e.onComplete();
                }
            }
        }, BackpressureStrategy.LATEST)
                .subscribeOn(Schedulers.io())//被觀察者 IO線程
                .observeOn(AndroidSchedulers.mainThread())//觀察者 UI線程
                .subscribe(new Subscriber<Object>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        if (null == s) {
                            return;
                        }
                        mSubscription = s;
                        mSubscription.request(5);//觀察者設定可接收五條消息
                    }

                    @Override
                    public void onNext(Object o) {
                        if (null == o) {
                            return;
                        }
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:" + o.toString());
                    }

                    @Override
                    public void onError(Throwable t) {
                        if (null == t) {
                            return;
                        }
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onError方法 t.toString()----:" + t.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onComplete方法 ");
                    }
                });
    }

    /**
     * onDestroy方法 取消訂閱
     */

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (null != mSubscription) {
            mSubscription.cancel();
            Log.d("TAG", "退出頁面 取消訂閱");
        }
    }

}
           

(3) 結果

D/TAG: Flowable建立被觀察者 被觀察者 msgNumber----:128


D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   0   波資料

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   1   波資料

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   2   波資料

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   3   波資料

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 o.toString()----:Flowable建立被觀察者 被觀察者發送第   4   波資料
           

即 觀察者設定可以接收的消息數是五條,但是被觀察者中請求的消息數是128。

三.Flowable其他操作符使用背壓政策

<1> 前言

上述的講解都是使用的Flowable的create操作符建立的被觀察者,那麼背壓政策可以在第二個參數傳入。如下。

Flowable.create(new FlowableOnSubscribe<Object>() {
    @Override
    public void subscribe(FlowableEmitter<Object> e) throws Exception {
        if (null == e) {
            return;
        }
 
        if (!e.isCancelled()) {
            e.onNext("Flowable建立被觀察者 被觀察者發送第1波資料");
            e.onNext("Flowable建立被觀察者 被觀察者發送第2波資料");
            e.onNext("Flowable建立被觀察者 被觀察者發送第3波資料");
            e.onComplete();
        }
 
        Log.d("TAG", "Flowable建立被觀察者 被觀察者 subscribe方法 線程----:" + Thread.currentThread().getName());
    }
}, BackpressureStrategy.LATEST);
           

那麼使用其他的操作符呢,比如前節講的interval操作符。下面講解。

<2> 代碼

package com.wjn.rxdemo.rxjava;

import android.os.Bundle;
import android.util.Log;

import androidx.appcompat.app.AppCompatActivity;

import com.wjn.rxdemo.R;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.TimeUnit;

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

public class RxJavaActivity extends AppCompatActivity {

    private Subscription mSubscription;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_rxjava);
        method();
    }

    /**
     * RxJava Flowable建立被觀察者 背壓
     */

    private void method() {
        Flowable.interval(1, TimeUnit.MILLISECONDS)
                .onBackpressureBuffer() // 添加背壓政策封裝好的方法,此處選擇Buffer模式,即緩存區大小無限制
                .observeOn(Schedulers.newThread())
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        if (null == s) {
                            return;
                        }
                        mSubscription = s;
                        s.request(Long.MAX_VALUE);//觀察者設定可接收最多條數的消息
                    }

                    @Override
                    public void onNext(Long aLong) {
                        if (null == aLong) {
                            return;
                        }
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onNext方法 aLong----:" + aLong);
                        //模拟1秒接收
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        if (null == t) {
                            return;
                        }
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onError方法 t.toString()----:" + t.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d("TAG", "Flowable建立被觀察者 觀察者 onComplete方法 ");
                    }
                });
    }

    /**
     * onDestroy方法 取消訂閱
     */

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (null != mSubscription) {
            mSubscription.cancel();
            Log.d("TAG", "退出頁面 取消訂閱");
        }
    }

}
           

<3> 結果

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 aLong----:0

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 aLong----:1

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 aLong----:2

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 aLong----:3

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 aLong----:4

D/TAG: Flowable建立被觀察者 觀察者 onNext方法 aLong----:5
           

<4> 關閉頁面

D/TAG: 退出頁面 取消訂閱
           

<5> 說明

上述代碼中有這樣的設定

.onBackpressureBuffer() // 添加背壓政策封裝好的方法,此處選擇Buffer模式,即緩存區大小無限制
           

那麼還有沒有其他的設定呢,以及每個設定對應的含義是什麼呢。

舉例 含義 對create操作符
.onBackpressureBuffer() 緩存區滿時 将緩存區設定無限大 注意記憶體溢出 BUFFER
.onBackpressureDrop() 緩存區滿時 事件丢失 保留[0,127]條資料 即 前 128條資料 DROP
onBackpressureLatest() 緩存區滿時 事件丢失 保留[0,127]條資料+超出的最後一條資料 LATEST

四.總結

<1> RxJava的背壓可以使用Flowable建立被觀察者來實作。背壓的使用大前提是 異步訂閱中,觀察者和被觀察者發送消息的速度或者數量不比對,一般是被觀察者發送的速度太快或者消息太多,觀察者來不及處理。

<2> 背壓一般應用在被觀察者和觀察者異步訂閱時,因為同步訂閱正式項目很少使用。(RxJava核心就是異步嘛)。

<3> 其實一般使用RxJava實作異步,隻要注意在适當的時機取消訂閱,防止記憶體洩漏即可。沒必要使用背壓。

<4> 其實處理被觀察者和觀察者比對問題也可以有其他的操作辦法,比如使用delay()操作符建立一個延時的被觀察者。再比如使用interval操作符建立指定時間的被觀察者等等。