天天看點

RxJava詳解(基于2.X版本的功能操作符)

本章節講述RxJava基于2.X版本的功能操作符

1.subscribe()

<1> 作用

訂閱,即連接配接觀察者 & 被觀察者。

<2> 代碼&結果

https://blog.csdn.net/weixin_37730482/article/details/69280013

有多個重載的方法。

2.Observable.subscribeOn() &Observable.observeOn() 

<1> 作用

指定被觀察者&觀察者所在的線程。

<2> 代碼&結果

https://blog.csdn.net/weixin_37730482/article/details/74460807

3.delay()

<1> 作用

使得被觀察者延遲一段時間再發送事件。

<2> 代碼

package com.example.rxjava20;

import android.os.Bundle;
import android.util.Log;

import androidx.appcompat.app.AppCompatActivity;

import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class MainActivity extends AppCompatActivity {

    private Disposable disposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        method();
    }

    /**
     * 建立 RxJava delay功能操作符
     */

    public void method() {
        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> e) throws Exception {
                if (null == e) {
                    return;
                }

                if (!e.isDisposed()) {
                    e.onNext("被觀察者發送資料");
                    Log.d("TAG", "被觀察者發送資料 開始發送資料...");
                }
            }
        }).delay(5, TimeUnit.SECONDS)//被觀察者延時5秒發送資料
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        if (null == d) {
                            return;
                        }
                        disposable = d;
                        Log.d("TAG", "觀察者 onSubscribe 方法 是否斷開連接配接----:" + disposable.isDisposed());
                    }

                    @Override
                    public void onNext(Object value) {
                        Log.d("TAG", "觀察者 onNext 方法 value.toString()----:" + value.toString());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

    }

    /**
     * onDestroy方法
     */

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (null != disposable) {
            Log.d("TAG", "onDestroy方法 執行時是否斷開----:" + disposable.isDisposed());
            if (!disposable.isDisposed()) {//沒有斷開
                disposable.dispose();//斷開
                Log.d("TAG", "onDestroy方法 斷開訂閱");
            }
        }
    }


}
           

<3> 結果

D/TAG: 觀察者 onSubscribe 方法 是否斷開連接配接----:false


D/TAG: 被觀察者發送資料 開始發送資料...



//5秒後列印

D/TAG: 觀察者 onNext 方法 value.toString()----:被觀察者發送資料
           

<4> 關閉頁面

D/TAG: onDestroy方法 執行時是否斷開----:false

D/TAG: onDestroy方法 斷開訂閱
           

<5> delay重載方法

delay(long delay,TimeUnit unit)

參數1:時間

參數2:時間機關



delay(long delay,TimeUnit unit,mScheduler scheduler)

參數1:時間

參數2:時間機關

參數3:線程排程器


delay(long delay,TimeUnit unit,boolean delayError)

指定延遲時間  & 錯誤延遲 錯誤延遲,即:若存在Error事件,則如常執行,執行後再抛出錯誤異常。

參數1:時間

參數2:時間機關

參數3:錯誤延遲參數





delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 

指定延遲多長時間并添加排程器,錯誤通知可以設定是否延遲。

參數1:時間

參數2:時間機關

參數3:線程排程器

參數4:錯誤延遲參數
           

4.doXXX()

<1> 作用

在某個事件的生命周期中調用。如發送事件前的初始化、發送事件後的回調請求等等。

具體方法如下

doOnSubscribe():觀察者訂閱時調用。


doOnEach():被觀察者調用一次執行一次。


doOnNext():onNext方法調用前調用。


doAfterNext():onNext方法調用後調用。


doOnComplete():onComplete方法執行時調用。


doOnError():onError方法執行時調用。


doOnTerminate():正常發送完成&異常完成都執行。


doFinally():最後執行。


doAfterTerminate():真正的最後執行。
           

<2> 代碼

package com.wjn.rxdemo.rxjava;

