天天看點

RxJava源碼淺析

Create

建立一個Observable比較簡單,最基礎的方法是調用Observable的create方法進行建立,貼一下示例:

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                //執行想要的操作
            }
        });
           

它的源碼實作也比較簡單,在create的時候建立出一個Observable,并且将我們自己寫的OnSubscribe傳入,在訂閱的時候代碼如下:

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     // validate and proceed
        if (subscriber == null) {
            throw new IllegalArgumentException("observer can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that's not the appropriate approach
             * so I won't mention that in the exception
             */
        }
        
        // new Subscriber so onStart it
        subscriber.onStart();
        
        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would 
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can't listen to exceptions anymore
            if (subscriber.isUnsubscribed()) {
                RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
            } else {
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(hook.onSubscribeError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    hook.onSubscribeError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throw r;
                }
            }
            return Subscriptions.unsubscribed();
        }
    }
           

重點是這一句:

hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
           

在訂閱的時候調用我們自己寫的OnSubscribe,傳入的參數也就是自己寫的Subscriber。

Map

先貼出簡單的示例代碼:

Observable.just(1,2,3,4,5)
                .map(new Func1<Integer, String>() {

                    @Override
                    public String call(Integer integer) {
                        return integer+"test";
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.d(TAG, "call: "+s);
                    }
                });
           

map方法的作用是将一種類型的Observer變成另一種類型的Observer,看一下它内部的實作

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
}
           

目前的Observer聲明的泛型參數是T,為了使用另一種泛型參數,需要單獨的在泛型方法中進行聲明。看一下Func1的定義

public interface Func1<T, R> extends Function {
    R call(T t);
}
           

很簡單的函數式接口,内部隻有一個方法,接口的實作類通過傳入一個類型為T的對象,通過調用call方法将其轉化成類型為R的對象并且傳回。

回到上面的例子中,T就是Integer,R就是String,而我們使用了匿名類來實作的Func1接口,目的是将Integer類轉化成String類。

在map方法中新出現了一個類叫OperatorMap,看一下這個類:

public final class OperatorMap<T, R> implements Operator<R, T> {

    final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        return parent;
    }
    
    static final class MapSubscriber<T, R> extends Subscriber<T> {
        
        ...
    }

}
           

因為代碼比較短就全部貼上來了。看一下這個方法的構造,傳入了一個Func1的實作類,就是我們在前面例子中所寫的匿名類,然後竟然結束了。。。結束了。。。根本沒有啥具體的資訊。。。

但是還有一點值得去看的就是類名:OperatorMap<T, R> implements Operator<R, T>,這個Operator是個啥, 找一下它定義的地方:

public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
        // cover for generics insanity
    }
           

這個接口是個空接口,但是既然沒有任何方法為什麼要有這個接口存在呢?答案就是要限制Func1中泛型參數的類型,想一下如果有一個類實作了Operator接口,它所複寫的方法是什麼樣子?OperatorMap就實作類Operator,貼一下它的實作代碼:

@Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
<pre name="code" class="java"><span style="white-space:pre">	</span>MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        return parent;
           

}

可以看出如果調用Operator的call方法可以将一種類型的Subscriber轉化成另一種類型的Subscriber。而轉化的方法的是建立了一個MapSubscriber直接傳回。這個類是OperatorMap的内部類,貼一下代碼簡單看一下:

static final class MapSubscriber<T, R> extends Subscriber<T> {
        
        final Subscriber<? super R> actual;
        
        final Func1<? super T, ? extends R> mapper;

        boolean done;
        
        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }
        
        @Override
        public void onNext(T t) {
            R result;
            
            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }
            
            actual.onNext(result);
        }
        
        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaPluginUtils.handleException(e);
                return;
            }
            done = true;
            
            actual.onError(e);
        }
        
        
        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }
        
        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }
           

這個類繼承了抽象類Subscriber,那麼它也是一個訂閱者,也實作了訂閱者都應該實作的onNext,onError,onCompleted方法。但是MapSubscriber在這些方法中做了一些處理,以onNext方法為例:

<span style="white-space:pre">	</span>R result;
        result = mapper.call(t); 
        actual.onNext(result);
           

将方法簡化後在onNext中隻剩下這三句,mapper就是在調用map方法的時候我們傳入的Func1的匿名實作類,而actual是調用OperatorMap的call方法中傳入的參數,暫時還沒涉及到,等下涉及到了再來看。最後調用了actual的onNext方法,然後一切就結束了。

