天天看点

RxJava与RxAndroid 接收消息通知

RxJava与RxAndroid 接收消息通知 主要通过两个类

rx.Observer

rx.Subscriber

这两个类的关系:继承关系rx.Observer被rx.Subscriber继承

作用

Provides a mechanism for receiving push-based notifications from Observables, and permits manual
* unsubscribing from these Observables.      

翻译:提供了一种机制,用于接收推送的观察者为基础的通知,并允许手动取消订阅这些观测值

Subscriber在Observer上面进行了扩展:

如: 

取消订阅.unsubscribe(),判断是否可订阅等

增加onStart()回掉方法,好像一定都执行,源码中的解释:// do nothing by default

//接收
    private Observer<String> stringObserver = new Observer<String>() {

        @Override
        public void onCompleted() {
            LogUtils.d("onCompleted():" + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
            LogUtils.d("onError():" + Thread.currentThread().getName());
        }

        @Override
        public void onNext(String o) {
            LogUtils.d("onNext():data:" + o + "  线程" + Thread.currentThread().getName());
        }
    };

    //接收   stringSubscriber.unsubscribe();
    private Subscriber<String> stringSubscriber = new Subscriber<String>() {
        @Override
        public void onStart() {
            super.onStart();
            LogUtils.d("onStart():" + Thread.currentThread().getName());
        }

        @Override
        public void onCompleted() {
            LogUtils.d("onCompleted():" + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
            LogUtils.d("onError():" + Thread.currentThread().getName());
        }

        @Override
        public void onNext(String s) {
            LogUtils.d("onNext():data:" + s + Thread.currentThread().getName());
        }
    };
           
rx.Subscriber在实现rx.Observer的同时,也实现了rx.Subscription      
这个类是个接口,主要体现在控制取消订阅和是否不可订阅的判断:源码:      
/**
 * Copyright 2014 Netflix, Inc.
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package rx;

import rx.subscriptions.Subscriptions;

/**
 * Subscription returns from {@link Observable#subscribe(Subscriber)} to allow unsubscribing.
 * <p>
 * See the utilities in {@link Subscriptions} and the implementations in the {@code rx.subscriptions} package.
 * <p>
 * This interface is the RxJava equivalent of {@code IDisposable} in Microsoft's Rx implementation.
 */
public interface Subscription {

    /**
     * Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription
     * was received.
     * <p>
     * This allows unregistering an {@link Subscriber} before it has finished receiving all events (i.e. before
     * onCompleted is called).
     */
    void unsubscribe();

    /**
     * Indicates whether this {@code Subscription} is currently unsubscribed.
     *
     * @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise
     */
    boolean isUnsubscribed();

}
           

关于事件接收或者说事件订阅的方法介绍:

onStart();最开始执行,一定执行

相对于传统观察者模式的不同,RxJava的时间回调方法除了普通时间onNext()( 相等于android的onClick/onEvent)之外,还定义了两个特殊的事件onCompleted(0和onError().

onCompleted():事件队列完结,RxJava不仅把每个事件单独处理,还会把他们看作一个队列,RxJAVA规定,当不会再有新的

onNext() 发出时,是触发onCompletd()的标志

onError() 事件队列异常,在事件处理过程中出现异常时,onError()会被触发,同时队列自动终止,不允许再有事件发出.

onCompleted和onError()是互斥的

欢迎入群:476899320探讨RxJava等新技术

继续阅读