天天看點

ReactiveCocoa 中 RACSignal 冷信号和熱信号底層實作分析

前言

關于ReactiveCocoa v2.5中冷信号和熱信号的文章中,最著名的就是美團的臧成威老師寫的3篇冷熱信号的文章:

細說ReactiveCocoa的冷信号與熱信号(一)

細說ReactiveCocoa的冷信号與熱信号(二):為什麼要區分冷熱信号

細說ReactiveCocoa的冷信号與熱信号(三):怎麼處理冷信号與熱信号

由于最近在寫關于RACSignal底層實作分析的文章,當然也逃不了關于冷熱信号操作的分析。這篇文章打算分析分析如何從冷信号轉成熱信号的底層實作。

一. 關于冷信号和熱信号的概念

冷熱信号的概念是源自于源于.NET架構Reactive Extensions(RX)中的Hot Observable和Cold Observable,

Hot Observable是主動的,盡管你并沒有訂閱事件,但是它會時刻推送,就像滑鼠移動;而Cold Observable是被動的,隻有當你訂閱的時候,它才會釋出消息。

Hot Observable可以有多個訂閱者,是一對多,集合可以與訂閱者共享資訊;而Cold Observable隻能一對一,當有不同的訂閱者,消息是重新完整發送。

在這篇文章細說ReactiveCocoa的冷信号與熱信号(一)詳細分析了冷熱信号的特點:

熱信号是主動的,即使你沒有訂閱事件,它仍然會時刻推送。而冷信号是被動的,隻有當你訂閱的時候,它才會發送消息。

熱信号可以有多個訂閱者,是一對多,信号可以與訂閱者共享資訊。而冷信号隻能一對一,當有不同的訂閱者,消息會從新完整發送。

二. RACSignal熱信号

RACSignal家族中符合熱信号的特點的信号有以下幾個。

1.RACSubject

@interface RACSubject : RACSignal <RACSubscriber>

@property (nonatomic, strong, readonly) NSMutableArray *subscribers;
@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;

- (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block;
+ (instancetype)subject;

@end複制代碼           

複制

首先來看看RACSubject的定義。

RACSubject是繼承自RACSignal,并且它還遵守RACSubscriber協定。這就意味着它既能訂閱信号,也能發送信号。

在RACSubject裡面有一個NSMutableArray數組,裡面裝着該信号的所有訂閱者。其次還有一個RACCompoundDisposable信号,裡面裝着該信号所有訂閱者的RACDisposable。

RACSubject之是以能稱之為熱信号,那麼它肯定是符合上述熱信号的定義的。讓我們從它的實作來看看它是如何符合的。

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    NSCParameterAssert(subscriber != nil);

    RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
    subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];

    NSMutableArray *subscribers = self.subscribers;
    @synchronized (subscribers) {
        [subscribers addObject:subscriber];
    }

    return [RACDisposable disposableWithBlock:^{
        @synchronized (subscribers) {
            NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
                return obj == subscriber;
            }];

            if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
        }
    }];
}複制代碼           

複制

上面是RACSubject的實作,它和RACSignal最大的不同在這兩行

NSMutableArray *subscribers = self.subscribers;
@synchronized (subscribers) {
    [subscribers addObject:subscriber];
}複制代碼           

複制

RACSubject 把它的所有訂閱者全部都儲存到了NSMutableArray的數組裡。既然儲存了所有的訂閱者,那麼sendNext,sendError,sendCompleted就需要發生改變。

- (void)sendNext:(id)value {
    [self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
        [subscriber sendNext:value];
    }];
}

- (void)sendError:(NSError *)error {
    [self.disposable dispose];

    [self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
        [subscriber sendError:error];
    }];
}

- (void)sendCompleted {
    [self.disposable dispose];

    [self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
        [subscriber sendCompleted];
    }];
}複制代碼           

複制

從源碼可以看到,RACSubject中的sendNext,sendError,sendCompleted都會執行enumerateSubscribersUsingBlock:方法。