再回過頭繼續看map:

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }
           

跟進一下lift方法:

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }
           

又出現了一個并沒有見過的類onSubscribeLift,看一下類的定義:

public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
    
    static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

    final OnSubscribe<T> parent;

    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            try {
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }
}
           

代碼也比較少,是以就直接貼了。如果記性比較好應該能記得OnSubscribe是什麼作用,在這裡就不再贅述了,可以回頭看一看。而OnSubscribeLift既然是實作了onSubscribe接口

,那麼就和它有一樣的作用。在OnSubscribeLift的構造方法中接收了兩個參數,一個是原始的onSubscribe,另一個就是Operator啦,如果還有印象應該可以Operator這個接口目的是将一種類型的Subscriber轉化成另一種類型的Subscriber。

而OnSubscribe中最重要的就是call方法,也是通過create建立Observable時我們需要重寫的,OnSubscribeLift中的call方法已經給出了實作,簡單看一下。

hook.onLift(operator) 這句直接傳回的就是operator,是以這句話的作用是調用operator的call方法将R類型的Subscriber轉化成T類型的Subscriber,調用新建立出了Subscriber的onStart, 然後使用原來的OnSubscribe調用call方法,傳入的是新建立的Subscriber。

到這裡整個map的調用流程就結束了,但是相信看過了一定還是一臉懵逼,再從頭看一遍,相信一下就能明白了,我再把開始時候的例子重新貼出來:

Observable.just(1,2,3,4,5)
                .map(new Func1<Integer, String>() {

                    @Override
                    public String call(Integer integer) {
                        return integer+"test";
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.d(TAG, "call: "+s);
                    }
                });
           

從後來往前看,首先是subscribe,在這個方法中調用了OnSubscribe的call方法,并且将Action1的執行個體對象封裝成subscriber傳入作為參數。這裡OnSubscribe并不是just方法調用中産生的OnSubscribe(如果你是用create建立的那麼也不是你自己寫的那個OnSubscribe),上面已經說了,map中隻是調用了lift,那麼真正起作用的OnSubscribe也應該是lift所産生的Observable中的OnSubscribe。再貼一下代碼:

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }
           

這個新的OnSubscribe就是這個OnSubscribeLift,其中傳入的operator是OperatorMap的對象。

訂閱的時候調用的是OnSubscribeLift的call方法,再貼一下:

public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators 
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }
           

這個call方法中傳入的參數o就是訂閱時封裝了Action1的Subscriber。try塊中的第一行調用了operator的call方法,傳入的參數是這個subscriber,如果各位還有印象能記起來這個operator就是在map方法中建立出來的OperatorMap,再貼一下它的call方法:

public Subscriber<? super T> call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        return parent;
    }
           

在建立了一個MapSubscriber以後就傳回了。但是大家不知道有沒有注意,這裡面的傳回類型是Subscriber<? super T>,也就是傳回了一個原始類型的Subscriber,在我們這裡面也就是傳回了一個Integer類型的,原因也是因為使用了map方法後在subscribe方法中要接收的是一個R類型的Subscriber,在這裡進行了一下轉化。目前還沒有出現任何有用的邏輯。傳回了MapSubscriber對象st後,重要的一句是parent.call(st)。parent是原始的OnSubscribe,也就是我們自己複寫call方法的OnSubscribe,它想要的參數自然是Subscriber<T>,這裡由于傳它的參數是MapSubscriber,滿足了這個條件。是以現在要做的就是看一下MapSubscriber中的onNext,onError和onCompleted等方法啦(因為OnSubscribe的邏輯不就是調用這幾個方法麼)。以onNext為例:

public void onNext(T t) {
            R result;
            
            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }
            
            actual.onNext(result);
        }
           

這裡的mapper就是我們自定義的那個Func1的執行個體。還記得Func1的作用麼,将T轉換成R,在本例中也就是将Integer轉換成String,這裡進行了轉換,終于找到了~~~

那轉換之後呢,調用了actual的onNext方法,這個actual就是OperatorMap中call方法裡面的o,也就是最終我們在代碼中寫的封裝了Action1的Subscriber。這樣就完成了一連串的轉換和調用。

