天天看點

RxJava與RxAndroid 接收消息通知

RxJava與RxAndroid 接收消息通知 主要通過兩個類

rx.Observer

rx.Subscriber

這兩個類的關系:繼承關系rx.Observer被rx.Subscriber繼承

作用

Provides a mechanism for receiving push-based notifications from Observables, and permits manual
* unsubscribing from these Observables.      

翻譯:提供了一種機制,用于接收推送的觀察者為基礎的通知,并允許手動取消訂閱這些觀測值

Subscriber在Observer上面進行了擴充:

如: 

取消訂閱.unsubscribe(),判斷是否可訂閱等

增加onStart()回掉方法,好像一定都執行,源碼中的解釋:// do nothing by default

//接收
    private Observer<String> stringObserver = new Observer<String>() {

        @Override
        public void onCompleted() {
            LogUtils.d("onCompleted():" + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
            LogUtils.d("onError():" + Thread.currentThread().getName());
        }

        @Override
        public void onNext(String o) {
            LogUtils.d("onNext():data:" + o + "  線程" + Thread.currentThread().getName());
        }
    };

    //接收   stringSubscriber.unsubscribe();
    private Subscriber<String> stringSubscriber = new Subscriber<String>() {
        @Override
        public void onStart() {
            super.onStart();
            LogUtils.d("onStart():" + Thread.currentThread().getName());
        }

        @Override
        public void onCompleted() {
            LogUtils.d("onCompleted():" + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
            LogUtils.d("onError():" + Thread.currentThread().getName());
        }

        @Override
        public void onNext(String s) {
            LogUtils.d("onNext():data:" + s + Thread.currentThread().getName());
        }
    };
           
rx.Subscriber在實作rx.Observer的同時,也實作了rx.Subscription      
這個類是個接口,主要展現在控制取消訂閱和是否不可訂閱的判斷:源碼:      
/**
 * Copyright 2014 Netflix, Inc.
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package rx;

import rx.subscriptions.Subscriptions;

/**
 * Subscription returns from {@link Observable#subscribe(Subscriber)} to allow unsubscribing.
 * <p>
 * See the utilities in {@link Subscriptions} and the implementations in the {@code rx.subscriptions} package.
 * <p>
 * This interface is the RxJava equivalent of {@code IDisposable} in Microsoft's Rx implementation.
 */
public interface Subscription {

    /**
     * Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription
     * was received.
     * <p>
     * This allows unregistering an {@link Subscriber} before it has finished receiving all events (i.e. before
     * onCompleted is called).
     */
    void unsubscribe();

    /**
     * Indicates whether this {@code Subscription} is currently unsubscribed.
     *
     * @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise
     */
    boolean isUnsubscribed();

}
           

關于事件接收或者說事件訂閱的方法介紹:

onStart();最開始執行,一定執行

相對于傳統觀察者模式的不同,RxJava的時間回調方法除了普通時間onNext()( 相等于android的onClick/onEvent)之外,還定義了兩個特殊的事件onCompleted(0和onError().

onCompleted():事件隊列完結,RxJava不僅把每個事件單獨處理,還會把他們看作一個隊列,RxJAVA規定,當不會再有新的

onNext() 發出時,是觸發onCompletd()的标志

onError() 事件隊列異常,在事件處理過程中出現異常時,onError()會被觸發,同時隊列自動終止,不允許再有事件發出.

onCompleted和onError()是互斥的

歡迎入群:476899320探讨RxJava等新技術

繼續閱讀