- (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block {
    NSArray *subscribers;
    @synchronized (self.subscribers) {
        subscribers = [self.subscribers copy];
    }

    for (id<RACSubscriber> subscriber in subscribers) {
        block(subscriber);
    }
}複制代碼           

複制

enumerateSubscribersUsingBlock:方法會取出所有RACSubject的訂閱者,依次調用入參的block( )方法。

關于RACSubject的訂閱和發送的流程可以參考第一篇文章,大體一緻,其他的不同就是會依次對自己的訂閱者發送信号。

RACSubject就滿足了熱信号的特點,它即使沒有訂閱者,因為自己繼承了RACSubscriber協定,是以自己本身就可以發送信号。冷信号隻能被訂閱了才能發送信号。

RACSubject可以有很多訂閱者,它也會把這些訂閱者都儲存到自己的數組裡。RACSubject之後再發送信号,訂閱者就如同一起看電視,播放過的節目就看不到了,發送過的信号也接收不到了。接收信号。而RACSignal發送信号,訂閱者接收信号都隻能從頭開始接受,如同看點播節目,每次看都從頭開始看。

2. RACGroupedSignal

@interface RACGroupedSignal : RACSubject

@property (nonatomic, readonly, copy) id<NSCopying> key;
+ (instancetype)signalWithKey:(id<NSCopying>)key;
@end複制代碼           

複制

先看看RACGroupedSignal的定義。

RACGroupedSignal是在RACsignal這個方法裡面被用到的。

- (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock transform:(id (^)(id object))transformBlock複制代碼           

複制

在這個方法裡面,sendNext裡面最後裡面是由RACGroupedSignal發送信号。

[groupSubject sendNext:transformBlock != NULL ? transformBlock(x) : x];複制代碼           

複制

關于groupBy的詳細分析請看這篇文章

3. RACBehaviorSubject

@interface RACBehaviorSubject : RACSubject
@property (nonatomic, strong) id currentValue;
+ (instancetype)behaviorSubjectWithDefaultValue:(id)value;
@end複制代碼           

複制

這個信号裡面存儲了一個對象currentValue,這裡存儲着這個信号的最新的值。

當然也可以調用類方法behaviorSubjectWithDefaultValue

+ (instancetype)behaviorSubjectWithDefaultValue:(id)value {
    RACBehaviorSubject *subject = [self subject];
    subject.currentValue = value;
    return subject;
}複制代碼           

複制

在這個方法裡面存儲預設的值,如果RACBehaviorSubject沒有接受到任何值,那麼這個信号就會發送這個預設的值。

當RACBehaviorSubject被訂閱:

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    RACDisposable *subscriptionDisposable = [super subscribe:subscriber];

    RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
        @synchronized (self) {
            [subscriber sendNext:self.currentValue];
        }
    }];

    return [RACDisposable disposableWithBlock:^{
        [subscriptionDisposable dispose];
        [schedulingDisposable dispose];
    }];
}複制代碼           

複制

sendNext裡面會始終發送存儲的currentValue值。調用sendNext會調用RACSubject裡面的sendNext,也會依次發送信号值給訂閱數組裡面每個訂閱者。

當RACBehaviorSubject向訂閱者sendNext的時候:

- (void)sendNext:(id)value {
    @synchronized (self) {
        self.currentValue = value;
        [super sendNext:value];
    }
}複制代碼           

複制

RACBehaviorSubject會把發送的值更新到currentValue裡面。下次發送值就會發送最後更新的值。

4. RACReplaySubject

const NSUInteger RACReplaySubjectUnlimitedCapacity = NSUIntegerMax;
@interface RACReplaySubject : RACSubject

@property (nonatomic, assign, readonly) NSUInteger capacity;
@property (nonatomic, strong, readonly) NSMutableArray *valuesReceived;
@property (nonatomic, assign) BOOL hasCompleted;
@property (nonatomic, assign) BOOL hasError;
@property (nonatomic, strong) NSError *error;
+ (instancetype)replaySubjectWithCapacity:(NSUInteger)capacity;

@end複制代碼           

複制

RACReplaySubject中會存儲RACReplaySubjectUnlimitedCapacity大小的曆史值。

+ (instancetype)replaySubjectWithCapacity:(NSUInteger)capacity {
    return [(RACReplaySubject *)[self alloc] initWithCapacity:capacity];
}