import android.os.Bundle;
import android.util.Log;

import androidx.appcompat.app.AppCompatActivity;

import com.wjn.rxdemo.R;

import io.reactivex.Notification;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;

public class RxJavaActivity extends AppCompatActivity {

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_rxjava);
        method();
    }

    /**
     * 建立 RxJava doXXX功能操作符
     */

    public void method() {
        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> e) throws Exception {
                if (null == e) {
                    return;
                }

                if (!e.isDisposed()) {
                    e.onNext(1);
                    e.onNext(new RuntimeException());
                    e.onComplete();
                }
            }
        }).doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) {
                Log.d("TAG", "生命周期 doOnSubscribe方法 執行...----:" + disposable.isDisposed());
            }
        }).doOnEach(new Consumer<Notification<Object>>() {
            @Override
            public void accept(Notification<Object> objectNotification) {
                Log.d("TAG", "生命周期 doOnEach方法 執行...----:" + objectNotification.toString());
            }
        }).doOnNext(new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                Log.d("TAG", "生命周期 doOnNext方法 執行...----:" + o.toString());
            }
        }).doAfterNext(new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                Log.d("TAG", "生命周期 doAfterNext方法 執行...----:" + o.toString());
            }
        }).doOnComplete(new Action() {
            @Override
            public void run() {
                Log.d("TAG", "生命周期 doOnComplete方法 執行...");
            }
        }).doOnError(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) {
                Log.d("TAG", "生命周期 doOnError方法 執行...----:" + throwable.toString());
            }
        }).doOnTerminate(new Action() {
            @Override
            public void run() throws Exception {
                Log.d("TAG", "生命周期 doOnTerminate方法 執行...");
            }
        }).doAfterTerminate(new Action() {
            @Override
            public void run() {
                Log.d("TAG", "生命周期 doAfterTerminate方法 執行...");
            }
        }).doFinally(new Action() {
            @Override
            public void run() {
                Log.d("TAG", "生命周期 doFinally方法 執行...");
            }
        }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d("TAG", "觀察者 onSubscribe方法 執行...");
            }

            @Override
            public void onNext(Object value) {
                Log.d("TAG", "觀察者 onNext 方法執行...value.toString()----:" + value.toString());
            }

            @Override
            public void onError(Throwable e) {
                Log.d("TAG", "觀察者 onError 方法執行...e.toString()----:" + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d("TAG", "觀察者 onComplete方法 執行...");
            }
        });
    }

}
           

<3> 結果

被觀察者 執行 e.onNext(1); 即發生第一次正确的onNext方法


D/TAG: 生命周期 doOnSubscribe方法 執行...----:false

D/TAG: 觀察者 onSubscribe方法 執行...

D/TAG: 生命周期 doOnEach方法 執行...----:OnNextNotification[1]

D/TAG: 生命周期 doOnNext方法 執行...----:1

D/TAG: 觀察者 onNext 方法執行...value.toString()----:1

D/TAG: 生命周期 doAfterNext方法 執行...----:1


**************************************************************************


被觀察者 執行 e.onNext(new RuntimeException()); 即發生第二次錯誤的onNext方法


D/TAG: 生命周期 doOnEach方法 執行...----:OnNextNotification[java.lang.RuntimeException]

D/TAG: 生命周期 doOnNext方法 執行...----:java.lang.RuntimeException

D/TAG: 觀察者 onNext 方法執行...value.toString()----:java.lang.RuntimeException

D/TAG: 生命周期 doAfterNext方法 執行...----:java.lang.RuntimeException


*****************************************************************************

被觀察者 執行 e.onComplete(); 即發生最後接收發生資料


D/TAG: 生命周期 doOnEach方法 執行...----:OnCompleteNotification

D/TAG: 生命周期 doOnComplete方法 執行...

D/TAG: 生命周期 doOnTerminate方法 執行...

D/TAG: 觀察者 onComplete方法 執行...

