本章節講述RxJava基于2.X版本的功能操作符
1.subscribe()
<1> 作用
訂閱,即連接配接觀察者 & 被觀察者。
<2> 代碼&結果
https://blog.csdn.net/weixin_37730482/article/details/69280013
有多個重載的方法。
2.Observable.subscribeOn() &Observable.observeOn()
<1> 作用
指定被觀察者&觀察者所在的線程。
<2> 代碼&結果
https://blog.csdn.net/weixin_37730482/article/details/74460807
3.delay()
<1> 作用
使得被觀察者延遲一段時間再發送事件。
<2> 代碼
package com.example.rxjava20;
import android.os.Bundle;
import android.util.Log;
import androidx.appcompat.app.AppCompatActivity;
import java.util.concurrent.TimeUnit;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class MainActivity extends AppCompatActivity {
private Disposable disposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
method();
}
/**
* 建立 RxJava delay功能操作符
*/
public void method() {
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> e) throws Exception {
if (null == e) {
return;
}
if (!e.isDisposed()) {
e.onNext("被觀察者發送資料");
Log.d("TAG", "被觀察者發送資料 開始發送資料...");
}
}
}).delay(5, TimeUnit.SECONDS)//被觀察者延時5秒發送資料
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
if (null == d) {
return;
}
disposable = d;
Log.d("TAG", "觀察者 onSubscribe 方法 是否斷開連接配接----:" + disposable.isDisposed());
}
@Override
public void onNext(Object value) {
Log.d("TAG", "觀察者 onNext 方法 value.toString()----:" + value.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
/**
* onDestroy方法
*/
@Override
protected void onDestroy() {
super.onDestroy();
if (null != disposable) {
Log.d("TAG", "onDestroy方法 執行時是否斷開----:" + disposable.isDisposed());
if (!disposable.isDisposed()) {//沒有斷開
disposable.dispose();//斷開
Log.d("TAG", "onDestroy方法 斷開訂閱");
}
}
}
}
<3> 結果
D/TAG: 觀察者 onSubscribe 方法 是否斷開連接配接----:false
D/TAG: 被觀察者發送資料 開始發送資料...
//5秒後列印
D/TAG: 觀察者 onNext 方法 value.toString()----:被觀察者發送資料
<4> 關閉頁面
D/TAG: onDestroy方法 執行時是否斷開----:false
D/TAG: onDestroy方法 斷開訂閱
<5> delay重載方法
delay(long delay,TimeUnit unit)
參數1:時間
參數2:時間機關
delay(long delay,TimeUnit unit,mScheduler scheduler)
參數1:時間
參數2:時間機關
參數3:線程排程器
delay(long delay,TimeUnit unit,boolean delayError)
指定延遲時間 & 錯誤延遲 錯誤延遲,即:若存在Error事件,則如常執行,執行後再抛出錯誤異常。
參數1:時間
參數2:時間機關
參數3:錯誤延遲參數
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError):
指定延遲多長時間并添加排程器,錯誤通知可以設定是否延遲。
參數1:時間
參數2:時間機關
參數3:線程排程器
參數4:錯誤延遲參數
4.doXXX()
<1> 作用
在某個事件的生命周期中調用。如發送事件前的初始化、發送事件後的回調請求等等。
具體方法如下
doOnSubscribe():觀察者訂閱時調用。
doOnEach():被觀察者調用一次執行一次。
doOnNext():onNext方法調用前調用。
doAfterNext():onNext方法調用後調用。
doOnComplete():onComplete方法執行時調用。
doOnError():onError方法執行時調用。
doOnTerminate():正常發送完成&異常完成都執行。
doFinally():最後執行。
doAfterTerminate():真正的最後執行。
<2> 代碼
package com.wjn.rxdemo.rxjava;
import android.os.Bundle;
import android.util.Log;
import androidx.appcompat.app.AppCompatActivity;
import com.wjn.rxdemo.R;
import io.reactivex.Notification;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
public class RxJavaActivity extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rxjava);
method();
}
/**
* 建立 RxJava doXXX功能操作符
*/
public void method() {
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> e) throws Exception {
if (null == e) {
return;
}
if (!e.isDisposed()) {
e.onNext(1);
e.onNext(new RuntimeException());
e.onComplete();
}
}
}).doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) {
Log.d("TAG", "生命周期 doOnSubscribe方法 執行...----:" + disposable.isDisposed());
}
}).doOnEach(new Consumer<Notification<Object>>() {
@Override
public void accept(Notification<Object> objectNotification) {
Log.d("TAG", "生命周期 doOnEach方法 執行...----:" + objectNotification.toString());
}
}).doOnNext(new Consumer<Object>() {
@Override
public void accept(Object o) {
Log.d("TAG", "生命周期 doOnNext方法 執行...----:" + o.toString());
}
}).doAfterNext(new Consumer<Object>() {
@Override
public void accept(Object o) {
Log.d("TAG", "生命周期 doAfterNext方法 執行...----:" + o.toString());
}
}).doOnComplete(new Action() {
@Override
public void run() {
Log.d("TAG", "生命周期 doOnComplete方法 執行...");
}
}).doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
Log.d("TAG", "生命周期 doOnError方法 執行...----:" + throwable.toString());
}
}).doOnTerminate(new Action() {
@Override
public void run() throws Exception {
Log.d("TAG", "生命周期 doOnTerminate方法 執行...");
}
}).doAfterTerminate(new Action() {
@Override
public void run() {
Log.d("TAG", "生命周期 doAfterTerminate方法 執行...");
}
}).doFinally(new Action() {
@Override
public void run() {
Log.d("TAG", "生命周期 doFinally方法 執行...");
}
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("TAG", "觀察者 onSubscribe方法 執行...");
}
@Override
public void onNext(Object value) {
Log.d("TAG", "觀察者 onNext 方法執行...value.toString()----:" + value.toString());
}
@Override
public void onError(Throwable e) {
Log.d("TAG", "觀察者 onError 方法執行...e.toString()----:" + e.toString());
}
@Override
public void onComplete() {
Log.d("TAG", "觀察者 onComplete方法 執行...");
}
});
}
}
<3> 結果
被觀察者 執行 e.onNext(1); 即發生第一次正确的onNext方法
D/TAG: 生命周期 doOnSubscribe方法 執行...----:false
D/TAG: 觀察者 onSubscribe方法 執行...
D/TAG: 生命周期 doOnEach方法 執行...----:OnNextNotification[1]
D/TAG: 生命周期 doOnNext方法 執行...----:1
D/TAG: 觀察者 onNext 方法執行...value.toString()----:1
D/TAG: 生命周期 doAfterNext方法 執行...----:1
**************************************************************************
被觀察者 執行 e.onNext(new RuntimeException()); 即發生第二次錯誤的onNext方法
D/TAG: 生命周期 doOnEach方法 執行...----:OnNextNotification[java.lang.RuntimeException]
D/TAG: 生命周期 doOnNext方法 執行...----:java.lang.RuntimeException
D/TAG: 觀察者 onNext 方法執行...value.toString()----:java.lang.RuntimeException
D/TAG: 生命周期 doAfterNext方法 執行...----:java.lang.RuntimeException
*****************************************************************************
被觀察者 執行 e.onComplete(); 即發生最後接收發生資料
D/TAG: 生命周期 doOnEach方法 執行...----:OnCompleteNotification
D/TAG: 生命周期 doOnComplete方法 執行...
D/TAG: 生命周期 doOnTerminate方法 執行...
D/TAG: 觀察者 onComplete方法 執行...
D/TAG: 生命周期 doFinally方法 執行...
D/TAG: 生命周期 doAfterTerminate方法 執行...
<4> 上述代碼修改create操作符,隻發送onComplete。
if (!e.isDisposed()) {
e.onComplete();
}
結果
D/TAG: 生命周期 doOnSubscribe方法 執行...----:false
D/TAG: 觀察者 onSubscribe方法 執行...
D/TAG: 生命周期 doOnEach方法 執行...----:OnCompleteNotification
D/TAG: 生命周期 doOnComplete方法 執行...
D/TAG: 生命周期 doOnTerminate方法 執行...
D/TAG: 觀察者 onComplete方法 執行...
D/TAG: 生命周期 doFinally方法 執行...
D/TAG: 生命周期 doAfterTerminate方法 執行...
<5> 上述代碼修改create操作符,隻發送onError。
if (!e.isDisposed()) {
e.onError(new RuntimeException());
}
結果
D/TAG: 生命周期 doOnSubscribe方法 執行...----:false
D/TAG: 觀察者 onSubscribe方法 執行...
D/TAG: 生命周期 doOnEach方法 執行...----:OnErrorNotification[java.lang.RuntimeException]
D/TAG: 生命周期 doOnError方法 執行...----:java.lang.RuntimeException
D/TAG: 生命周期 doOnTerminate方法 執行...
D/TAG: 觀察者 onError 方法執行...e.toString()----:java.lang.RuntimeException
D/TAG: 生命周期 doFinally方法 執行...
D/TAG: 生命周期 doAfterTerminate方法 執行...
<6> 上述代碼修改create操作符,為empty操作符。
代碼
package com.wjn.rxdemo.rxjava;
import android.os.Bundle;
import android.util.Log;
import androidx.appcompat.app.AppCompatActivity;
import com.wjn.rxdemo.R;
import io.reactivex.Notification;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
public class RxJavaActivity extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rxjava);
method();
}
/**
* 建立 RxJava doXXX功能操作符
*/
public void method() {
Observable
.empty()
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) {
Log.d("TAG", "生命周期 doOnSubscribe方法 執行...----:" + disposable.isDisposed());
}
})
.doOnEach(new Consumer<Notification<Object>>() {
@Override
public void accept(Notification<Object> objectNotification) {
Log.d("TAG", "生命周期 doOnEach方法 執行...----:" + objectNotification.toString());
}
})
.doOnNext(new Consumer<Object>() {
@Override
public void accept(Object o) {
Log.d("TAG", "生命周期 doOnNext方法 執行...----:" + o.toString());
}
})
.doAfterNext(new Consumer<Object>() {
@Override
public void accept(Object o) {
Log.d("TAG", "生命周期 doAfterNext方法 執行...----:" + o.toString());
}
})
.doOnComplete(new Action() {
@Override
public void run() {
Log.d("TAG", "生命周期 doOnComplete方法 執行...");
}
})
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
Log.d("TAG", "生命周期 doOnError方法 執行...----:" + throwable.toString());
}
})
.doOnTerminate(new Action() {
@Override
public void run() throws Exception {
Log.d("TAG", "生命周期 doOnTerminate方法 執行...");
}
})
.doAfterTerminate(new Action() {
@Override
public void run() {
Log.d("TAG", "生命周期 doAfterTerminate方法 執行...");
}
})
.doFinally(new Action() {
@Override
public void run() {
Log.d("TAG", "生命周期 doFinally方法 執行...");
}
})
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("TAG", "觀察者 onSubscribe方法 執行...");
}
@Override
public void onNext(Object value) {
Log.d("TAG", "觀察者 onNext 方法執行...value.toString()----:" + value.toString());
}
@Override
public void onError(Throwable e) {
Log.d("TAG", "觀察者 onError 方法執行...e.toString()----:" + e.toString());
}
@Override
public void onComplete() {
Log.d("TAG", "觀察者 onComplete方法 執行...");
}
});
}
}
結果
D/TAG: 生命周期 doOnSubscribe方法 執行...----:true
D/TAG: 觀察者 onSubscribe方法 執行...
D/TAG: 生命周期 doOnEach方法 執行...----:OnCompleteNotification
D/TAG: 生命周期 doOnComplete方法 執行...
D/TAG: 生命周期 doOnTerminate方法 執行...
D/TAG: 觀察者 onComplete方法 執行...
D/TAG: 生命周期 doFinally方法 執行...
D/TAG: 生命周期 doAfterTerminate方法 執行...
5. onErrorReturn()
<1> 作用
遇到觀察者發送錯誤時,發送1個特殊事件 正常終止。
<2> 代碼
package com.example.rxjava20;
import android.os.Bundle;
import android.util.Log;
import androidx.appcompat.app.AppCompatActivity;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;
public class MainActivity extends AppCompatActivity {
private Disposable disposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
method();
}
/**
* 建立 RxJava onErrorReturn功能操作符
*/
public void method() {
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> e) throws Exception {
if (null == e) {
return;
}
if (!e.isDisposed()) {
e.onNext(1);
e.onNext("張三");
e.onError(new Throwable());
}
}
}).onErrorReturn(new Function<Throwable, Object>() {
@Override
public Object apply(Throwable throwable) throws Exception {
return "Code:123";
}
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
if (null == d) {
return;
}
disposable = d;
Log.d("TAG", "觀察者 onSubscribe 方法 是否斷開連接配接----:" + disposable.isDisposed());
}
@Override
public void onNext(Object value) {
Log.d("TAG", "觀察者 onNext 方法 value.toString()----:" + value.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
/**
* onDestroy方法
*/
@Override
protected void onDestroy() {
super.onDestroy();
if (null != disposable) {
Log.d("TAG", "onDestroy方法 執行時是否斷開----:" + disposable.isDisposed());
if (!disposable.isDisposed()) {//沒有斷開
disposable.dispose();//斷開
Log.d("TAG", "onDestroy方法 斷開訂閱");
}
}
}
}
<3> 結果
D/TAG: 觀察者 onSubscribe 方法 是否斷開連接配接----:false
D/TAG: 觀察者 onNext 方法 value.toString()----:1
D/TAG: 觀察者 onNext 方法 value.toString()----:張三
D/TAG: 觀察者 onNext 方法 value.toString()----:Code:123
<4> 關閉頁面
D/TAG: onDestroy方法 執行時是否斷開----:true
6.onErrorResumeNext()&onExceptionResumeNext()
<1> 作用
遇到錯誤時,發送1個新的Observable。
onErrorResumeNext():攔截的錯誤 = Throwable。
onExceptionResumeNext():攔截的錯誤 = Exception。
<2> 代碼
package com.example.rxjava20;
import android.os.Bundle;
import android.util.Log;
import androidx.appcompat.app.AppCompatActivity;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;
public class MainActivity extends AppCompatActivity {
private Disposable disposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
method();
}
/**
* 建立 RxJava onErrorResumeNext功能操作符
*/
public void method() {
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> e) throws Exception {
if (null == e) {
return;
}
if (!e.isDisposed()) {
e.onNext(1);
e.onNext("張三");
e.onError(new Throwable());
}
}
})
.onErrorResumeNext(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
return Observable.just("遇到觀察者發送的錯誤資料 修改後的内容");
}
})
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
if (null == d) {
return;
}
disposable = d;
Log.d("TAG", "觀察者 onSubscribe 方法 是否斷開連接配接----:" + disposable.isDisposed());
}
@Override
public void onNext(Object value) {
Log.d("TAG", "觀察者 onNext 方法 value.toString()----:" + value.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
/**
* onDestroy方法
*/
@Override
protected void onDestroy() {
super.onDestroy();
if (null != disposable) {
Log.d("TAG", "onDestroy方法 執行時是否斷開----:" + disposable.isDisposed());
if (!disposable.isDisposed()) {//沒有斷開
disposable.dispose();//斷開
Log.d("TAG", "onDestroy方法 斷開訂閱");
}
}
}
}
<3> 結果
D/TAG: 觀察者 onSubscribe 方法 是否斷開連接配接----:false
D/TAG: 觀察者 onNext 方法 value.toString()----:1
D/TAG: 觀察者 onNext 方法 value.toString()----:張三
D/TAG: 觀察者 onNext 方法 value.toString()----:遇到觀察者發送的錯誤資料 修改後的内容
7. retry()
<1> 作用
重試,即當出現錯誤時,讓被觀察者(Observable)重新發射資料。Throwable 和 Exception 都可攔截。
retry():出現錯誤時,讓被觀察者重新發送資料。若一直錯誤,則一直重新發送。如果不出現錯誤,不重複執行。
retry(long time):出現錯誤時,讓被觀察者重新發送資料。可設定重試次數。如果不出現錯誤,不重複執行。
retry(long times, Predicate<? super Throwable> predicate) 出現錯誤後,判斷是否需要重新發送資料 如果需要 重試times次。如果不出現錯誤,不重複執行。
<2> 代碼
package com.example.rxjava20;
import android.os.Bundle;
import android.util.Log;
import androidx.appcompat.app.AppCompatActivity;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class MainActivity extends AppCompatActivity {
private Disposable disposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
method();
}
/**
* 建立 RxJava retry功能操作符
*/
public void method() {
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> e) throws Exception {
if (null == e) {
return;
}
if (!e.isDisposed()) {
e.onNext(1);
e.onNext("張三");
e.onError(new Throwable());
}
}
}).retry(2)//重試兩次
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
if (null == d) {
return;
}
disposable = d;
Log.d("TAG", "觀察者 onSubscribe 方法 是否斷開連接配接----:" + disposable.isDisposed());
}
@Override
public void onNext(Object value) {
Log.d("TAG", "觀察者 onNext 方法 value.toString()----:" + value.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
/**
* onDestroy方法
*/
@Override
protected void onDestroy() {
super.onDestroy();
if (null != disposable) {
Log.d("TAG", "onDestroy方法 執行時是否斷開----:" + disposable.isDisposed());
if (!disposable.isDisposed()) {//沒有斷開
disposable.dispose();//斷開
Log.d("TAG", "onDestroy方法 斷開訂閱");
}
}
}
}
<3> 結果
D/TAG: 觀察者 onSubscribe 方法 是否斷開連接配接----:false
D/TAG: 觀察者 onNext 方法 value.toString()----:1
D/TAG: 觀察者 onNext 方法 value.toString()----:張三
D/TAG: 觀察者 onNext 方法 value.toString()----:1
D/TAG: 觀察者 onNext 方法 value.toString()----:張三
D/TAG: 觀察者 onNext 方法 value.toString()----:1
D/TAG: 觀察者 onNext 方法 value.toString()----:張三
重試2次,一共執行3次。
<4> 關閉頁面
D/TAG: onDestroy方法 執行時是否斷開----:false
D/TAG: onDestroy方法 斷開訂閱
8.retryUntil()
<1> 作用
出現錯誤後,判斷是否需要重新發送資料。類似 retry(long times, Predicate<? super Throwable> predicate) 出現錯誤後,判斷是否需要重新發送資料 如果需要 重試times次。
<2> 代碼
package com.example.rxjava20;
import android.os.Bundle;
import android.util.Log;
import androidx.appcompat.app.AppCompatActivity;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BooleanSupplier;
public class MainActivity extends AppCompatActivity {
private Disposable disposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
method();
}
/**
* 建立 RxJava Map變換符
*/
public void method() {
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> e) throws Exception {
if (null == e) {
return;
}
if (!e.isDisposed()) {
e.onNext(1);
e.onNext("張三");
e.onError(new Throwable());
}
}
}).retryUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
return false;//一直重試
}
})
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
if (null == d) {
return;
}
disposable = d;
Log.d("TAG", "觀察者 onSubscribe 方法 是否斷開連接配接----:" + disposable.isDisposed());
}
@Override
public void onNext(Object value) {
Log.d("TAG", "觀察者 onNext 方法 value.toString()----:" + value.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
/**
* onDestroy方法
*/
@Override
protected void onDestroy() {
super.onDestroy();
if (null != disposable) {
Log.d("TAG", "onDestroy方法 執行時是否斷開----:" + disposable.isDisposed());
if (!disposable.isDisposed()) {//沒有斷開
disposable.dispose();//斷開
Log.d("TAG", "onDestroy方法 斷開訂閱");
}
}
}
}
<3> 結果
return false;//一直重試
return true;//不重試
9.retryWhen()
<1> 作用
遇到錯誤時,将發生的錯誤傳遞給一個新的被觀察者。并決定是否需要重新訂閱原始被觀察者。
<2> 代碼
略
<3> 結果
略
10.repeat()
<1> 作用
重複不斷地發送被觀察者事件。
repeat():重複不斷地發送被觀察者事件。
repeat(long times):重複 times次 發送被觀察者事件。
<2> 代碼
package com.example.rxjava20;
import android.os.Bundle;
import android.util.Log;
import androidx.appcompat.app.AppCompatActivity;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class MainActivity extends AppCompatActivity {
private Disposable disposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
method();
}
/**
* 建立 RxJava repeat功能操作符
*/
public void method() {
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> e) throws Exception {
if (null == e) {
return;
}
if (!e.isDisposed()) {
e.onNext(1);
e.onNext("張三");
e.onComplete();
}
}
}).repeat(2)//重複兩次
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
if (null == d) {
return;
}
disposable = d;
Log.d("TAG", "觀察者 onSubscribe 方法 是否斷開連接配接----:" + disposable.isDisposed());
}
@Override
public void onNext(Object value) {
Log.d("TAG", "觀察者 onNext 方法 value.toString()----:" + value.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
/**
* onDestroy方法
*/
@Override
protected void onDestroy() {
super.onDestroy();
if (null != disposable) {
Log.d("TAG", "onDestroy方法 執行時是否斷開----:" + disposable.isDisposed());
if (!disposable.isDisposed()) {//沒有斷開
disposable.dispose();//斷開
Log.d("TAG", "onDestroy方法 斷開訂閱");
}
}
}
}
<3> 結果
D/TAG: 觀察者 onSubscribe 方法 是否斷開連接配接----:false
D/TAG: 觀察者 onNext 方法 value.toString()----:1
D/TAG: 觀察者 onNext 方法 value.toString()----:張三
D/TAG: 觀察者 onNext 方法 value.toString()----:1
D/TAG: 觀察者 onNext 方法 value.toString()----:張三
<4> 關閉頁面
D/TAG: onDestroy方法 執行時是否斷開----:false
D/TAG: onDestroy方法 斷開訂閱
11.repeatWhen()
<1> 作用
有條件地、重複發送 被觀察者事件。
<2> 代碼
略
<3> 結果
略