- (instancetype)init {
    return [self initWithCapacity:RACReplaySubjectUnlimitedCapacity];
}

- (instancetype)initWithCapacity:(NSUInteger)capacity {
    self = [super init];
    if (self == nil) return nil;

    _capacity = capacity;
    _valuesReceived = (capacity == RACReplaySubjectUnlimitedCapacity ? [NSMutableArray array] : [NSMutableArray arrayWithCapacity:capacity]);

    return self;
}複制代碼           

複制

在RACReplaySubject初始化中會初始化一個capacity大小的數組。

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

    RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
        @synchronized (self) {
            for (id value in self.valuesReceived) {
                if (compoundDisposable.disposed) return;

                [subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
            }

            if (compoundDisposable.disposed) return;

            if (self.hasCompleted) {
                [subscriber sendCompleted];
            } else if (self.hasError) {
                [subscriber sendError:self.error];
            } else {
                RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
                [compoundDisposable addDisposable:subscriptionDisposable];
            }
        }
    }];

    [compoundDisposable addDisposable:schedulingDisposable];

    return compoundDisposable;
}複制代碼           

複制

當RACReplaySubject被訂閱的時候,會把valuesReceived數組裡面的值都發送出去。

- (void)sendNext:(id)value {
    @synchronized (self) {
        [self.valuesReceived addObject:value ?: RACTupleNil.tupleNil];
        [super sendNext:value];

        if (self.capacity != RACReplaySubjectUnlimitedCapacity && self.valuesReceived.count > self.capacity) {
            [self.valuesReceived removeObjectsInRange:NSMakeRange(0, self.valuesReceived.count - self.capacity)];
        }
    }
}複制代碼           

複制

在sendNext中,valuesReceived會儲存每次接收到的值。調用super的sendNext,會依次把值都發送到每個訂閱者中。

這裡還會判斷數組裡面存儲了多少個值。如果存儲的值的個數大于了capacity,那麼要移除掉數組裡面從0開始的前幾個值,保證數組裡面隻裝capacity個數的值。

RACReplaySubject 和 RACSubject 的差別在于,RACReplaySubject還會把曆史的信号值都存儲起來發送給訂閱者。這一點,RACReplaySubject更像是RACSingnal 和 RACSubject 的合體版。RACSignal是冷信号,一旦被訂閱就會向訂閱者發送所有的值,這一點RACReplaySubject和RACSignal是一樣的。但是RACReplaySubject又有着RACSubject的特性,會把所有的值發送給多個訂閱者。當RACReplaySubject發送完之前存儲的曆史值之後,之後再發送信号的行為就和RACSubject完全一緻了。

三. RACSignal冷信号

在ReactiveCocoa v2.5中除了RACsignal信号以外,還有一些特殊的冷信号。

1.RACEmptySignal

@interface RACEmptySignal : RACSignal
+ (RACSignal *)empty;
@end複制代碼           

複制

這個信号隻有一個empty方法。

+ (RACSignal *)empty {
#ifdef DEBUG
    return [[[self alloc] init] setNameWithFormat:@"+empty"];
#else
    static id singleton;
    static dispatch_once_t pred;

    dispatch_once(&pred, ^{
        singleton = [[self alloc] init];
    });

    return singleton;
#endif
}複制代碼           

複制

在debug模式下,傳回一個名字叫empty的信号。在release模式下,傳回一個單例的empty信号。

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    NSCParameterAssert(subscriber != nil);
    return [RACScheduler.subscriptionScheduler schedule:^{
        [subscriber sendCompleted];
    }];
}複制代碼           

複制

RACEmptySignal信号一旦被訂閱就會發送sendCompleted。

2. RACReturnSignal

@interface RACReturnSignal : RACSignal
@property (nonatomic, strong, readonly) id value;
+ (RACSignal *)return:(id)value;
@end複制代碼           

複制

RACReturnSignal信号的定義也很簡單,直接根據value的值傳回一個RACSignal。

