天天看点

RxJava中关于Subject和Processor的使用

文章目录

    • RxJava的使用步骤
    • Subject&Processor
    • Subject
      • AsyncSubject
      • BehaviorSubject
      • PublishSubject
      • ReplaySubject
    • 总结

RxJava的使用步骤

RxJava的使用需要三步:

  • 创建Observable:

    Observable是被观察者,负责在上游发送事件,可以在这里决定异步操作模块的次数和顺序。

  • 创建Observer:

    Observer是观察者,负责接收上游发送的事件。Observer可以自由切换线程,简化了并发操作,会在未来某个时刻相应上游发送的事件而不需要阻塞等待。

  • 使用subscribe()进行订阅:

    subscribe()将Observable和Observer连接起来,使得整个上下游串联起来实现链式调用。

Subject&Processor

Subject是一种特殊的存在,它既是Observable,又是Observer,可以将其看做一个桥梁或代理。

Subject和Processor的功能几乎是一样的,区别在于Subject不支持背压,而Processor支持背压。

Subject

Subject有4种类型:AsyncSubject、BehaviorSubject、ReplaySubject、PublishSubject,分别来看他们的用法

AsyncSubject

Observer会接收AsyncSubject发送的在onComplete()之前的最后一个数据。

AsyncSubject<String> mAsyncSubject = AsyncSubject.create();
mAsyncSubject.onNext(MESSAGE1);
mAsyncSubject.onNext(MESSAGE2);
mAsyncSubject.onComplete();
mAsyncSubject.subscribe(string -> {
    Log.e(TAG, "onNext: " + string);
}, throwable -> {
    Log.e(TAG, "onError: " + throwable.getMessage());
}, () -> {
    Log.e(TAG, "onComplete");
});
mAsyncSubject.onNext(MESSAGE3);
mAsyncSubject.onNext(MESSAGE4);
           

执行结果:

2020-11-22 16:34:11.768/com.ninetripods E/TTT: onNext: message2
2020-11-22 16:34:11.768/com.ninetripods E/TTT: onComplete
           

上述代码中将onComple()放在代码最后,如下:

AsyncSubject<String> mAsyncSubject = AsyncSubject.create();
mAsyncSubject.onNext(MESSAGE1);
mAsyncSubject.onNext(MESSAGE2);
mAsyncSubject.subscribe(string -> {
    Log.e(TAG, "onNext: " + string);
}, throwable -> {
    Log.e(TAG, "onError: " + throwable.getMessage());
}, () -> {
    Log.e(TAG, "onComplete");
});
mAsyncSubject.onNext(MESSAGE3);
mAsyncSubject.onNext(MESSAGE4);
mAsyncSubject.onComplete();
           

执行结果:

2020-11-22 16:34:11.768/com.ninetripods E/TTT: onNext: message4
2020-11-22 16:34:11.768/com.ninetripods E/TTT: onComplete
           

可见Observer接收的一直是onComplete()之前的最后一个数据。

这里需要注意一点:onComplete()方法必须要执行,AsyncSubject才会发送数据,否则Observer将不会接收任何事件。

BehaviorSubject

Observer会先接收BehaviorSubject被订阅之前的最后一个事件,然后接收订阅之后发送的所有事件。如果BehaviorSubject被订阅之前没有发送任何数据,可以通过BehaviorSubject.createDefault发送一个默认的事件。

BehaviorSubject<String> mBehaviorSubject = BehaviorSubject.createDefault(MESSAGE_DEFAULT);
mBehaviorSubject.subscribe(string -> {
    Log.e(TAG, "onNext: " + string);
}, throwable -> {
    Log.e(TAG, "onError: " + throwable.getMessage());
}, () -> {
    Log.e(TAG, "onComplete");
});
mBehaviorSubject.onNext(MESSAGE1);
mBehaviorSubject.onNext(MESSAGE2);
           

执行结果:

