前言
ReactiveCocoa是一個(第一個?)将函數響應式程式設計範例帶入Objective-C的開源庫。ReactiveCocoa是由Josh Abernathy和Justin Spahr-Summers 兩位大神在對GitHub for Mac的開發過程中編寫的。Justin Spahr-Summers 大神在2011年11月13号下午12點35分進行的第一次送出,直到2013年2月13日上午3點05分釋出了其1.0 release,達到了第一個重要裡程碑。ReactiveCocoa社群也非常活躍,目前最新版已經完成了ReactiveCocoa 5.0.0-alpha.3,目前在5.0.0-alpha.4開發中。
ReactiveCocoa v2.5 是公認的Objective-C最穩定的版本,是以被廣大的以OC為主要語言的用戶端選中使用。ReactiveCocoa v3.x主要是基于Swift 1.2的版本,而ReactiveCocoa v4.x 主要基于Swift 2.x,ReactiveCocoa 5.0就全面支援Swift 3.0,也許還有以後的Swift 4.0。接下來幾篇部落格先以ReactiveCocoa v2.5版本為例子,分析一下OC版的RAC具體實作(也許分析完了RAC 5.0就到來了)。也算是寫在ReactiveCocoa 5.0正式版到來前夕的祝福吧。
一. 什麼是ReactiveCocoa?
ReactiveCocoa(其簡稱為RAC)是由Github 開源的一個應用于iOS和OS X開發的新架構。RAC具有函數式程式設計(FP)和響應式程式設計(RP)的特性。它主要吸取了.Net的 Reactive Extensions的設計和實作。
ReactiveCocoa 的宗旨是Streams of values over time ,随着時間變化而不斷流動的資料流。
ReactiveCocoa 主要解決了以下這些問題:
- UI資料綁定
UI控件通常需要綁定一個事件,RAC可以很友善的綁定任何資料流到控件上。
- 使用者互動事件綁定
RAC為可互動的UI控件提供了一系列能發送Signal信号的方法。這些資料流會在使用者互動中互相傳遞。
- 解決狀态以及狀态之間依賴過多的問題
有了RAC的綁定之後,可以不用在關心各種複雜的狀态,isSelect,isFinish……也解決了這些狀态在後期很難維護的問題。
- 消息傳遞機制的大統一
OC中程式設計原來消息傳遞機制有以下幾種:Delegate,Block Callback,Target-Action,Timers,KVO,objc上有一篇關于OC中這5種消息傳遞方式改如何選擇的文章Communication Patterns,推薦大家閱讀。現在有了RAC之後,以上這5種方式都可以統一用RAC來處理。
二. RAC中的核心RACSignal
ReactiveCocoa 中最核心的概念之一就是信号RACStream。RACRACStream中有兩個子類——RACSignal 和 RACSequence。本文先來分析RACSignal。
我們會經常看到以下的代碼:
RACSignal *signal = [RACSignal createSignal:
^RACDisposable *(id<RACSubscriber> subscriber)
{
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"signal dispose");
}];
}];
RACDisposable *disposable = [signal subscribeNext:^(id x) {
NSLog(@"subscribe value = %@", x);
} error:^(NSError *error) {
NSLog(@"error: %@", error);
} completed:^{
NSLog(@"completed");
}];
[disposable dispose];複制代碼
複制
這是一個RACSignal被訂閱的完整過程。被訂閱的過程中,究竟發生了什麼?
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
return [RACDynamicSignal createSignal:didSubscribe];
}複制代碼
複制
RACSignal調用createSignal的時候,會調用RACDynamicSignal的createSignal的方法。
RACDynamicSignal是RACSignal的子類。createSignal後面的參數是一個block。
(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe複制代碼
複制
block的傳回值是RACDisposable類型,block名叫didSubscribe。block的唯一一個參數是id類型的subscriber,這個subscriber是必須遵循RACSubscriber協定的。
RACSubscriber是一個協定,其下有以下4個協定方法:
@protocol RACSubscriber <NSObject>
@required
- (void)sendNext:(id)value;
- (void)sendError:(NSError *)error;
- (void)sendCompleted;
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable;
@end複制代碼
複制
是以建立Signal的任務就全部落在了RACSignal的子類RACDynamicSignal上了。
@interface RACDynamicSignal ()
// The block to invoke for each subscriber.
@property (nonatomic, copy, readonly) RACDisposable * (^didSubscribe)(id<RACSubscriber> subscriber);
@end複制代碼
複制
RACDynamicSignal這個類很簡單,裡面就儲存了一個名字叫didSubscribe的block。
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
RACDynamicSignal *signal = [[self alloc] init];
signal->_didSubscribe = [didSubscribe copy];
return [signal setNameWithFormat:@"+createSignal:"];
}複制代碼
複制
這個方法中建立了一個RACDynamicSignal對象signal,并把傳進來的didSubscribe這個block儲存進剛剛建立對象signal裡面的didSubscribe屬性中。最後再給signal命名+createSignal:。
- (instancetype)setNameWithFormat:(NSString *)format, ... {
if (getenv("RAC_DEBUG_SIGNAL_NAMES") == NULL) return self;
NSCParameterAssert(format != nil);
va_list args;
va_start(args, format);
NSString *str = [[NSString alloc] initWithFormat:format arguments:args];
va_end(args);
self.name = str;
return self;
}複制代碼
複制
setNameWithFormat是RACStream裡面的方法,由于RACDynamicSignal繼承自RACSignal,是以它也能調用這個方法。
RACSignal的block就這樣被儲存起來了,那什麼時候會被執行呢?
block閉包在訂閱的時候才會被“釋放”出來。
RACSignal調用subscribeNext方法,傳回一個RACDisposable。
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock {
NSCParameterAssert(nextBlock != NULL);
NSCParameterAssert(errorBlock != NULL);
NSCParameterAssert(completedBlock != NULL);
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:errorBlock completed:completedBlock];
return [self subscribe:o];
}複制代碼
複制
在這個方法中會建立一個RACSubscriber對象,并傳入nextBlock,errorBlock,completedBlock。
@interface RACSubscriber ()
// These callbacks should only be accessed while synchronized on self.
@property (nonatomic, copy) void (^next)(id value);
@property (nonatomic, copy) void (^error)(NSError *error);
@property (nonatomic, copy) void (^completed)(void);
@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;
@end複制代碼
複制
RACSubscriber這個類很簡單,裡面隻有4個屬性,分别是nextBlock,errorBlock,completedBlock和一個RACCompoundDisposable信号。
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
RACSubscriber *subscriber = [[self alloc] init];
subscriber->_next = [next copy];
subscriber->_error = [error copy];
subscriber->_completed = [completed copy];
return subscriber;
}複制代碼
複制
subscriberWithNext方法把傳入的3個block都儲存分别儲存到自己對應的block中。
RACSignal調用subscribeNext方法,最後return的時候,會調用[self subscribe:o],這裡實際是調用了RACDynamicSignal類裡面的subscribe方法。
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
if (self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
[disposable addDisposable:schedulingDisposable];
}
return disposable;
}複制代碼
複制
RACDisposable有3個子類,其中一個就是RACCompoundDisposable。
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAjM2EzLcd3LcJzLcJzdllmVldWYtl2PnVGcq5CaxsWajtWZo5GevwFM1EDN2UjMtUGall3LcVmdhNXLwRHdo9CXt92YucWbpRWdvx2Yx5yazF2Lc9CX6MHc0RHaiojIsJye.jpeg)
@interface RACCompoundDisposable : RACDisposable
+ (instancetype)compoundDisposable;
+ (instancetype)compoundDisposableWithDisposables:(NSArray *)disposables;
- (void)addDisposable:(RACDisposable *)disposable;
- (void)removeDisposable:(RACDisposable *)disposable;
@end複制代碼
複制
RACCompoundDisposable雖然是RACDisposable的子類,但是它裡面可以加入多個RACDisposable對象,在必要的時候可以一口氣都調用dispose方法來銷毀信号。當RACCompoundDisposable對象被dispose的時候,也會自動dispose容器内的所有RACDisposable對象。
RACPassthroughSubscriber是一個私有的類。
@interface RACPassthroughSubscriber : NSObject <RACSubscriber>
@property (nonatomic, strong, readonly) id<RACSubscriber> innerSubscriber;
@property (nonatomic, unsafe_unretained, readonly) RACSignal *signal;
@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;
- (instancetype)initWithSubscriber:(id<RACSubscriber>)subscriber signal:(RACSignal *)signal disposable:(RACCompoundDisposable *)disposable;
@end複制代碼
複制
RACPassthroughSubscriber類就隻有這一個方法。目的就是為了把所有的信号事件從一個訂閱者subscriber傳遞給另一個還沒有disposed的訂閱者subscriber。
RACPassthroughSubscriber類中儲存了3個非常重要的對象,RACSubscriber,RACSignal,RACCompoundDisposable。RACSubscriber是待轉發的信号的訂閱者subscriber。RACCompoundDisposable是訂閱者的銷毀對象,一旦它被disposed了,innerSubscriber就再也接受不到事件流了。
這裡需要注意的是内部還儲存了一個RACSignal,并且它的屬性是unsafe_unretained。這裡和其他兩個屬性有差別, 其他兩個屬性都是strong的。這裡之是以不是weak,是因為引用RACSignal僅僅隻是一個DTrace probes動态跟蹤技術的探針。如果設定成weak,會造成沒必要的性能損失。是以這裡僅僅是unsafe_unretained就夠了。
- (instancetype)initWithSubscriber:(id<RACSubscriber>)subscriber signal:(RACSignal *)signal disposable:(RACCompoundDisposable *)disposable {
NSCParameterAssert(subscriber != nil);
self = [super init];
if (self == nil) return nil;
_innerSubscriber = subscriber;
_signal = signal;
_disposable = disposable;
[self.innerSubscriber didSubscribeWithDisposable:self.disposable];
return self;
}複制代碼
複制
回到RACDynamicSignal類裡面的subscribe方法中,現在建立好了RACCompoundDisposable和RACPassthroughSubscriber對象了。
if (self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
[disposable addDisposable:schedulingDisposable];
}複制代碼
複制
RACScheduler.subscriptionScheduler是一個全局的單例。
+ (instancetype)subscriptionScheduler {
static dispatch_once_t onceToken;
static RACScheduler *subscriptionScheduler;
dispatch_once(&onceToken, ^{
subscriptionScheduler = [[RACSubscriptionScheduler alloc] init];
});
return subscriptionScheduler;
}複制代碼
複制
RACScheduler再繼續調用schedule方法。
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);
if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];
block();
return nil;
}複制代碼
複制
+ (BOOL)isOnMainThread {
return [NSOperationQueue.currentQueue isEqual:NSOperationQueue.mainQueue] || [NSThread isMainThread];
}
+ (instancetype)currentScheduler {
RACScheduler *scheduler = NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey];
if (scheduler != nil) return scheduler;
if ([self.class isOnMainThread]) return RACScheduler.mainThreadScheduler;
return nil;
}複制代碼
複制
在取currentScheduler的過程中,會判斷currentScheduler是否存在,和是否在主線程中。如果都沒有,那麼就會調用背景backgroundScheduler去執行schedule。
schedule的入參就是一個block,執行schedule的時候會去執行block。也就是會去執行:
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];複制代碼
複制
這兩句關鍵的語句。之前信号裡面儲存的block就會在此處被“釋放”執行。self.didSubscribe(subscriber)這一句就執行了信号儲存的didSubscribe閉包。
在didSubscribe閉包中有sendNext,sendError,sendCompleted,執行這些語句會分别調用RACPassthroughSubscriber裡面對應的方法。
- (void)sendNext:(id)value {
if (self.disposable.disposed) return;
if (RACSIGNAL_NEXT_ENABLED()) {
RACSIGNAL_NEXT(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description), cleanedDTraceString([value description]));
}
[self.innerSubscriber sendNext:value];
}
- (void)sendError:(NSError *)error {
if (self.disposable.disposed) return;
if (RACSIGNAL_ERROR_ENABLED()) {
RACSIGNAL_ERROR(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description), cleanedDTraceString(error.description));
}
[self.innerSubscriber sendError:error];
}
- (void)sendCompleted {
if (self.disposable.disposed) return;
if (RACSIGNAL_COMPLETED_ENABLED()) {
RACSIGNAL_COMPLETED(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description));
}
[self.innerSubscriber sendCompleted];
}複制代碼
複制
這個時候的訂閱者是RACPassthroughSubscriber。RACPassthroughSubscriber裡面的innerSubscriber才是最終的實際訂閱者,RACPassthroughSubscriber會把值再繼續傳遞給innerSubscriber。
- (void)sendNext:(id)value {
@synchronized (self) {
void (^nextBlock)(id) = [self.next copy];
if (nextBlock == nil) return;
nextBlock(value);
}
}
- (void)sendError:(NSError *)e {
@synchronized (self) {
void (^errorBlock)(NSError *) = [self.error copy];
[self.disposable dispose];
if (errorBlock == nil) return;
errorBlock(e);
}
}
- (void)sendCompleted {
@synchronized (self) {
void (^completedBlock)(void) = [self.completed copy];
[self.disposable dispose];
if (completedBlock == nil) return;
completedBlock();
}
}複制代碼
複制
innerSubscriber是RACSubscriber,調用sendNext的時候會先把自己的self.next閉包copy一份,再調用,而且整個過程還是線程安全的,用@synchronized保護着。最終訂閱者的閉包在這裡被調用。
sendError和sendCompleted也都是同理。
總結一下:
- RACSignal調用subscribeNext方法,建立一個RACSubscriber。
- 建立的RACSubscriber會copy,nextBlock,errorBlock,completedBlock存在自己的屬性變量中。
- RACSignal的子類RACDynamicSignal調用subscribe方法。
- 建立RACCompoundDisposable和RACPassthroughSubscriber對象。RACPassthroughSubscriber分别儲存對RACSignal,RACSubscriber,RACCompoundDisposable的引用,注意對RACSignal的引用是unsafe_unretained的。
- RACDynamicSignal調用didSubscribe閉包。先調用RACPassthroughSubscriber的相應的sendNext,sendError,sendCompleted方法。
- RACPassthroughSubscriber再去調用self.innerSubscriber,即RACSubscriber的nextBlock,errorBlock,completedBlock。注意這裡調用同樣是先copy一份,再調用閉包執行。
三. RACSignal操作的核心bind實作
在RACSignal的源碼裡面包含了兩個基本操作,concat和zipWith。不過在分析這兩個操作之前,先來分析一下更加核心的一個函數,bind操作。
先來說說bind函數的作用:
- 會訂閱原始的信号。
- 任何時刻原始信号發送一個值,都會綁定的block轉換一次。
- 一旦綁定的block轉換了值變成信号,就立即訂閱,并把值發給訂閱者subscriber。
- 一旦綁定的block要終止綁定,原始的信号就complete。
- 當所有的信号都complete,發送completed信号給訂閱者subscriber。
- 如果中途信号出現了任何error,都要把這個錯誤發送給subscriber
- (RACSignal *)bind:(RACStreamBindBlock (^)(void))block {
NSCParameterAssert(block != NULL);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACStreamBindBlock bindingBlock = block();
NSMutableArray *signals = [NSMutableArray arrayWithObject:self];
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
void (^completeSignal)(RACSignal *, RACDisposable *) = ^(RACSignal *signal, RACDisposable *finishedDisposable) { /*這裡暫時省略*/ };
void (^addSignal)(RACSignal *) = ^(RACSignal *signal) { /*這裡暫時省略*/ };
@autoreleasepool {
RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
[compoundDisposable addDisposable:selfDisposable];
RACDisposable *bindingDisposable = [self subscribeNext:^(id x) {
// Manually check disposal to handle synchronous errors.
if (compoundDisposable.disposed) return;
BOOL stop = NO;
id signal = bindingBlock(x, &stop);
@autoreleasepool {
if (signal != nil) addSignal(signal);
if (signal == nil || stop) {
[selfDisposable dispose];
completeSignal(self, selfDisposable);
}
}
} error:^(NSError *error) {
[compoundDisposable dispose];
[subscriber sendError:error];
} completed:^{
@autoreleasepool {
completeSignal(self, selfDisposable);
}
}];
selfDisposable.disposable = bindingDisposable;
}
return compoundDisposable;
}] setNameWithFormat:@"[%@] -bind:", self.name];
}複制代碼
複制
為了弄清楚bind函數究竟做了什麼,寫出測試代碼:
RACSignal *signal = [RACSignal createSignal:
^RACDisposable *(id<RACSubscriber> subscriber)
{
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"signal dispose");
}];
}];
RACSignal *bindSignal = [signal bind:^RACStreamBindBlock{
return ^RACSignal *(NSNumber *value, BOOL *stop){
value = @(value.integerValue * 2);
return [RACSignal return:value];
};
}];
[bindSignal subscribeNext:^(id x) {
NSLog(@"subscribe value = %@", x);
}];複制代碼
複制
由于前面第一章節詳細講解了RACSignal的建立和訂閱的全過程,這個也為了方法講解,建立RACDynamicSignal,RACCompoundDisposable,RACPassthroughSubscriber這些都略過,這裡着重分析一下bind的各個閉包傳遞建立和訂閱的過程。
為了防止接下來的分析會讓讀者看暈,這裡先把要用到的block進行編号。
RACSignal *signal = [RACSignal createSignal:
^RACDisposable *(id<RACSubscriber> subscriber)
{
// block 1
}
RACSignal *bindSignal = [signal bind:^RACStreamBindBlock{
// block 2
return ^RACSignal *(NSNumber *value, BOOL *stop){
// block 3
};
}];
[bindSignal subscribeNext:^(id x) {
// block 4
}];
- (RACSignal *)bind:(RACStreamBindBlock (^)(void))block {
// block 5
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
// block 6
RACStreamBindBlock bindingBlock = block();
NSMutableArray *signals = [NSMutableArray arrayWithObject:self];
void (^completeSignal)(RACSignal *, RACDisposable *) = ^(RACSignal *signal, RACDisposable *finishedDisposable) {
// block 7
};
void (^addSignal)(RACSignal *) = ^(RACSignal *signal) {
// block 8
RACDisposable *disposable = [signal subscribeNext:^(id x) {
// block 9
}];
};
@autoreleasepool {
RACDisposable *bindingDisposable = [self subscribeNext:^(id x) {
// block 10
id signal = bindingBlock(x, &stop);
@autoreleasepool {
if (signal != nil) addSignal(signal);
if (signal == nil || stop) {
[selfDisposable dispose];
completeSignal(self, selfDisposable);
}
}
} error:^(NSError *error) {
[compoundDisposable dispose];
[subscriber sendError:error];
} completed:^{
@autoreleasepool {
completeSignal(self, selfDisposable);
}
}];
}
return compoundDisposable;
}] ;
}複制代碼
複制
先建立信号signal,didSubscribe把block1 copy儲存起來。
當信号調用bind進行綁定,會調用block5,didSubscribe把block6 copy儲存起來。
當訂閱者開始訂閱bindSignal的時候,流程如下:
- bindSignal執行didSubscribe的block,即執行block6。
- 在block6 的第一句代碼,就是調用RACStreamBindBlock bindingBlock = block(),這裡的block是外面傳進來的block2,于是開始調用block2。執行完block2,會傳回一個RACStreamBindBlock的對象。
- 由于是signal調用的bind函數,是以bind函數裡面的self就是signal,在bind内部訂閱了signal的信号。subscribeNext是以會執行block1。
- 執行block1,sendNext調用訂閱者subscriber的nextBlock,于是開始執行block10。
- block10中會先調用bindingBlock,這個是之前調用block2的傳回值,這個RACStreamBindBlock對象裡面儲存的是block3。是以開始調用block3。
- 在block3中入參是一個value,這個value是signal中sendNext中發出來的value的值,在block3中可以對value進行變換,變換完成後,傳回一個新的信号signal'。
- 如果傳回的signal'為空,則會調用completeSignal,即調用block7。block7中會發送sendCompleted。如果傳回的signal'不為空,則會調用addSignal,即調用block8。block8中會繼續訂閱signal'。執行block9。
- block9 中會sendNext,這裡的subscriber是block6的入參,于是對subscriber調用sendNext,會調用到bindSignal的訂閱者的block4中。
- block9 中執行完sendNext,還會調用sendCompleted。這裡的是在執行block9裡面的completed閉包。completeSignal(signal, selfDisposable);然後又會調用completeSignal,即block7。
- 執行完block7,就完成了一次從signal 發送信号sendNext的全過程。
bind整個流程就完成了。
四. RACSignal基本操作concat和zipWith實作
接下來再來分析RACSignal中另外2個基本操作。
1. concat
寫出測試代碼:
RACSignal *signal = [RACSignal createSignal:
^RACDisposable *(id<RACSubscriber> subscriber)
{
[subscriber sendNext:@1];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"signal dispose");
}];
}];
RACSignal *signals = [RACSignal createSignal:
^RACDisposable *(id<RACSubscriber> subscriber)
{
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendNext:@6];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"signal dispose");
}];
}];
RACSignal *concatSignal = [signal concat:signals];
[concatSignal subscribeNext:^(id x) {
NSLog(@"subscribe value = %@", x);
}];複制代碼
複制
concat操作就是把兩個信号合并起來。注意合并有先後順序。
- (RACSignal *)concat:(RACSignal *)signal {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];
RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
// 發送第一個信号的值
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
// 訂閱第二個信号
RACDisposable *concattedDisposable = [signal subscribe:subscriber];
serialDisposable.disposable = concattedDisposable;
}];
serialDisposable.disposable = sourceDisposable;
return serialDisposable;
}] setNameWithFormat:@"[%@] -concat: %@", self.name, signal];
}複制代碼
複制
合并前,signal和signals分别都把各自的didSubscribe儲存copy起來。
合并之後,合并之後新的信号的didSubscribe會把block儲存copy起來。
當合并之後的信号被訂閱的時候:
- 調用新的合并信号的didSubscribe。
- 由于是第一個信号調用的concat方法,是以block中的self是前一個信号signal。合并信号的didSubscribe會先訂閱signal。
- 由于訂閱了signal,于是開始執行signal的didSubscribe,sendNext,sendError。
- 目前一個信号signal發送sendCompleted之後,就會開始訂閱後一個信号signals,調用signals的didSubscribe。
- 由于訂閱了後一個信号,于是後一個信号signals開始發送sendNext,sendError,sendCompleted。
這樣兩個信号就前後有序的拼接到了一起。
這裡有一點需要注意的是,兩個信号concat在一起之後,新的信号的結束信号在第二個信号結束的時候才結束。看上圖描述,新的信号的發送長度等于前面兩個信号長度之和,concat之後的新信号的結束信号也就是第二個信号的結束信号。
2. zipWith
寫出測試代碼:
RACSignal *concatSignal = [signal zipWith:signals];
[concatSignal subscribeNext:^(id x) {
NSLog(@"subscribe value = %@", x);
}];複制代碼
複制
源碼如下:
- (RACSignal *)zipWith:(RACSignal *)signal {
NSCParameterAssert(signal != nil);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
__block BOOL selfCompleted = NO;
NSMutableArray *selfValues = [NSMutableArray array];
__block BOOL otherCompleted = NO;
NSMutableArray *otherValues = [NSMutableArray array];
void (^sendCompletedIfNecessary)(void) = ^{
@synchronized (selfValues) {
BOOL selfEmpty = (selfCompleted && selfValues.count == 0);
BOOL otherEmpty = (otherCompleted && otherValues.count == 0);
// 如果任意一個信号完成并且數組裡面空了,就整個信号算完成
if (selfEmpty || otherEmpty) [subscriber sendCompleted];
}
};
void (^sendNext)(void) = ^{
@synchronized (selfValues) {
// 數組裡面的空了就傳回。
if (selfValues.count == 0) return;
if (otherValues.count == 0) return;
// 每次都取出兩個數組裡面的第0位的值,打包成元組
RACTuple *tuple = RACTuplePack(selfValues[0], otherValues[0]);
[selfValues removeObjectAtIndex:0];
[otherValues removeObjectAtIndex:0];
// 把元組發送出去
[subscriber sendNext:tuple];
sendCompletedIfNecessary();
}
};
// 訂閱第一個信号
RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
@synchronized (selfValues) {
// 把第一個信号的值加入到數組中
[selfValues addObject:x ?: RACTupleNil.tupleNil];
sendNext();
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (selfValues) {
// 訂閱完成時判斷是否要發送完成信号
selfCompleted = YES;
sendCompletedIfNecessary();
}
}];
// 訂閱第二個信号
RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {
@synchronized (selfValues) {
// 把第二個信号加入到數組中
[otherValues addObject:x ?: RACTupleNil.tupleNil];
sendNext();
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (selfValues) {
// 訂閱完成時判斷是否要發送完成信号
otherCompleted = YES;
sendCompletedIfNecessary();
}
}];
return [RACDisposable disposableWithBlock:^{
// 銷毀兩個信号
[selfDisposable dispose];
[otherDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -zipWith: %@", self.name, signal];
}複制代碼
複制
當把兩個信号通過zipWith之後,就像上面的那張圖一樣,拉鍊的兩邊被中間的拉索拉到了一起。既然是拉鍊,那麼一一的位置是有對應的,上面的拉鍊第一個位置隻能對着下面拉鍊第一個位置,這樣拉鍊才能拉到一起去。
具體實作:
zipWith裡面有兩個數組,分别會存儲兩個信号的值。
- 一旦訂閱了zipWith之後的信号,就開始執行didSubscribe閉包。
- 在閉包中會先訂閱第一個信号。這裡假設第一個信号比第二個信号先發出一個值。第一個信号發出來的每一個值都會被加入到第一個數組中儲存起來,然後調用sendNext( )閉包。在sendNext( )閉包中,會先判斷兩個數組裡面是否都為空,如果有一個數組裡面是空的,就return。由于第二個信号還沒有發送值,即第二個信号的數組裡面是空的,是以這裡第一個值發送不出來。于是第一個信号被訂閱之後,發送的值存儲到了第一個數組裡面了,沒有發出去。
- 第二個信号的值緊接着發出來了,第二個信号每發送一次值,也會存儲到第二個數組中,但是這個時候再調用sendNext( )閉包的時候,不會再return了,因為兩個數組裡面都有值了,兩個數組的第0号位置都有一個值了。有值以後就打包成元組RACTuple發送出去。并清空兩個數組0号位置存儲的值。
- 以後兩個信号每次發送一個,就先存儲在數組中,隻要有“配對”的另一個信号,就一起打包成元組RACTuple發送出去。從圖中也可以看出,zipWith之後的新信号,每個信号的發送時刻是等于兩個信号最晚發出信号的時刻。
- 新信号的完成時間,是當兩者任意一個信号完成并且數組裡面為空,就算完成了。是以最後第一個信号發送的5的那個值就被丢棄了。
第一個信号依次發送的1,2,3,4的值和第二個信号依次發送的A,B,C,D的值,一一的合在了一起,就像拉鍊把他們拉在一起。由于5沒法配對,是以拉鍊也拉不上了。
五. 最後
本來這篇文章想把Map,combineLatest,flattenMap,flatten這些也一起分析了,但是後來看到RACSingnal的操作實在有點多,于是按照源碼的檔案分開了,這裡先把RACSignal檔案裡面的操作都分析完了。RACSignal檔案裡面的操作主要就bind,concat和zipWith三個操作。下一篇再分析分析RACSignal+Operations檔案裡面的所有操作。
請大家多多指教。