+ (RACSignal *)return:(id)value {
#ifndef DEBUG
    if (value == RACUnit.defaultUnit) {
        static RACReturnSignal *unitSingleton;
        static dispatch_once_t unitPred;

        dispatch_once(&unitPred, ^{
            unitSingleton = [[self alloc] init];
            unitSingleton->_value = RACUnit.defaultUnit;
        });

        return unitSingleton;
    } else if (value == nil) {
        static RACReturnSignal *nilSingleton;
        static dispatch_once_t nilPred;

        dispatch_once(&nilPred, ^{
            nilSingleton = [[self alloc] init];
            nilSingleton->_value = nil;
        });

        return nilSingleton;
    }
#endif

    RACReturnSignal *signal = [[self alloc] init];
    signal->_value = value;

#ifdef DEBUG
    [signal setNameWithFormat:@"+return: %@", value];
#endif

    return signal;
}複制代碼           

複制

在debug模式下直接建立一個RACReturnSignal信号裡面的值存儲的是入參value。在release模式下,會依照value的值是否是空,來建立對應的單例RACReturnSignal。

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    NSCParameterAssert(subscriber != nil);

 return [RACScheduler.subscriptionScheduler schedule:^{
    [subscriber sendNext:self.value];
    [subscriber sendCompleted];
 }];
}複制代碼           

複制

RACReturnSignal在被訂閱的時候,就隻會發送一個value值的信号,發送完畢之後就sendCompleted。

3. RACDynamicSignal

這個信号是建立RACSignal createSignal:的真身。關于RACDynamicSignal詳細過程請看第一篇文章。

4. RACErrorSignal

@interface RACErrorSignal : RACSignal
@property (nonatomic, strong, readonly) NSError *error;
+ (RACSignal *)error:(NSError *)error;
@end複制代碼           

複制

RACErrorSignal信号裡面就存儲了一個NSError。

+ (RACSignal *)error:(NSError *)error {
    RACErrorSignal *signal = [[self alloc] init];
    signal->_error = error;

#ifdef DEBUG
    [signal setNameWithFormat:@"+error: %@", error];
#else
    signal.name = @"+error:";
#endif

    return signal;
}複制代碼           

複制

RACErrorSignal初始化的時候把外界傳進來的Error儲存起來。當被訂閱的時候就發送這個Error出去。

5. RACChannelTerminal

@interface RACChannelTerminal : RACSignal <RACSubscriber>

- (id)init __attribute__((unavailable("Instantiate a RACChannel instead")));

@property (nonatomic, strong, readonly) RACSignal *values;
@property (nonatomic, strong, readonly) id<RACSubscriber> otherTerminal;
- (id)initWithValues:(RACSignal *)values otherTerminal:(id<RACSubscriber>)otherTerminal;

@end複制代碼           

複制

RACChannelTerminal在RAC日常開發中,用來雙向綁定的。它和RACSubject一樣,既繼承自RACSignal,同樣又遵守RACSubscriber協定。雖然具有RACSubject的發送和接收信号的特性,但是它依舊是冷信号,因為它無法一對多,它發送信号還是隻能一對一。

RACChannelTerminal無法手動初始化,需要靠RACChannel去初始化。

- (id)init {
    self = [super init];
    if (self == nil) return nil;

    RACReplaySubject *leadingSubject = [[RACReplaySubject replaySubjectWithCapacity:0] setNameWithFormat:@"leadingSubject"];
    RACReplaySubject *followingSubject = [[RACReplaySubject replaySubjectWithCapacity:1] setNameWithFormat:@"followingSubject"];

    [[leadingSubject ignoreValues] subscribe:followingSubject];
    [[followingSubject ignoreValues] subscribe:leadingSubject];

    _leadingTerminal = [[[RACChannelTerminal alloc] initWithValues:leadingSubject otherTerminal:followingSubject] setNameWithFormat:@"leadingTerminal"];
    _followingTerminal = [[[RACChannelTerminal alloc] initWithValues:followingSubject otherTerminal:leadingSubject] setNameWithFormat:@"followingTerminal"];

    return self;
}複制代碼           

複制

在RACChannel的初始化中會調用RACChannelTerminal的initWithValues:方法,這裡的入參都是RACReplaySubject類型的。是以訂閱RACChannelTerminal過程的時候:

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    return [self.values subscribe:subscriber];
}複制代碼           