2020-11-22 17:04:39 E/TTT: onNext: message_default
2020-11-22 17:04:39 E/TTT: onNext: message1
2020-11-22 17:04:39 E/TTT: onNext: message2
           

改一下代码,在subscribe之前,发送message1:

BehaviorSubject<String> mBehaviorSubject = BehaviorSubject.createDefault(MESSAGE_DEFAULT);
mBehaviorSubject.onNext(MESSAGE1);
mBehaviorSubject.subscribe(string -> {
    Log.e(TAG, "onNext: " + string);
}, throwable -> {
    Log.e(TAG, "onError: " + throwable.getMessage());
}, () -> {
    Log.e(TAG, "onComplete");
});
mBehaviorSubject.onNext(MESSAGE2);
           

执行结果:

2020-11-22 17:06:38 E/TTT: onNext: message1
2020-11-22 17:06:38 E/TTT: onNext: message2
           

PublishSubject

Observer只接收PublishSubject被订阅之后发送的事件。

PublishSubject<String> mPublishSubject = PublishSubject.create();
mPublishSubject.onNext(MESSAGE1);
mPublishSubject.subscribe(string -> {
    Log.e(TAG, "onNext: " + string);
}, throwable -> {
    Log.e(TAG, "onError: " + throwable.getMessage());
}, () -> {
    Log.e(TAG, "onComplete");
});
mPublishSubject.onNext(MESSAGE2);
mPublishSubject.onNext(MESSAGE3);
mPublishSubject.onComplete();
           

执行结果:

2020-11-22 17:21:30.650 E/TTT: onNext: message2
2020-11-22 17:21:30.650 E/TTT: onNext: message3
2020-11-22 17:21:30.650 E/TTT: onComplete
           

虽然PublishSubject开始发送了message1,但是是在subscribe()之前发送的,Observer并不会接收。

ReplaySubject

Observer会接收ReplaySubject发送的所有事件,无论subscribe订阅是何时开始的。

ReplaySubject<String> mReplaySubject = ReplaySubject.create();
mReplaySubject.onNext(MESSAGE1);
mReplaySubject.onNext(MESSAGE2);
mReplaySubject.subscribe(string -> {
    Log.e(TAG, "onNext: " + string);
}, throwable -> {
    Log.e(TAG, "onError: " + throwable.getMessage());
}, () -> {
    Log.e(TAG, "onComplete");
});
mReplaySubject.onNext(MESSAGE3);
mReplaySubject.onComplete();
           

执行结果:

2020-11-22 17:36:49 E/TTT: onNext: message1
2020-11-22 17:36:49 E/TTT: onNext: message2
2020-11-22 17:36:49 E/TTT: onNext: message3
2020-11-22 17:36:49 E/TTT: onComplete
           

改一下代码,

ReplaySubject.create()

改为

ReplaySubject.createWithSize(1)

,表示只缓存subscribe订阅前发送的最后一个事件,执行结果为:

2020-11-22 17:36:49 E/TTT: onNext: message2
2020-11-22 17:36:49 E/TTT: onNext: message3
2020-11-22 17:36:49 E/TTT: onComplete
           

此时执行效果与BehaviorSubject相同,但是ReplaySubject.createWithSize(1)内部逻辑处理更复杂一些。

ReplaySubject除了可以限制缓存的数量之外,还可以使用

createWithTime()

限制缓存的时间,或者使用

createWithTimeAndSize()

同时限制缓存时间和数量。

总结

Subject不支持背压,Processor支持背压。

  • AsyncSubject: 无论订阅发生在什么时候,Observer只会接收AsyncSubject发送的在onComplete()之前的最后一个数据,且onComplete()是必须要调用的。
  • BehaviorSubject:Observer会先接收BehaviorSubject被订阅之前的最后一个事件,然后接收订阅之后发送的所有事件。
  • PublishSubject: Observer只接收PublishSubject被订阅之后发送的事件。
  • ReplaySubject:无论subscribe订阅是何时开始的,Observer会接收ReplaySubject发送的所有事件。

继续阅读