D/TAG: 生命周期 doFinally方法 執行...

D/TAG: 生命周期 doAfterTerminate方法 執行...
           

<4> 上述代碼修改create操作符,隻發送onComplete。

if (!e.isDisposed()) {
    e.onComplete();
}
           

結果

D/TAG: 生命周期 doOnSubscribe方法 執行...----:false

D/TAG: 觀察者 onSubscribe方法 執行...

D/TAG: 生命周期 doOnEach方法 執行...----:OnCompleteNotification

D/TAG: 生命周期 doOnComplete方法 執行...

D/TAG: 生命周期 doOnTerminate方法 執行...

D/TAG: 觀察者 onComplete方法 執行...

D/TAG: 生命周期 doFinally方法 執行...

D/TAG: 生命周期 doAfterTerminate方法 執行...
           

<5> 上述代碼修改create操作符,隻發送onError。

if (!e.isDisposed()) {
    e.onError(new RuntimeException());
}
           

結果

D/TAG: 生命周期 doOnSubscribe方法 執行...----:false

D/TAG: 觀察者 onSubscribe方法 執行...

D/TAG: 生命周期 doOnEach方法 執行...----:OnErrorNotification[java.lang.RuntimeException]

D/TAG: 生命周期 doOnError方法 執行...----:java.lang.RuntimeException

D/TAG: 生命周期 doOnTerminate方法 執行...

D/TAG: 觀察者 onError 方法執行...e.toString()----:java.lang.RuntimeException

D/TAG: 生命周期 doFinally方法 執行...

D/TAG: 生命周期 doAfterTerminate方法 執行...
           

<6> 上述代碼修改create操作符,為empty操作符。

代碼

package com.wjn.rxdemo.rxjava;

import android.os.Bundle;
import android.util.Log;

import androidx.appcompat.app.AppCompatActivity;

import com.wjn.rxdemo.R;

import io.reactivex.Notification;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;

public class RxJavaActivity extends AppCompatActivity {

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_rxjava);
        method();
    }

    /**
     * 建立 RxJava doXXX功能操作符
     */

    public void method() {
        Observable
                .empty()
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) {
                        Log.d("TAG", "生命周期 doOnSubscribe方法 執行...----:" + disposable.isDisposed());
                    }
                })
                .doOnEach(new Consumer<Notification<Object>>() {
                    @Override
                    public void accept(Notification<Object> objectNotification) {
                        Log.d("TAG", "生命周期 doOnEach方法 執行...----:" + objectNotification.toString());
                    }
                })
                .doOnNext(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) {
                        Log.d("TAG", "生命周期 doOnNext方法 執行...----:" + o.toString());
                    }
                })
                .doAfterNext(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) {
                        Log.d("TAG", "生命周期 doAfterNext方法 執行...----:" + o.toString());
                    }
                })
                .doOnComplete(new Action() {
                    @Override
                    public void run() {
                        Log.d("TAG", "生命周期 doOnComplete方法 執行...");
                    }
                })
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) {
                        Log.d("TAG", "生命周期 doOnError方法 執行...----:" + throwable.toString());
                    }
                })
                .doOnTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.d("TAG", "生命周期 doOnTerminate方法 執行...");
                    }
                })
                .doAfterTerminate(new Action() {
                    @Override
                    public void run() {
                        Log.d("TAG", "生命周期 doAfterTerminate方法 執行...");
                    }
                })
                .doFinally(new Action() {
                    @Override
                    public void run() {
                        Log.d("TAG", "生命周期 doFinally方法 執行...");
                    }
                })
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d("TAG", "觀察者 onSubscribe方法 執行...");
                    }

                    @Override
                    public void onNext(Object value) {
                        Log.d("TAG", "觀察者 onNext 方法執行...value.toString()----:" + value.toString());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d("TAG", "觀察者 onError 方法執行...e.toString()----:" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d("TAG", "觀察者 onComplete方法 執行...");
                    }
                });
    }

}
           