複制

self.values其實就是一個RACReplaySubject,就相當于訂閱RACReplaySubject。訂閱過程同上面RACReplaySubject的訂閱過程。

- (void)sendNext:(id)value {
    [self.otherTerminal sendNext:value];
}

- (void)sendError:(NSError *)error {
    [self.otherTerminal sendError:error];
}

- (void)sendCompleted {
    [self.otherTerminal sendCompleted];
}複制代碼           

複制

self.otherTerminal也是RACReplaySubject類型的,RACChannelTerminal管道兩邊都是RACReplaySubject類型的信号。當RACChannelTerminal開始sendNext,sendError,sendCompleted是調用的管道另外一個的RACReplaySubject進行這些對應的操作的。

平時使用RACChannelTerminal的地方在View和ViewModel的雙向綁定上面。

例如在登入界面,輸入密碼文本框TextField和ViewModel的Password雙向綁定

RACChannelTerminal *passwordTerminal = [_passwordTextField rac_newTextChannel];
    RACChannelTerminal *viewModelPasswordTerminal = RACChannelTo(_viewModel, password);
    [viewModelPasswordTerminal subscribe:passwordTerminal];
    [passwordTerminal subscribe:viewModelPasswordTerminal];複制代碼           

複制

雙向綁定的兩個信号都會因為對方的改變而收到新的信号。

至此所有的RACSignal的分類就都理順了,按照冷信号和熱信号的分類也分好了。

ReactiveCocoa 中 RACSignal 冷信号和熱信号底層實作分析

四. 冷信号是如何轉換成熱信号的

為何有時候需要把冷信号轉換成熱信号呢?詳情可以看這篇文章裡面舉的例子:細說ReactiveCocoa的冷信号與熱信号(二):為什麼要區分冷熱信号

根據RACSignal訂閱和發送信号的流程,我們可以知道,每訂閱一次冷信号RACSignal,就會執行一次didSubscribe閉包。這個時候就是可能出現問題的地方。如果RACSignal是被用于網絡請求,那麼在didSubscribe閉包裡面會被重複的請求。上面文中提到了信号被訂閱了6次,網絡請求也會請求6次。這并不是我們想要的。網絡請求隻需要請求1次。

如何做到信号隻執行一次didSubscribe閉包,最重要的一點是RACSignal冷信号隻能被訂閱一次。由于冷信号隻能一對一,那麼想一對多就隻能交給熱信号去處理了。這時候就需要把冷信号轉換成熱信号。

在ReactiveCocoa v2.5中,冷信号轉換成熱信号需要用到RACMulticastConnection 這個類。

@interface RACMulticastConnection : NSObject
@property (nonatomic, strong, readonly) RACSignal *signal;
- (RACDisposable *)connect;
- (RACSignal *)autoconnect;
@end


@interface RACMulticastConnection () {
    RACSubject *_signal;
    int32_t volatile _hasConnected;
}
@property (nonatomic, readonly, strong) RACSignal *sourceSignal;
@property (strong) RACSerialDisposable *serialDisposable;
@end複制代碼           

複制

看看RACMulticastConnection類的定義。最主要的是儲存了兩個信号,一個是RACSubject,一個是sourceSignal(RACSignal類型)。在.h中暴露給外面的是RACSignal,在.m中實際使用的是RACSubject。看它的定義就能猜到接下去它會做什麼:用sourceSignal去發送信号,内部再用RACSubject去訂閱sourceSignal,然後RACSubject會把sourceSignal的信号值依次發給它的訂閱者們。

用一個不恰當的比喻來形容RACMulticastConnection,它就像上圖中心的那個“地球”,“地球”就是訂閱了sourceSignal的RACSubject,RACSubject把值發送給各個“連接配接”者(訂閱者)。sourceSignal隻有内部的RACSubject一個訂閱者,是以就完成了我們隻想執行didSubscribe閉包一次,但是能把值發送給各個訂閱者的願望。

在看看RACMulticastConnection的初始化

