RxJava與RxAndroid 接收消息通知 主要通過兩個類
rx.Observer
rx.Subscriber
這兩個類的關系:繼承關系rx.Observer被rx.Subscriber繼承
作用
Provides a mechanism for receiving push-based notifications from Observables, and permits manual
* unsubscribing from these Observables.
翻譯:提供了一種機制,用于接收推送的觀察者為基礎的通知,并允許手動取消訂閱這些觀測值
Subscriber在Observer上面進行了擴充:
如:
取消訂閱.unsubscribe(),判斷是否可訂閱等
增加onStart()回掉方法,好像一定都執行,源碼中的解釋:// do nothing by default
//接收
private Observer<String> stringObserver = new Observer<String>() {
@Override
public void onCompleted() {
LogUtils.d("onCompleted():" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
LogUtils.d("onError():" + Thread.currentThread().getName());
}
@Override
public void onNext(String o) {
LogUtils.d("onNext():data:" + o + " 線程" + Thread.currentThread().getName());
}
};
//接收 stringSubscriber.unsubscribe();
private Subscriber<String> stringSubscriber = new Subscriber<String>() {
@Override
public void onStart() {
super.onStart();
LogUtils.d("onStart():" + Thread.currentThread().getName());
}
@Override
public void onCompleted() {
LogUtils.d("onCompleted():" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
LogUtils.d("onError():" + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
LogUtils.d("onNext():data:" + s + Thread.currentThread().getName());
}
};
rx.Subscriber在實作rx.Observer的同時,也實作了rx.Subscription
這個類是個接口,主要展現在控制取消訂閱和是否不可訂閱的判斷:源碼:
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx;
import rx.subscriptions.Subscriptions;
/**
* Subscription returns from {@link Observable#subscribe(Subscriber)} to allow unsubscribing.
* <p>
* See the utilities in {@link Subscriptions} and the implementations in the {@code rx.subscriptions} package.
* <p>
* This interface is the RxJava equivalent of {@code IDisposable} in Microsoft's Rx implementation.
*/
public interface Subscription {
/**
* Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription
* was received.
* <p>
* This allows unregistering an {@link Subscriber} before it has finished receiving all events (i.e. before
* onCompleted is called).
*/
void unsubscribe();
/**
* Indicates whether this {@code Subscription} is currently unsubscribed.
*
* @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise
*/
boolean isUnsubscribed();
}
關于事件接收或者說事件訂閱的方法介紹:
onStart();最開始執行,一定執行
相對于傳統觀察者模式的不同,RxJava的時間回調方法除了普通時間onNext()( 相等于android的onClick/onEvent)之外,還定義了兩個特殊的事件onCompleted(0和onError().
onCompleted():事件隊列完結,RxJava不僅把每個事件單獨處理,還會把他們看作一個隊列,RxJAVA規定,當不會再有新的
onNext() 發出時,是觸發onCompletd()的标志
onError() 事件隊列異常,在事件處理過程中出現異常時,onError()會被觸發,同時隊列自動終止,不允許再有事件發出.
onCompleted和onError()是互斥的
歡迎入群:476899320探讨RxJava等新技術