結果

D/TAG: 生命周期 doOnSubscribe方法 執行...----:true

D/TAG: 觀察者 onSubscribe方法 執行...

D/TAG: 生命周期 doOnEach方法 執行...----:OnCompleteNotification

D/TAG: 生命周期 doOnComplete方法 執行...

D/TAG: 生命周期 doOnTerminate方法 執行...

D/TAG: 觀察者 onComplete方法 執行...

D/TAG: 生命周期 doFinally方法 執行...

D/TAG: 生命周期 doAfterTerminate方法 執行...
           

5. onErrorReturn()

<1> 作用

遇到觀察者發送錯誤時,發送1個特殊事件 正常終止。

<2> 代碼

package com.example.rxjava20;

import android.os.Bundle;
import android.util.Log;

import androidx.appcompat.app.AppCompatActivity;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;

public class MainActivity extends AppCompatActivity {

    private Disposable disposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        method();
    }

    /**
     * 建立 RxJava onErrorReturn功能操作符
     */

    public void method() {
        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> e) throws Exception {
                if (null == e) {
                    return;
                }

                if (!e.isDisposed()) {
                    e.onNext(1);
                    e.onNext("張三");
                    e.onError(new Throwable());
                }
            }
        }).onErrorReturn(new Function<Throwable, Object>() {
            @Override
            public Object apply(Throwable throwable) throws Exception {
                return "Code:123";
            }
        }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                if (null == d) {
                    return;
                }
                disposable = d;
                Log.d("TAG", "觀察者 onSubscribe 方法 是否斷開連接配接----:" + disposable.isDisposed());

            }

            @Override
            public void onNext(Object value) {
                Log.d("TAG", "觀察者 onNext 方法 value.toString()----:" + value.toString());
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

    }

    /**
     * onDestroy方法
     */

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (null != disposable) {
            Log.d("TAG", "onDestroy方法 執行時是否斷開----:" + disposable.isDisposed());
            if (!disposable.isDisposed()) {//沒有斷開
                disposable.dispose();//斷開
                Log.d("TAG", "onDestroy方法 斷開訂閱");
            }
        }
    }

}
           

<3> 結果

D/TAG: 觀察者 onSubscribe 方法 是否斷開連接配接----:false

D/TAG: 觀察者 onNext 方法 value.toString()----:1

D/TAG: 觀察者 onNext 方法 value.toString()----:張三

D/TAG: 觀察者 onNext 方法 value.toString()----:Code:123
           

<4> 關閉頁面

D/TAG: onDestroy方法 執行時是否斷開----:true
           

6.onErrorResumeNext()&onExceptionResumeNext()

<1> 作用

遇到錯誤時,發送1個新的Observable。

onErrorResumeNext():攔截的錯誤 = Throwable。

onExceptionResumeNext():攔截的錯誤 = Exception。

<2> 代碼

package com.example.rxjava20;

import android.os.Bundle;
import android.util.Log;

import androidx.appcompat.app.AppCompatActivity;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;

public class MainActivity extends AppCompatActivity {

    private Disposable disposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        method();
    }

    /**
     * 建立 RxJava onErrorResumeNext功能操作符
     */

    public void method() {
        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> e) throws Exception {
                if (null == e) {
                    return;
                }

                if (!e.isDisposed()) {
                    e.onNext(1);
                    e.onNext("張三");
                    e.onError(new Throwable());
                }
            }
        })
                .onErrorResumeNext(new Function<Throwable, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Throwable throwable) throws Exception {
                        return Observable.just("遇到觀察者發送的錯誤資料 修改後的内容");
                    }
                })
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        if (null == d) {
                            return;
                        }
                        disposable = d;
                        Log.d("TAG", "觀察者 onSubscribe 方法 是否斷開連接配接----:" + disposable.isDisposed());
                    }

                    @Override
                    public void onNext(Object value) {
                        Log.d("TAG", "觀察者 onNext 方法 value.toString()----:" + value.toString());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

    }

    /**
     * onDestroy方法
     */

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (null != disposable) {
            Log.d("TAG", "onDestroy方法 執行時是否斷開----:" + disposable.isDisposed());
            if (!disposable.isDisposed()) {//沒有斷開
                disposable.dispose();//斷開
                Log.d("TAG", "onDestroy方法 斷開訂閱");
            }
        }
    }

}
           