- (id)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
    NSCParameterAssert(source != nil);
    NSCParameterAssert(subject != nil);

    self = [super init];
    if (self == nil) return nil;

    _sourceSignal = source;
    _serialDisposable = [[RACSerialDisposable alloc] init];
    _signal = subject;

    return self;
}複制代碼           

複制

初始化方法就是把外界傳進來的RACSignal儲存成sourceSignal,把外界傳進來的RACSubject儲存成自己的signal屬性。

RACMulticastConnection有兩個連接配接方法。

- (RACDisposable *)connect {
    BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected);

    if (shouldConnect) {
        self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];
    }
    return self.serialDisposable;
}複制代碼           

複制

這裡出現了一個不多見的函數OSAtomicCompareAndSwap32Barrier,它是原子運算的操作符,主要用于Compare and swap,原型如下:

bool    OSAtomicCompareAndSwap32Barrier( int32_t __oldValue, int32_t __newValue, volatile int32_t *__theValue );複制代碼           

複制

關鍵字volatile隻確定每次擷取volatile變量時都是從記憶體加載變量,而不是使用寄存器裡面的值,但是它不保證代碼通路變量是正确的。

如果用僞代碼去實作這個函數:

f (*__theValue == __oldValue) {  
    *__theValue = __newValue;  
    return 1;  
} else {  
    return 0;  
}複制代碼           

複制

如果_hasConnected為0,意味着沒有連接配接,OSAtomicCompareAndSwap32Barrier傳回1,shouldConnect就應該連接配接。如果_hasConnected為1,意味着已經連接配接過了,OSAtomicCompareAndSwap32Barrier傳回0,shouldConnect不會再次連接配接。

所謂連接配接的過程就是RACMulticastConnection内部用RACSubject訂閱self.sourceSignal。sourceSignal是RACSignal,會把訂閱者RACSubject儲存到RACPassthroughSubscriber中,sendNext的時候就會調用RACSubject sendNext,這時就會把sourceSignal的信号都發送給各個訂閱者了。

- (RACSignal *)autoconnect {
    __block volatile int32_t subscriberCount = 0;

    return [[RACSignal
        createSignal:^(id<RACSubscriber> subscriber) {
            OSAtomicIncrement32Barrier(&subscriberCount);

            RACDisposable *subscriptionDisposable = [self.signal subscribe:subscriber];
            RACDisposable *connectionDisposable = [self connect];

            return [RACDisposable disposableWithBlock:^{
                [subscriptionDisposable dispose];

                if (OSAtomicDecrement32Barrier(&subscriberCount) == 0) {
                    [connectionDisposable dispose];
                }
            }];
        }]
        setNameWithFormat:@"[%@] -autoconnect", self.signal.name];
}複制代碼           

複制

OSAtomicIncrement32Barrier 和 OSAtomicDecrement32Barrier也是原子運算的操作符,分别是+1和-1操作。在autoconnect為了保證線程安全,用到了一個subscriberCount的類似信号量的volatile變量,保證第一個訂閱者能連接配接上。傳回的新的信号的訂閱者訂閱RACSubject,RACSubject也會去訂閱内部的sourceSignal。

把冷信号轉換成熱信号用以下5種方式,5種方法都會用到RACMulticastConnection。接下來一一分析它們的具體實作。

1. multicast:

- (RACMulticastConnection *)multicast:(RACSubject *)subject {
    [subject setNameWithFormat:@"[%@] -multicast: %@", self.name, subject.name];
    RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject];
    return connection;
}複制代碼           

複制

multicast:的操作就是初始化一個RACMulticastConnection對象,SourceSignal是self,内部的RACSubject是入參subject。

RACMulticastConnection *connection = [signal multicast:[RACSubject subject]];
    [connection.signal subscribeNext:^(id x) {
        NSLog(@"%@",x);
    }];
    [connection connect];複制代碼           

複制

調用 multicast:把冷信号轉換成熱信号有一個點不友善的是,需要自己手動connect。注意轉換完之後的熱信号在RACMulticastConnection的signal屬性中,是以需要訂閱的是connection.signal。

2. publish