簡單的總結一下整個過程。因為隻有原始我們寫的OnSubscribe中有發射資料的邏輯,是以原始的那個必需得用上,但是原始的那個OnSubscribe想要的是一個T類型的Subscriber,不過我們現在有的隻是R類型的Subscriber,是以進行了一層封裝,将R類型的Subscriber轉換成T類型的Subscriber。然後在封裝類的OnNext方法中将原始的OnSubscribe裡call方法中發射的一系列資料進行一個轉換(從T轉換成R),用的就是自己寫的Func1,然後調用我們所寫的Subscriber<R>中的onNext等方法,方法需要的參數類型是R,而我們轉換生成的參數也是R,整個調用過程完成~~

我自己畫了一個簡單的流程圖如下:

RxJava源碼淺析

FlatMap

假設現在有一個商店類Shop,商店類中有位址和商品集合,每一個商品都有自己的名字,如果給出一個商店的集合,要将商店的位址輸出出來該怎麼做?

直接使用上面說過的map就可以,代碼如下(實體的代碼比較簡單就沒有貼出來):

List<Shop> shops = new ArrayList<>();
        for(int i = 1;i<=10;++i){
            Shop shop = new Shop();
            shop.setAddress(new Address("Shop"+i));
            for(int j = 1;j<=10;++j){
                Good good = new Good("Shop"+i+":Good"+j);
                shop.getGoods().add(good);
            }
            shops.add(shop);
        }

        
        Observable.from(shops)
                .map(new Func1<Shop, Address>() {

                    @Override
                    public Address call(Shop shop) {
                        return shop.getAddress();
                    }
                })
                .subscribe(new Action1<Address>() {
                    @Override
                    public void call(Address address) {
                        Log.d(TAG, "call: "+address.getName());
                    }
                });
           

但是如果想輸出商店所有商品的名字呢?

可以這樣:

Observable.from(shops)
                .map(new Func1<Shop, List<Good>>() {

                    @Override
                    public List<Good> call(Shop shop) {
                        return shop.getGoods();
                    }
                })
                .subscribe(new Action1<List<Good>>() {
                    @Override
                    public void call(List<Good> goods) {
                        for(Good g:goods){
                            Log.d(TAG, "call: "+g.getName());
                        }
                    }
                });
           

好吧把RxJava的初衷,也就是流式和簡潔破壞了,并且subscribe方法的可複用性極低,并不是我們想要看到的。

基于以上問題可以總結一點小的共性出來,還是将一個Observable<T>轉換成另一個Observable<R>,隻不過這個R并不是T中的屬性,而是T中某個集合中的元素,這時候我們可以考慮使用FlatMap。貼一下使用FlatMap後的代碼:

Observable.from(shops)
                .flatMap(new Func1<Shop, Observable<Good>>() {
                    @Override
                    public Observable<Good> call(Shop shop) {
                        return Observable.from(shop.getGoods());
                    }
                })
                .subscribe(new Action1<Good>() {
                    @Override
                    public void call(Good good) {
                        Log.d(TAG, "call: "+good.getName());
                    }
                });
           

下面來看一下FlatMap是怎麼實作的:

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
        ...
        return merge(map(func));
    }
           

看到了很熟悉的map。從map傳入的Func1參數可以看出來map的作用是将Observable<T>轉換成Observable<Observable<R>>。

看一下merge的實作:

public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
        ...
        return source.lift(OperatorMerge.<T>instance(false));
    }
           

從方法的簽名可以看出merge的作用是将Observable<Observable<T>>的參數轉換成Observable <T>。看一下lift:

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }
           

和剛剛介紹的map中的lift是一個方法,目的是通過一個操作符Operator将目前的Observable<T>轉換成Observable<R>。既然是轉化Observable,而且source的類型是Observable<Observable<T>>,轉換出來的東西想要Observable<T>,是以我們可以猜測OperatorMerge.<T>instance(false)所傳回的對象應該是這樣的一個形式

Operator<T, Observable<T>>

看一下OperatorMerge的定義:

public final class OperatorMerge<T> implements Operator<T, Observable<? extends T>> {
...
}
           

instance方法是一個單例方法,采用了靜态内部類的形式,代碼如下:

public static <T> OperatorMerge<T> instance(boolean delayErrors) {
        if (delayErrors) {
            return (OperatorMerge<T>)HolderDelayErrors.INSTANCE;
        }
        return (OperatorMerge<T>)HolderNoDelay.INSTANCE;
    }
           
private static final class HolderNoDelay {
        /** A singleton instance. */
        static final OperatorMerge<Object> INSTANCE = new OperatorMerge<Object>(false, Integer.MAX_VALUE);
    }
           

