前言
關于ReactiveCocoa v2.5中冷信号和熱信号的文章中,最著名的就是美團的臧成威老師寫的3篇冷熱信号的文章:
細說ReactiveCocoa的冷信号與熱信号(一)
細說ReactiveCocoa的冷信号與熱信号(二):為什麼要區分冷熱信号
細說ReactiveCocoa的冷信号與熱信号(三):怎麼處理冷信号與熱信号
由于最近在寫關于RACSignal底層實作分析的文章,當然也逃不了關于冷熱信号操作的分析。這篇文章打算分析分析如何從冷信号轉成熱信号的底層實作。
一. 關于冷信号和熱信号的概念
冷熱信号的概念是源自于源于.NET架構Reactive Extensions(RX)中的Hot Observable和Cold Observable,
Hot Observable是主動的,盡管你并沒有訂閱事件,但是它會時刻推送,就像滑鼠移動;而Cold Observable是被動的,隻有當你訂閱的時候,它才會釋出消息。
Hot Observable可以有多個訂閱者,是一對多,集合可以與訂閱者共享資訊;而Cold Observable隻能一對一,當有不同的訂閱者,消息是重新完整發送。
在這篇文章細說ReactiveCocoa的冷信号與熱信号(一)詳細分析了冷熱信号的特點:
熱信号是主動的,即使你沒有訂閱事件,它仍然會時刻推送。而冷信号是被動的,隻有當你訂閱的時候,它才會發送消息。
熱信号可以有多個訂閱者,是一對多,信号可以與訂閱者共享資訊。而冷信号隻能一對一,當有不同的訂閱者,消息會從新完整發送。
二. RACSignal熱信号
RACSignal家族中符合熱信号的特點的信号有以下幾個。
1.RACSubject
@interface RACSubject : RACSignal <RACSubscriber>
@property (nonatomic, strong, readonly) NSMutableArray *subscribers;
@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;
- (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block;
+ (instancetype)subject;
@end複制代碼
複制
首先來看看RACSubject的定義。
RACSubject是繼承自RACSignal,并且它還遵守RACSubscriber協定。這就意味着它既能訂閱信号,也能發送信号。
在RACSubject裡面有一個NSMutableArray數組,裡面裝着該信号的所有訂閱者。其次還有一個RACCompoundDisposable信号,裡面裝着該信号所有訂閱者的RACDisposable。
RACSubject之是以能稱之為熱信号,那麼它肯定是符合上述熱信号的定義的。讓我們從它的實作來看看它是如何符合的。
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
NSMutableArray *subscribers = self.subscribers;
@synchronized (subscribers) {
[subscribers addObject:subscriber];
}
return [RACDisposable disposableWithBlock:^{
@synchronized (subscribers) {
NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
return obj == subscriber;
}];
if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
}
}];
}複制代碼
複制
上面是RACSubject的實作,它和RACSignal最大的不同在這兩行
NSMutableArray *subscribers = self.subscribers;
@synchronized (subscribers) {
[subscribers addObject:subscriber];
}複制代碼
複制
RACSubject 把它的所有訂閱者全部都儲存到了NSMutableArray的數組裡。既然儲存了所有的訂閱者,那麼sendNext,sendError,sendCompleted就需要發生改變。
- (void)sendNext:(id)value {
[self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
[subscriber sendNext:value];
}];
}
- (void)sendError:(NSError *)error {
[self.disposable dispose];
[self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
[subscriber sendError:error];
}];
}
- (void)sendCompleted {
[self.disposable dispose];
[self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
[subscriber sendCompleted];
}];
}複制代碼
複制
從源碼可以看到,RACSubject中的sendNext,sendError,sendCompleted都會執行enumerateSubscribersUsingBlock:方法。
- (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block {
NSArray *subscribers;
@synchronized (self.subscribers) {
subscribers = [self.subscribers copy];
}
for (id<RACSubscriber> subscriber in subscribers) {
block(subscriber);
}
}複制代碼
複制
enumerateSubscribersUsingBlock:方法會取出所有RACSubject的訂閱者,依次調用入參的block( )方法。
關于RACSubject的訂閱和發送的流程可以參考第一篇文章,大體一緻,其他的不同就是會依次對自己的訂閱者發送信号。
RACSubject就滿足了熱信号的特點,它即使沒有訂閱者,因為自己繼承了RACSubscriber協定,是以自己本身就可以發送信号。冷信号隻能被訂閱了才能發送信号。
RACSubject可以有很多訂閱者,它也會把這些訂閱者都儲存到自己的數組裡。RACSubject之後再發送信号,訂閱者就如同一起看電視,播放過的節目就看不到了,發送過的信号也接收不到了。接收信号。而RACSignal發送信号,訂閱者接收信号都隻能從頭開始接受,如同看點播節目,每次看都從頭開始看。
2. RACGroupedSignal
@interface RACGroupedSignal : RACSubject
@property (nonatomic, readonly, copy) id<NSCopying> key;
+ (instancetype)signalWithKey:(id<NSCopying>)key;
@end複制代碼
複制
先看看RACGroupedSignal的定義。
RACGroupedSignal是在RACsignal這個方法裡面被用到的。
- (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock transform:(id (^)(id object))transformBlock複制代碼
複制
在這個方法裡面,sendNext裡面最後裡面是由RACGroupedSignal發送信号。
[groupSubject sendNext:transformBlock != NULL ? transformBlock(x) : x];複制代碼
複制
關于groupBy的詳細分析請看這篇文章
3. RACBehaviorSubject
@interface RACBehaviorSubject : RACSubject
@property (nonatomic, strong) id currentValue;
+ (instancetype)behaviorSubjectWithDefaultValue:(id)value;
@end複制代碼
複制
這個信号裡面存儲了一個對象currentValue,這裡存儲着這個信号的最新的值。
當然也可以調用類方法behaviorSubjectWithDefaultValue
+ (instancetype)behaviorSubjectWithDefaultValue:(id)value {
RACBehaviorSubject *subject = [self subject];
subject.currentValue = value;
return subject;
}複制代碼
複制
在這個方法裡面存儲預設的值,如果RACBehaviorSubject沒有接受到任何值,那麼這個信号就會發送這個預設的值。
當RACBehaviorSubject被訂閱:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
@synchronized (self) {
[subscriber sendNext:self.currentValue];
}
}];
return [RACDisposable disposableWithBlock:^{
[subscriptionDisposable dispose];
[schedulingDisposable dispose];
}];
}複制代碼
複制
sendNext裡面會始終發送存儲的currentValue值。調用sendNext會調用RACSubject裡面的sendNext,也會依次發送信号值給訂閱數組裡面每個訂閱者。
當RACBehaviorSubject向訂閱者sendNext的時候:
- (void)sendNext:(id)value {
@synchronized (self) {
self.currentValue = value;
[super sendNext:value];
}
}複制代碼
複制
RACBehaviorSubject會把發送的值更新到currentValue裡面。下次發送值就會發送最後更新的值。
4. RACReplaySubject
const NSUInteger RACReplaySubjectUnlimitedCapacity = NSUIntegerMax;
@interface RACReplaySubject : RACSubject
@property (nonatomic, assign, readonly) NSUInteger capacity;
@property (nonatomic, strong, readonly) NSMutableArray *valuesReceived;
@property (nonatomic, assign) BOOL hasCompleted;
@property (nonatomic, assign) BOOL hasError;
@property (nonatomic, strong) NSError *error;
+ (instancetype)replaySubjectWithCapacity:(NSUInteger)capacity;
@end複制代碼
複制
RACReplaySubject中會存儲RACReplaySubjectUnlimitedCapacity大小的曆史值。
+ (instancetype)replaySubjectWithCapacity:(NSUInteger)capacity {
return [(RACReplaySubject *)[self alloc] initWithCapacity:capacity];
}
- (instancetype)init {
return [self initWithCapacity:RACReplaySubjectUnlimitedCapacity];
}
- (instancetype)initWithCapacity:(NSUInteger)capacity {
self = [super init];
if (self == nil) return nil;
_capacity = capacity;
_valuesReceived = (capacity == RACReplaySubjectUnlimitedCapacity ? [NSMutableArray array] : [NSMutableArray arrayWithCapacity:capacity]);
return self;
}複制代碼
複制
在RACReplaySubject初始化中會初始化一個capacity大小的數組。
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
@synchronized (self) {
for (id value in self.valuesReceived) {
if (compoundDisposable.disposed) return;
[subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
}
if (compoundDisposable.disposed) return;
if (self.hasCompleted) {
[subscriber sendCompleted];
} else if (self.hasError) {
[subscriber sendError:self.error];
} else {
RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
[compoundDisposable addDisposable:subscriptionDisposable];
}
}
}];
[compoundDisposable addDisposable:schedulingDisposable];
return compoundDisposable;
}複制代碼
複制
當RACReplaySubject被訂閱的時候,會把valuesReceived數組裡面的值都發送出去。
- (void)sendNext:(id)value {
@synchronized (self) {
[self.valuesReceived addObject:value ?: RACTupleNil.tupleNil];
[super sendNext:value];
if (self.capacity != RACReplaySubjectUnlimitedCapacity && self.valuesReceived.count > self.capacity) {
[self.valuesReceived removeObjectsInRange:NSMakeRange(0, self.valuesReceived.count - self.capacity)];
}
}
}複制代碼
複制
在sendNext中,valuesReceived會儲存每次接收到的值。調用super的sendNext,會依次把值都發送到每個訂閱者中。
這裡還會判斷數組裡面存儲了多少個值。如果存儲的值的個數大于了capacity,那麼要移除掉數組裡面從0開始的前幾個值,保證數組裡面隻裝capacity個數的值。
RACReplaySubject 和 RACSubject 的差別在于,RACReplaySubject還會把曆史的信号值都存儲起來發送給訂閱者。這一點,RACReplaySubject更像是RACSingnal 和 RACSubject 的合體版。RACSignal是冷信号,一旦被訂閱就會向訂閱者發送所有的值,這一點RACReplaySubject和RACSignal是一樣的。但是RACReplaySubject又有着RACSubject的特性,會把所有的值發送給多個訂閱者。當RACReplaySubject發送完之前存儲的曆史值之後,之後再發送信号的行為就和RACSubject完全一緻了。
三. RACSignal冷信号
在ReactiveCocoa v2.5中除了RACsignal信号以外,還有一些特殊的冷信号。
1.RACEmptySignal
@interface RACEmptySignal : RACSignal
+ (RACSignal *)empty;
@end複制代碼
複制
這個信号隻有一個empty方法。
+ (RACSignal *)empty {
#ifdef DEBUG
return [[[self alloc] init] setNameWithFormat:@"+empty"];
#else
static id singleton;
static dispatch_once_t pred;
dispatch_once(&pred, ^{
singleton = [[self alloc] init];
});
return singleton;
#endif
}複制代碼
複制
在debug模式下,傳回一個名字叫empty的信号。在release模式下,傳回一個單例的empty信号。
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
return [RACScheduler.subscriptionScheduler schedule:^{
[subscriber sendCompleted];
}];
}複制代碼
複制
RACEmptySignal信号一旦被訂閱就會發送sendCompleted。
2. RACReturnSignal
@interface RACReturnSignal : RACSignal
@property (nonatomic, strong, readonly) id value;
+ (RACSignal *)return:(id)value;
@end複制代碼
複制
RACReturnSignal信号的定義也很簡單,直接根據value的值傳回一個RACSignal。
+ (RACSignal *)return:(id)value {
#ifndef DEBUG
if (value == RACUnit.defaultUnit) {
static RACReturnSignal *unitSingleton;
static dispatch_once_t unitPred;
dispatch_once(&unitPred, ^{
unitSingleton = [[self alloc] init];
unitSingleton->_value = RACUnit.defaultUnit;
});
return unitSingleton;
} else if (value == nil) {
static RACReturnSignal *nilSingleton;
static dispatch_once_t nilPred;
dispatch_once(&nilPred, ^{
nilSingleton = [[self alloc] init];
nilSingleton->_value = nil;
});
return nilSingleton;
}
#endif
RACReturnSignal *signal = [[self alloc] init];
signal->_value = value;
#ifdef DEBUG
[signal setNameWithFormat:@"+return: %@", value];
#endif
return signal;
}複制代碼
複制
在debug模式下直接建立一個RACReturnSignal信号裡面的值存儲的是入參value。在release模式下,會依照value的值是否是空,來建立對應的單例RACReturnSignal。
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
return [RACScheduler.subscriptionScheduler schedule:^{
[subscriber sendNext:self.value];
[subscriber sendCompleted];
}];
}複制代碼
複制
RACReturnSignal在被訂閱的時候,就隻會發送一個value值的信号,發送完畢之後就sendCompleted。
3. RACDynamicSignal
這個信号是建立RACSignal createSignal:的真身。關于RACDynamicSignal詳細過程請看第一篇文章。
4. RACErrorSignal
@interface RACErrorSignal : RACSignal
@property (nonatomic, strong, readonly) NSError *error;
+ (RACSignal *)error:(NSError *)error;
@end複制代碼
複制
RACErrorSignal信号裡面就存儲了一個NSError。
+ (RACSignal *)error:(NSError *)error {
RACErrorSignal *signal = [[self alloc] init];
signal->_error = error;
#ifdef DEBUG
[signal setNameWithFormat:@"+error: %@", error];
#else
signal.name = @"+error:";
#endif
return signal;
}複制代碼
複制
RACErrorSignal初始化的時候把外界傳進來的Error儲存起來。當被訂閱的時候就發送這個Error出去。
5. RACChannelTerminal
@interface RACChannelTerminal : RACSignal <RACSubscriber>
- (id)init __attribute__((unavailable("Instantiate a RACChannel instead")));
@property (nonatomic, strong, readonly) RACSignal *values;
@property (nonatomic, strong, readonly) id<RACSubscriber> otherTerminal;
- (id)initWithValues:(RACSignal *)values otherTerminal:(id<RACSubscriber>)otherTerminal;
@end複制代碼
複制
RACChannelTerminal在RAC日常開發中,用來雙向綁定的。它和RACSubject一樣,既繼承自RACSignal,同樣又遵守RACSubscriber協定。雖然具有RACSubject的發送和接收信号的特性,但是它依舊是冷信号,因為它無法一對多,它發送信号還是隻能一對一。
RACChannelTerminal無法手動初始化,需要靠RACChannel去初始化。
- (id)init {
self = [super init];
if (self == nil) return nil;
RACReplaySubject *leadingSubject = [[RACReplaySubject replaySubjectWithCapacity:0] setNameWithFormat:@"leadingSubject"];
RACReplaySubject *followingSubject = [[RACReplaySubject replaySubjectWithCapacity:1] setNameWithFormat:@"followingSubject"];
[[leadingSubject ignoreValues] subscribe:followingSubject];
[[followingSubject ignoreValues] subscribe:leadingSubject];
_leadingTerminal = [[[RACChannelTerminal alloc] initWithValues:leadingSubject otherTerminal:followingSubject] setNameWithFormat:@"leadingTerminal"];
_followingTerminal = [[[RACChannelTerminal alloc] initWithValues:followingSubject otherTerminal:leadingSubject] setNameWithFormat:@"followingTerminal"];
return self;
}複制代碼
複制
在RACChannel的初始化中會調用RACChannelTerminal的initWithValues:方法,這裡的入參都是RACReplaySubject類型的。是以訂閱RACChannelTerminal過程的時候:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
return [self.values subscribe:subscriber];
}複制代碼
複制
self.values其實就是一個RACReplaySubject,就相當于訂閱RACReplaySubject。訂閱過程同上面RACReplaySubject的訂閱過程。
- (void)sendNext:(id)value {
[self.otherTerminal sendNext:value];
}
- (void)sendError:(NSError *)error {
[self.otherTerminal sendError:error];
}
- (void)sendCompleted {
[self.otherTerminal sendCompleted];
}複制代碼
複制
self.otherTerminal也是RACReplaySubject類型的,RACChannelTerminal管道兩邊都是RACReplaySubject類型的信号。當RACChannelTerminal開始sendNext,sendError,sendCompleted是調用的管道另外一個的RACReplaySubject進行這些對應的操作的。
平時使用RACChannelTerminal的地方在View和ViewModel的雙向綁定上面。
例如在登入界面,輸入密碼文本框TextField和ViewModel的Password雙向綁定
RACChannelTerminal *passwordTerminal = [_passwordTextField rac_newTextChannel];
RACChannelTerminal *viewModelPasswordTerminal = RACChannelTo(_viewModel, password);
[viewModelPasswordTerminal subscribe:passwordTerminal];
[passwordTerminal subscribe:viewModelPasswordTerminal];複制代碼
複制
雙向綁定的兩個信号都會因為對方的改變而收到新的信号。
至此所有的RACSignal的分類就都理順了,按照冷信号和熱信号的分類也分好了。
四. 冷信号是如何轉換成熱信号的
為何有時候需要把冷信号轉換成熱信号呢?詳情可以看這篇文章裡面舉的例子:細說ReactiveCocoa的冷信号與熱信号(二):為什麼要區分冷熱信号
根據RACSignal訂閱和發送信号的流程,我們可以知道,每訂閱一次冷信号RACSignal,就會執行一次didSubscribe閉包。這個時候就是可能出現問題的地方。如果RACSignal是被用于網絡請求,那麼在didSubscribe閉包裡面會被重複的請求。上面文中提到了信号被訂閱了6次,網絡請求也會請求6次。這并不是我們想要的。網絡請求隻需要請求1次。
如何做到信号隻執行一次didSubscribe閉包,最重要的一點是RACSignal冷信号隻能被訂閱一次。由于冷信号隻能一對一,那麼想一對多就隻能交給熱信号去處理了。這時候就需要把冷信号轉換成熱信号。
在ReactiveCocoa v2.5中,冷信号轉換成熱信号需要用到RACMulticastConnection 這個類。
@interface RACMulticastConnection : NSObject
@property (nonatomic, strong, readonly) RACSignal *signal;
- (RACDisposable *)connect;
- (RACSignal *)autoconnect;
@end
@interface RACMulticastConnection () {
RACSubject *_signal;
int32_t volatile _hasConnected;
}
@property (nonatomic, readonly, strong) RACSignal *sourceSignal;
@property (strong) RACSerialDisposable *serialDisposable;
@end複制代碼
複制
看看RACMulticastConnection類的定義。最主要的是儲存了兩個信号,一個是RACSubject,一個是sourceSignal(RACSignal類型)。在.h中暴露給外面的是RACSignal,在.m中實際使用的是RACSubject。看它的定義就能猜到接下去它會做什麼:用sourceSignal去發送信号,内部再用RACSubject去訂閱sourceSignal,然後RACSubject會把sourceSignal的信号值依次發給它的訂閱者們。
用一個不恰當的比喻來形容RACMulticastConnection,它就像上圖中心的那個“地球”,“地球”就是訂閱了sourceSignal的RACSubject,RACSubject把值發送給各個“連接配接”者(訂閱者)。sourceSignal隻有内部的RACSubject一個訂閱者,是以就完成了我們隻想執行didSubscribe閉包一次,但是能把值發送給各個訂閱者的願望。
在看看RACMulticastConnection的初始化
- (id)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
NSCParameterAssert(source != nil);
NSCParameterAssert(subject != nil);
self = [super init];
if (self == nil) return nil;
_sourceSignal = source;
_serialDisposable = [[RACSerialDisposable alloc] init];
_signal = subject;
return self;
}複制代碼
複制
初始化方法就是把外界傳進來的RACSignal儲存成sourceSignal,把外界傳進來的RACSubject儲存成自己的signal屬性。
RACMulticastConnection有兩個連接配接方法。
- (RACDisposable *)connect {
BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected);
if (shouldConnect) {
self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];
}
return self.serialDisposable;
}複制代碼
複制
這裡出現了一個不多見的函數OSAtomicCompareAndSwap32Barrier,它是原子運算的操作符,主要用于Compare and swap,原型如下:
bool OSAtomicCompareAndSwap32Barrier( int32_t __oldValue, int32_t __newValue, volatile int32_t *__theValue );複制代碼
複制
關鍵字volatile隻確定每次擷取volatile變量時都是從記憶體加載變量,而不是使用寄存器裡面的值,但是它不保證代碼通路變量是正确的。
如果用僞代碼去實作這個函數:
f (*__theValue == __oldValue) {
*__theValue = __newValue;
return 1;
} else {
return 0;
}複制代碼
複制
如果_hasConnected為0,意味着沒有連接配接,OSAtomicCompareAndSwap32Barrier傳回1,shouldConnect就應該連接配接。如果_hasConnected為1,意味着已經連接配接過了,OSAtomicCompareAndSwap32Barrier傳回0,shouldConnect不會再次連接配接。
所謂連接配接的過程就是RACMulticastConnection内部用RACSubject訂閱self.sourceSignal。sourceSignal是RACSignal,會把訂閱者RACSubject儲存到RACPassthroughSubscriber中,sendNext的時候就會調用RACSubject sendNext,這時就會把sourceSignal的信号都發送給各個訂閱者了。
- (RACSignal *)autoconnect {
__block volatile int32_t subscriberCount = 0;
return [[RACSignal
createSignal:^(id<RACSubscriber> subscriber) {
OSAtomicIncrement32Barrier(&subscriberCount);
RACDisposable *subscriptionDisposable = [self.signal subscribe:subscriber];
RACDisposable *connectionDisposable = [self connect];
return [RACDisposable disposableWithBlock:^{
[subscriptionDisposable dispose];
if (OSAtomicDecrement32Barrier(&subscriberCount) == 0) {
[connectionDisposable dispose];
}
}];
}]
setNameWithFormat:@"[%@] -autoconnect", self.signal.name];
}複制代碼
複制
OSAtomicIncrement32Barrier 和 OSAtomicDecrement32Barrier也是原子運算的操作符,分别是+1和-1操作。在autoconnect為了保證線程安全,用到了一個subscriberCount的類似信号量的volatile變量,保證第一個訂閱者能連接配接上。傳回的新的信号的訂閱者訂閱RACSubject,RACSubject也會去訂閱内部的sourceSignal。
把冷信号轉換成熱信号用以下5種方式,5種方法都會用到RACMulticastConnection。接下來一一分析它們的具體實作。
1. multicast:
- (RACMulticastConnection *)multicast:(RACSubject *)subject {
[subject setNameWithFormat:@"[%@] -multicast: %@", self.name, subject.name];
RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject];
return connection;
}複制代碼
複制
multicast:的操作就是初始化一個RACMulticastConnection對象,SourceSignal是self,内部的RACSubject是入參subject。
RACMulticastConnection *connection = [signal multicast:[RACSubject subject]];
[connection.signal subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
[connection connect];複制代碼
複制
調用 multicast:把冷信号轉換成熱信号有一個點不友善的是,需要自己手動connect。注意轉換完之後的熱信号在RACMulticastConnection的signal屬性中,是以需要訂閱的是connection.signal。
2. publish
- (RACMulticastConnection *)publish {
RACSubject *subject = [[RACSubject subject] setNameWithFormat:@"[%@] -publish", self.name];
RACMulticastConnection *connection = [self multicast:subject];
return connection;
}複制代碼
複制
publish方法隻不過是去調用了multicast:方法,publish内部會建立好一個RACSubject,并把它當成入參傳遞給RACMulticastConnection。
RACMulticastConnection *connection = [signal publish];
[connection.signal subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
[connection connect];複制代碼
複制
同樣publish方法也需要手動的調用connect方法。
3. replay
- (RACSignal *)replay {
RACReplaySubject *subject = [[RACReplaySubject subject] setNameWithFormat:@"[%@] -replay", self.name];
RACMulticastConnection *connection = [self multicast:subject];
[connection connect];
return connection.signal;
}複制代碼
複制
replay方法會把RACReplaySubject當成RACMulticastConnection的RACSubject傳遞進去,初始化好了RACMulticastConnection,再自動調用connect方法,傳回的信号就是轉換好的熱信号,即RACMulticastConnection裡面的RACSubject信号。
這裡必須是RACReplaySubject,因為在replay方法裡面先connect了。如果用RACSubject,那信号在connect之後就會通過RACSubject把原信号發送給各個訂閱者了。用RACReplaySubject把信号儲存起來,即使replay方法裡面先connect,訂閱者後訂閱也是可以拿到之前的信号值的。
4. replayLast
- (RACSignal *)replayLast {
RACReplaySubject *subject = [[RACReplaySubject replaySubjectWithCapacity:1] setNameWithFormat:@"[%@] -replayLast", self.name];
RACMulticastConnection *connection = [self multicast:subject];
[connection connect];
return connection.signal;
}複制代碼
複制
replayLast 和 replay的實作基本一樣,唯一的不同就是傳入的RACReplaySubject的Capacity是1,意味着隻能儲存最新的值。是以使用replayLast,訂閱之後就隻能拿到原信号最新的值。
5. replayLazily
- (RACSignal *)replayLazily {
RACMulticastConnection *connection = [self multicast:[RACReplaySubject subject]];
return [[RACSignal
defer:^{
[connection connect];
return connection.signal;
}]
setNameWithFormat:@"[%@] -replayLazily", self.name];
}複制代碼
複制
replayLazily 的實作也和 replayLast、replay實作很相似。隻不過把connect放到了defer的操作裡面去了。
defer操作的實作如下:
+ (RACSignal *)defer:(RACSignal * (^)(void))block {
NSCParameterAssert(block != NULL);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
return [block() subscribe:subscriber];
}] setNameWithFormat:@"+defer:"];
}複制代碼
複制
defer 單詞的字面意思是延遲的。也和這個函數實作的效果是一緻的。隻有當defer傳回的新信号被訂閱的時候,才會執行入參block( )閉包。訂閱者會訂閱這個block( )閉包的傳回值RACSignal。
block( )閉包被延遲建立RACSignal了,這就是defer。如果block( )閉包含有和時間有關的操作,或者副作用,想要延遲執行,就可以用defer。
還有一個類似的操作,then
- (RACSignal *)then:(RACSignal * (^)(void))block {
NSCParameterAssert(block != nil);
return [[[self
ignoreValues]
concat:[RACSignal defer:block]]
setNameWithFormat:@"[%@] -then:", self.name];
}複制代碼
複制
then的操作也是延遲,隻不過它是把block( )閉包延遲到原信号發送complete之後。通過then信号變化得到的新的信号,在原信号發送值的期間的時間内,都不會發送任何值,因為ignoreValues了,一旦原信号sendComplete之後,就緊接着block( )閉包産生的信号。
回到replayLazily操作上來,作用同樣是把冷信号轉換成熱信号,隻不過sourceSignal是在傳回的新信号第一次被訂閱的時候才被訂閱。原因就是defer延遲了block( )閉包的執行了。
最後
關于RACSignal的變換操作還剩下高階信号操作,下篇接着繼續分析。最後請大家多多指教。