- (RACMulticastConnection *)publish {
    RACSubject *subject = [[RACSubject subject] setNameWithFormat:@"[%@] -publish", self.name];
    RACMulticastConnection *connection = [self multicast:subject];
    return connection;
}複制代碼           

複制

publish方法隻不過是去調用了multicast:方法,publish内部會建立好一個RACSubject,并把它當成入參傳遞給RACMulticastConnection。

RACMulticastConnection *connection = [signal publish];
    [connection.signal subscribeNext:^(id x) {
        NSLog(@"%@",x);
    }];
    [connection connect];複制代碼           

複制

同樣publish方法也需要手動的調用connect方法。

3. replay

- (RACSignal *)replay {
    RACReplaySubject *subject = [[RACReplaySubject subject] setNameWithFormat:@"[%@] -replay", self.name];

    RACMulticastConnection *connection = [self multicast:subject];
    [connection connect];

    return connection.signal;
}複制代碼           

複制

replay方法會把RACReplaySubject當成RACMulticastConnection的RACSubject傳遞進去,初始化好了RACMulticastConnection,再自動調用connect方法,傳回的信号就是轉換好的熱信号,即RACMulticastConnection裡面的RACSubject信号。

這裡必須是RACReplaySubject,因為在replay方法裡面先connect了。如果用RACSubject,那信号在connect之後就會通過RACSubject把原信号發送給各個訂閱者了。用RACReplaySubject把信号儲存起來,即使replay方法裡面先connect,訂閱者後訂閱也是可以拿到之前的信号值的。

4. replayLast

- (RACSignal *)replayLast {
    RACReplaySubject *subject = [[RACReplaySubject replaySubjectWithCapacity:1] setNameWithFormat:@"[%@] -replayLast", self.name];

    RACMulticastConnection *connection = [self multicast:subject];
    [connection connect];

    return connection.signal;
}複制代碼           

複制

replayLast 和 replay的實作基本一樣,唯一的不同就是傳入的RACReplaySubject的Capacity是1,意味着隻能儲存最新的值。是以使用replayLast,訂閱之後就隻能拿到原信号最新的值。

5. replayLazily

- (RACSignal *)replayLazily {
    RACMulticastConnection *connection = [self multicast:[RACReplaySubject subject]];
    return [[RACSignal
        defer:^{
            [connection connect];
            return connection.signal;
        }]
        setNameWithFormat:@"[%@] -replayLazily", self.name];
}複制代碼           

複制

replayLazily 的實作也和 replayLast、replay實作很相似。隻不過把connect放到了defer的操作裡面去了。

defer操作的實作如下:

+ (RACSignal *)defer:(RACSignal * (^)(void))block {
    NSCParameterAssert(block != NULL);

    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        return [block() subscribe:subscriber];
    }] setNameWithFormat:@"+defer:"];
}複制代碼           

複制

defer 單詞的字面意思是延遲的。也和這個函數實作的效果是一緻的。隻有當defer傳回的新信号被訂閱的時候,才會執行入參block( )閉包。訂閱者會訂閱這個block( )閉包的傳回值RACSignal。

block( )閉包被延遲建立RACSignal了,這就是defer。如果block( )閉包含有和時間有關的操作,或者副作用,想要延遲執行,就可以用defer。

還有一個類似的操作,then

- (RACSignal *)then:(RACSignal * (^)(void))block {
    NSCParameterAssert(block != nil);

    return [[[self
        ignoreValues]
        concat:[RACSignal defer:block]]
        setNameWithFormat:@"[%@] -then:", self.name];
}複制代碼           

複制

then的操作也是延遲,隻不過它是把block( )閉包延遲到原信号發送complete之後。通過then信号變化得到的新的信号,在原信号發送值的期間的時間内,都不會發送任何值,因為ignoreValues了,一旦原信号sendComplete之後,就緊接着block( )閉包産生的信号。

回到replayLazily操作上來,作用同樣是把冷信号轉換成熱信号,隻不過sourceSignal是在傳回的新信号第一次被訂閱的時候才被訂閱。原因就是defer延遲了block( )閉包的執行了。

最後

關于RACSignal的變換操作還剩下高階信号操作,下篇接着繼續分析。最後請大家多多指教。