如果有印象的話能記起來lift方法是将operator和我們所寫的OnSubscribe封裝在一個OnSubscribeLift中,然後用這個OnSubscribe建立出一個Observable<R>。而在訂閱的時候我們寫的Observer的類型是R,是以走到了OnSubscribeLift的call方法中,在這個方法中調用了operator的call,目的是将一個Subscriber<R>轉換成一個Subscriber<T>進而能讓我們自己寫的OnSubscribe來對其進行操作。在map方法中執行這個任務的是OperatorMap類,具體的任務由其中的内部類MapSubscriber來實作,在merge操作中其實也是一樣的,看一下代碼:

@Override
    public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
        MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
        MergeProducer<T> producer = new MergeProducer<T>(subscriber);
        subscriber.producer = producer;
        
        child.add(subscriber);
        child.setProducer(producer);
        
        return subscriber;
    }
           

call方法的目的仍然是建立一個可以滿足原始的自己寫的OnSubscribe的Subscriber(這裡原始的Observable是Observable<Observable<T>>的source,如果忘記了可以回頭看一下)。而具體的任務仍然是建立一個内部類的對象來實作:

static final class MergeSubscriber<T> extends Subscriber<Observable<? extends T>> {
        final Subscriber<? super T> child;
        ...
        
        public MergeSubscriber(Subscriber<? super T> child, boolean delayErrors, int maxConcurrent) {
            this.child = child;
           ...
        }
        
        ...
       <pre name="code" class="java"> <span style="white-space:pre">	</span>@Override
        public void onNext(Observable<? extends T> t) {
            if (t == null) {
                return;
            }
            if (t == Observable.empty()) {
                emitEmpty();
            } else
            if (t instanceof ScalarSynchronousObservable) {
                tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
            } else {
                InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
                addInner(inner);
                t.unsafeSubscribe(inner);
                emit();
            }
        }
           

... }

這個類的代碼想對比較多,我們還是挑一些來看。構造方法中重點是傳入了一個child,這個對象就是最終我們自己寫的Subscriber,從它的泛型定義上也可以看的出來Subscriber<T>。

然後看一下onNext,除去各種錯誤檢查,直接看最後一個else,裡面首先建立出一個InnerSubscriber,看一下它的代碼:

static final class InnerSubscriber<T> extends Subscriber<T> {
        final MergeSubscriber<T> parent;
        final long id;
        volatile boolean done;
        volatile RxRingBuffer queue;
        int outstanding;
        static final int limit = RxRingBuffer.SIZE / 4;
        
        public InnerSubscriber(MergeSubscriber<T> parent, long id) {
            this.parent = parent;
            this.id = id;
        }
        @Override
        public void onStart() {
            outstanding = RxRingBuffer.SIZE;
            request(RxRingBuffer.SIZE);
        }
        @Override
        public void onNext(T t) {
            parent.tryEmit(this, t);
        }
        @Override
        public void onError(Throwable e) {
            done = true;
            parent.getOrCreateErrorQueue().offer(e);
            parent.emit();
        }
        @Override
        public void onCompleted() {
            done = true;
            parent.emit();
        }
        public void requestMore(long n) {
            int r = outstanding - (int)n;
            if (r > limit) {
                outstanding = r;
                return;
            }
            outstanding = RxRingBuffer.SIZE;
            int k = RxRingBuffer.SIZE - r;
            if (k > 0) {
                request(k);
            }
        }
    }
           

構造方法中将MergeSubscriber當作參數parent傳入。先不去管這個類,看MergeSubscriber的onNext方法後面做了些什麼:

t.unsafeSubscribe(inner);
           

這句比較重點,看一下實作:

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
        try {
            // new Subscriber so onStart it
            subscriber.onStart();
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(this, onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(hook.onSubscribeError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the on error handling.
                hook.onSubscribeError(r);
                // TODO why aren't we throwing the hook's return value.
                throw r;
            }
            return Subscriptions.unsubscribed();
        }
    }
           

可以看出除了稍有不同外就是正常的訂閱,邏輯也都類似,那麼正常訂閱的時候最終會調用到Subscriber的onNext方法中,這裡的Subscriber是哪一個呢?沒錯就是剛才所看到的InnerSubscribe。看一下它的onNext:

@Override
        public void onNext(T t) {
            parent.tryEmit(this, t);
        }
           