<3> 結果

D/TAG: 觀察者 onSubscribe 方法 是否斷開連接配接----:false

D/TAG: 觀察者 onNext 方法 value.toString()----:1

D/TAG: 觀察者 onNext 方法 value.toString()----:張三

D/TAG: 觀察者 onNext 方法 value.toString()----:遇到觀察者發送的錯誤資料 修改後的内容
           

7. retry()

<1> 作用

重試,即當出現錯誤時,讓被觀察者(Observable)重新發射資料。Throwable 和 Exception 都可攔截。

retry():出現錯誤時,讓被觀察者重新發送資料。若一直錯誤,則一直重新發送。如果不出現錯誤,不重複執行。

retry(long time):出現錯誤時,讓被觀察者重新發送資料。可設定重試次數。如果不出現錯誤,不重複執行。

retry(long times, Predicate<? super Throwable> predicate) 出現錯誤後,判斷是否需要重新發送資料 如果需要 重試times次。如果不出現錯誤,不重複執行。

<2> 代碼

package com.example.rxjava20;

import android.os.Bundle;
import android.util.Log;

import androidx.appcompat.app.AppCompatActivity;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class MainActivity extends AppCompatActivity {

    private Disposable disposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        method();
    }

    /**
     * 建立 RxJava retry功能操作符
     */

    public void method() {
        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> e) throws Exception {
                if (null == e) {
                    return;
                }

                if (!e.isDisposed()) {
                    e.onNext(1);
                    e.onNext("張三");
                    e.onError(new Throwable());
                }
            }
        }).retry(2)//重試兩次
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        if (null == d) {
                            return;
                        }
                        disposable = d;
                        Log.d("TAG", "觀察者 onSubscribe 方法 是否斷開連接配接----:" + disposable.isDisposed());

                    }

                    @Override
                    public void onNext(Object value) {
                        Log.d("TAG", "觀察者 onNext 方法 value.toString()----:" + value.toString());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

    }

    /**
     * onDestroy方法
     */

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (null != disposable) {
            Log.d("TAG", "onDestroy方法 執行時是否斷開----:" + disposable.isDisposed());
            if (!disposable.isDisposed()) {//沒有斷開
                disposable.dispose();//斷開
                Log.d("TAG", "onDestroy方法 斷開訂閱");
            }
        }
    }

}
           

<3> 結果

D/TAG: 觀察者 onSubscribe 方法 是否斷開連接配接----:false

D/TAG: 觀察者 onNext 方法 value.toString()----:1

D/TAG: 觀察者 onNext 方法 value.toString()----:張三





D/TAG: 觀察者 onNext 方法 value.toString()----:1

D/TAG: 觀察者 onNext 方法 value.toString()----:張三





D/TAG: 觀察者 onNext 方法 value.toString()----:1

D/TAG: 觀察者 onNext 方法 value.toString()----:張三
           

重試2次,一共執行3次。

<4> 關閉頁面

D/TAG: onDestroy方法 執行時是否斷開----:false

D/TAG: onDestroy方法 斷開訂閱
           

8.retryUntil()

<1> 作用

出現錯誤後,判斷是否需要重新發送資料。類似 retry(long times, Predicate<? super Throwable> predicate) 出現錯誤後,判斷是否需要重新發送資料 如果需要 重試times次。

<2> 代碼

package com.example.rxjava20;