調用了parent,也就是MergeSubscriber的tryEmit,我們接着來看一下:

void tryEmit(InnerSubscriber<T> subscriber, T value) {
            boolean success = false;
            long r = producer.get();
            if (r != 0L) {
                synchronized (this) {
                    // if nobody is emitting and child has available requests
                    r = producer.get();
                    if (!emitting && r != 0L) {
                        emitting = true;
                        success = true;
                    }
                }
            }
            if (success) {
                emitScalar(subscriber, value, r);
            } else {
                queueScalar(subscriber, value);
            }
        }
           

比較關鍵的是if(success)裡面的方法emitScalar,看一下代碼實作:

protected void emitScalar(InnerSubscriber<T> subscriber, T value, long r) {
            boolean skipFinal = false;
            try {
                try {
                    child.onNext(value);
                } catch (Throwable t) {
                    if (!delayErrors) {
                        Exceptions.throwIfFatal(t);
                        skipFinal = true;
                        subscriber.unsubscribe();
                        subscriber.onError(t);
                        return;
                    }
                    getOrCreateErrorQueue().offer(t);
                }
                if (r != Long.MAX_VALUE) {
                    producer.produced(1);
                }
                subscriber.requestMore(1);
                // check if some state changed while emitting
                synchronized (this) {
                    skipFinal = true;
                    if (!missed) {
                        emitting = false;
                        return;
                    }
                    missed = false;
                }
            } finally {
                if (!skipFinal) {
                    synchronized (this) {
                        emitting = false;
                    }
                }
            }
            /*
             * In the synchronized block below request(1) we check
             * if there was a concurrent emission attempt and if there was,
             * we stay in emission mode and enter the emission loop
             * which will take care all the queued up state and 
             * emission possibilities.
             */
            emitLoop();
        }
           

終于看到了我麼想看的代碼:

child.onNext(value);
           

記性好的還能想起來child就是MergeSubscriber構造方法傳入的我們自己寫的Subscriber。

是以整個主線流程到這裡就結束啦,看得還是暈暈的,下面再上一張FlatMap的流程圖:

RxJava源碼淺析

subscribeOn和observerOn

RxJava神奇之處還有一個地方就是它可以将線程的轉換也簡單的通過流式來處理了,下面來看一下例子:

Observable.just(1,2,3,4,5)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        
                    }
                });
           

這段代碼沒有任何實際的含義,僅僅是調用了一下Api,下面來看一下源碼,首先是subscribeOn:

public final Observable<T> subscribeOn(Scheduler scheduler) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return create(new OperatorSubscribeOn<T>(this, scheduler));
    }
           

這裡也很簡單,主要是建立了一個新的Observable,并且這個Observable和原始的Observable都是一樣的泛型參數。如果現在訂閱了這個新的Subscriber,就會走到OperatorSubscribeOn這個類的call方法中,來看一下:

@Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        
        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();
                
                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }
                    
                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };
                
                source.unsafeSubscribe(s);
            }
        }
           

我們先隻關注第二個call方法,也就是被嵌套的call方法中的内容。首先以我們自己寫的Subscriber建立出了一個Subscriber,然後調用了source的unsafeSubscribe方法,從上面看下來的同學應該知道這個方法其實和正常的subscribe主體邏輯上沒有什麼差別,這裡就不貼代碼了,忘記了可以自己看一下源碼。那麼既然傳遞的參數是我們用自己寫的Subscriber建立出來的新的Subscriber,那麼久看一下新的Subscriber的onNext等方法:

@Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }
           

可以看出僅僅是簡單調用了一下原Subscriber(也就是我們自己寫的Subscriber)的onNext方法。看起來所做的都沒有什麼卵用。

這個時候我們可以簡單的猜想一下,其實在inner.schedule方法執行中就已經切換了線程。我們來看一下源碼。

在例子中使用的Scheduler是Schedulers.io,然後在OperatorSubscribeOn的call方法中調用了Schedulers.io傳回的Scheduler的createWorker方法,跟進看一下:

public static Scheduler io() {
        return INSTANCE.ioScheduler;
    }
           
Scheduler io = hook.getIOScheduler();
        if (io != null) {
            ioScheduler = io;
        } else {
            ioScheduler = RxJavaSchedulersHook.createIoScheduler();
        }
           
public static Scheduler createIoScheduler() {
        return createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
    }
           
public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
        if (threadFactory == null) throw new NullPointerException("threadFactory == null");
        return new CachedThreadScheduler(threadFactory);
    }
           

最終到了CachedThreadScheduler中,這個類繼承了抽象類Scheduler。在OperatorSubscribeOn的call方法中調用createWorker方法建立出了一個Worker對象,由于這個方法是抽象的,我們看一下這個方法在具體類中的實作:

@Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
           

看一下EventLoopWorker的構造方法:

EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.threadWorker = pool.get();
        }
           

有一個很重要的屬性threadWorker,它是從pool中得到的,那這個pool是什麼,跟進看一下:

public CachedThreadScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
    }
           

原來pool在這個類構造方法中被建立,而且它是一個引用包裝類,這樣就知道了get其實傳回的隻是它的包裝類,來看一下包裝的NONE:

static {
        NONE = new CachedWorkerPool(null, 0, null);
        NONE.shutdown();
    }
           

找到了,這個對象是CachedWorkerPool的執行個體。

我們再回頭看一下OperatorSubscribeOn的call方法,調用了inner.schedule,這個inner就是EventLoopWorker,它的schedule方法如下:

@Override
        public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
            if (innerSubscription.isUnsubscribed()) {
                // don't schedule, we are unsubscribed
                return Subscriptions.unsubscribed();
            }

            ScheduledAction s = threadWorker.scheduleActual(new Action0() {
                @Override
                public void call() {
                    if (isUnsubscribed()) {
                        return;
                    }
                    action.call();
                }
            }, delayTime, unit);
            innerSubscription.add(s);
            s.addParent(innerSubscription);
            return s;
        }
           

這裡面的threadWorker就是我們剛剛提到的CacheWorkerPool中get方法建立出來的:

ThreadWorker get() {
            if (allWorkers.isUnsubscribed()) {
                return SHUTDOWN_THREADWORKER;
            }
            while (!expiringWorkerQueue.isEmpty()) {
                ThreadWorker threadWorker = expiringWorkerQueue.poll();
                if (threadWorker != null) {
                    return threadWorker;
                }
            }

            // No cached worker found, so create a new one.
            ThreadWorker w = new ThreadWorker(threadFactory);
            allWorkers.add(w);
            return w;
        }
           

看一下ThreadWorker:

private static final class ThreadWorker extends NewThreadWorker {
        private long expirationTime;

        ThreadWorker(ThreadFactory threadFactory) {
            super(threadFactory);
            this.expirationTime = 0L;
        }

        public long getExpirationTime() {
            return expirationTime;
        }

        public void setExpirationTime(long expirationTime) {
            this.expirationTime = expirationTime;
        }
    }
           

這個類基本沒什麼方法,是以上它的父類去找一下:

public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
        Action0 decoratedAction = schedulersHook.onSchedule(action);
        ScheduledAction run = new ScheduledAction(decoratedAction);
        Future<?> f;
        if (delayTime <= 0) {
            f = executor.submit(run);
        } else {
            f = executor.schedule(run, delayTime, unit);
        }
        run.add(f);

        return run;
    }
           

在這裡找到了我們想要的scheduleActual方法,形式上很眼熟,executor.submit明顯是線程池去執行了一個Runnable,但是這個線程池是哪裡來的呢,我們再找一下:

public NewThreadWorker(ThreadFactory threadFactory) {
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
        // Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
        boolean cancelSupported = tryEnableCancelPolicy(exec);
        if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
            registerExecutor((ScheduledThreadPoolExecutor)exec);
        }
        schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook();
        executor = exec;
    }
           

找到了,是在NewThreadWorker的構造方法中建立的。

至此subscribeOn整體就分析完成了,大體流程也不難,主要就是代碼的層次有點深,不太好找。這裡再畫一個圖來總結一下:

RxJava源碼淺析

下面看一下observerOn:

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
    }
           

又看見了熟悉的lift,這個方法應該已經熟悉的不能再熟悉了,lift建立了一個Observable,然後建立了一個OnSubscribeLift并且傳入一個Operator,在訂閱的時候首先利用Operator建立出一個Subscriber<T>,然後由我們自己寫的OnSubscribe對其進行操作。大緻過程就是這樣,重點在Operator的實作類上,在這裡是OperatorObserverOn,進入看一下:

@Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        if (scheduler instanceof ImmediateScheduler) {
            // avoid overhead, execute directly
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            // avoid overhead, execute directly
            return child;
        } else {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
            parent.init();
            return parent;
        }
    }
           