import android.os.Bundle;
import android.util.Log;

import androidx.appcompat.app.AppCompatActivity;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BooleanSupplier;

public class MainActivity extends AppCompatActivity {

    private Disposable disposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        method();
    }

    /**
     * 建立 RxJava Map變換符
     */

    public void method() {
        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> e) throws Exception {
                if (null == e) {
                    return;
                }

                if (!e.isDisposed()) {
                    e.onNext(1);
                    e.onNext("張三");
                    e.onError(new Throwable());
                }
            }
        }).retryUntil(new BooleanSupplier() {
            @Override
            public boolean getAsBoolean() throws Exception {
                return false;//一直重試
            }
        })
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        if (null == d) {
                            return;
                        }
                        disposable = d;
                        Log.d("TAG", "觀察者 onSubscribe 方法 是否斷開連接配接----:" + disposable.isDisposed());
                    }

                    @Override
                    public void onNext(Object value) {
                        Log.d("TAG", "觀察者 onNext 方法 value.toString()----:" + value.toString());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

    }

    /**
     * onDestroy方法
     */

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (null != disposable) {
            Log.d("TAG", "onDestroy方法 執行時是否斷開----:" + disposable.isDisposed());
            if (!disposable.isDisposed()) {//沒有斷開
                disposable.dispose();//斷開
                Log.d("TAG", "onDestroy方法 斷開訂閱");
            }
        }
    }

}
           

<3> 結果

return false;//一直重試

return true;//不重試
           

9.retryWhen()

<1> 作用

遇到錯誤時,将發生的錯誤傳遞給一個新的被觀察者。并決定是否需要重新訂閱原始被觀察者。

<2> 代碼

<3> 結果

10.repeat()

<1> 作用

重複不斷地發送被觀察者事件。

repeat():重複不斷地發送被觀察者事件。

repeat(long times):重複 times次 發送被觀察者事件。

<2> 代碼

package com.example.rxjava20;

import android.os.Bundle;
import android.util.Log;

import androidx.appcompat.app.AppCompatActivity;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class MainActivity extends AppCompatActivity {

    private Disposable disposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        method();
    }

    /**
     * 建立 RxJava repeat功能操作符
     */

    public void method() {
        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> e) throws Exception {
                if (null == e) {
                    return;
                }

                if (!e.isDisposed()) {
                    e.onNext(1);
                    e.onNext("張三");
                    e.onComplete();
                }
            }
        }).repeat(2)//重複兩次
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        if (null == d) {
                            return;
                        }
                        disposable = d;
                        Log.d("TAG", "觀察者 onSubscribe 方法 是否斷開連接配接----:" + disposable.isDisposed());
                    }

                    @Override
                    public void onNext(Object value) {
                        Log.d("TAG", "觀察者 onNext 方法 value.toString()----:" + value.toString());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

    }

    /**
     * onDestroy方法
     */

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (null != disposable) {
            Log.d("TAG", "onDestroy方法 執行時是否斷開----:" + disposable.isDisposed());
            if (!disposable.isDisposed()) {//沒有斷開
                disposable.dispose();//斷開
                Log.d("TAG", "onDestroy方法 斷開訂閱");
            }
        }
    }

}
           

<3> 結果

D/TAG: 觀察者 onSubscribe 方法 是否斷開連接配接----:false

D/TAG: 觀察者 onNext 方法 value.toString()----:1

D/TAG: 觀察者 onNext 方法 value.toString()----:張三





D/TAG: 觀察者 onNext 方法 value.toString()----:1

D/TAG: 觀察者 onNext 方法 value.toString()----:張三
           

<4> 關閉頁面

D/TAG: onDestroy方法 執行時是否斷開----:false

D/TAG: onDestroy方法 斷開訂閱
           

11.repeatWhen()

<1> 作用

有條件地、重複發送 被觀察者事件。

<2> 代碼

<3> 結果