和之前的套路都是一樣的,建立出了一個内部類ObserveOnSubscriber,然後我們自己寫的OnSubscribe調用方法的時候最終将會調用到這個内部類的相應方法中,這裡以onNext為例,看一下ObserveOnSubscriber的onNext實作:

@Override
        public void onNext(final T t) {
            if (isUnsubscribed() || finished) {
                return;
            }
            if (!queue.offer(on.next(t))) {
                onError(new MissingBackpressureException());
                return;
            }
            schedule();
        }
           

首先是調用了queue.offer(on.next(t))方法,這裡面on.next(t)直接傳回的就是t,這句簡單的将t入隊,然後調用schedule,看一下:

protected void schedule() {
            if (counter.getAndIncrement() == 0) {
                recursiveScheduler.schedule(this);
            }
        }
           

重要的是recursiveScheduler,看一下它是什麼東西:

final Scheduler.Worker recursiveScheduler;

public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
            this.child = child;
            this.recursiveScheduler = scheduler.createWorker();
            this.delayError = delayError;
            this.on = NotificationLite.instance();
            int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
            // this formula calculates the 75% of the bufferSize, rounded up to the next integer
            this.limit = calculatedSize - (calculatedSize >> 2);
            if (UnsafeAccess.isUnsafeAvailable()) {
                queue = new SpscArrayQueue<Object>(calculatedSize);
            } else {
                queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
            }
            // signal that this is an async operator capable of receiving this many
            request(calculatedSize);
        }
           

看到這裡也就非常熟悉了,和上文所講的subscribeOn是一個道理,都是用了Worker。

回頭看onNext,這個Worker的schedule方法需要一個Action0,正好ObserveOnSubscriber實作了Action0,那麼看一下它的call方法,也就是在新線程中被調用的方法:

@Override
        public void call() {
            long missed = 1L;
            long currentEmission = emitted;

            // these are accessed in a tight loop around atomics so
            // loading them into local variables avoids the mandatory re-reading
            // of the constant fields
            final Queue<Object> q = this.queue;
            final Subscriber<? super T> localChild = this.child;
            final NotificationLite<T> localOn = this.on;
            
            // requested and counter are not included to avoid JIT issues with register spilling
            // and their access is is amortized because they are part of the outer loop which runs
            // less frequently (usually after each bufferSize elements)
            
            for (;;) {
                long requestAmount = requested.get();
                
                while (requestAmount != currentEmission) {
                    boolean done = finished;
                    Object v = q.poll();
                    boolean empty = v == null;
                    
                    if (checkTerminated(done, empty, localChild, q)) {
                        return;
                    }
                    
                    if (empty) {
                        break;
                    }
                    
                    localChild.onNext(localOn.getValue(v));

                    currentEmission++;
                    if (currentEmission == limit) {
                        requestAmount = BackpressureUtils.produced(requested, currentEmission);
                        request(currentEmission);
                        currentEmission = 0L;
                    }
                }
                
                if (requestAmount == currentEmission) {
                    if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
                        return;
                    }
                }

                emitted = currentEmission;
                missed = counter.addAndGet(-missed);
                if (missed == 0L) {
                    break;
                }
            }
        }
           

并不是很長,找主線代碼來看一下。有一個死循環,在死循環中不斷的從queue中取出元素,這個元素就是剛剛在onNext中我們放進queue中的。取出後執行localChild.onNext(localOn.getValue(t))這句,localOn.getValue(t)其實就是給t帶上一個泛型然後傳回,那麼localChild是什麼:

public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
            this.child = child;
            this.recursiveScheduler = scheduler.createWorker();
            this.delayError = delayError;
            this.on = NotificationLite.instance();
            int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
            // this formula calculates the 75% of the bufferSize, rounded up to the next integer
            this.limit = calculatedSize - (calculatedSize >> 2);
            if (UnsafeAccess.isUnsafeAvailable()) {
                queue = new SpscArrayQueue<Object>(calculatedSize);
            } else {
                queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
            }
            // signal that this is an async operator capable of receiving this many
            request(calculatedSize);
        }
           

就是這裡面的child,也就是我們自己寫的subscriber。

到這裡observerOn就結束了,有之前的subscribeOn做鋪墊看起來就比較輕松。下面還是總結一下畫一張圖:

RxJava源碼淺析

RxJava基礎的一些操作就到這裡了,如果發現出現問題請務必告知,謝謝

繼續閱讀