天天看點

編寫高品質JavaScript代碼之并發并發

參考書籍:《Effective JavaScript》

并發

在JavaScript中,編寫響應多個并發事件的程式的方法非常人性化,而且強大,因為它使用了一個簡單的執行模型(有時稱為事件隊列或事件循環并發)和被稱為異步的API。

不要阻塞I/O事件隊列

在一些語言中,我們會習慣性地編寫代碼來等待某個特定的輸入。

var text = downloadSync('http://example.com/file.txt');
console.log(text);
           

形如downloadSync這樣的函數被稱為同步函數(或阻塞函數)。程式會停止做任何工作,而等待它的輸入。在這個例子中,也就是等待從網絡下載下傳檔案的結果。由于在等待下載下傳完成的期間,計算機可以做其他有用的工作,是以這樣的語言通常為程式員提供一種方法來建立多個線程,即并行執行自己算。它允許程式的一部分停下來等待(阻塞)一個低速的輸入,而程式的另一部分可以繼續進行獨立的工作。

在JavaScript中,大多的I/O操作都提供了異步的或非阻塞的API。

downloadAsync('http://example.com/file.txt', function (text) {
    console.log(text);
});
           

該API初始化下載下傳程序,然後在内部系統資料庫中存儲了回調函數後立刻傳回,而不是被網絡請求阻塞。

JavaScript有時被稱為提供一個運作到完成機制(run-to-completion)的擔保。任何目前正在運作于共享上下文的使用者代碼,比如浏覽器中的單個Web頁面或者單個運作的Web伺服器執行個體,隻有在執行完成後才能調用下一個事件處理程式。實際上,系統維護了一個按事件發生順序排列的内部事件隊列,一次調用一個已注冊的回調函數。

以用戶端(mouse moved、file downloaded)和伺服器端(file read、path resolved)應用程式事件為例,随着事件的發生,它們被添加到應用程式的事件隊列的末尾。JavaScript系統使用一個内部循環機制來執行應用程式。該循環機制每次都拉取隊列底部的事件,也就是說,以接收到這些事件的順序來調用這些已注冊的JavaScript事件處理程式,并将事件的資料作為改事件處理程式的參數。

運作到完成機制擔保的好處是當代碼運作時,你完全掌握應用程式的狀态。你根本不必擔心一些變量和對象屬性的改變由于并發執行代碼而超出你的控制。并發程式設計在JavaScript中往往比使用線程和鎖的C++、Java或C#要容易得多。

然而,運作到完成機制的不足是,實際上所有你編寫的代碼支撐着餘下應用程式的繼續執行。

JavaScript并發的一個最重要的規則是絕不要在應用程式事件隊列中使用阻塞I/O的API。

相比之下,異步的API用在基于事件的環境中是安全的,因為它們迫使應用程式邏輯在一個獨立的事件循環“輪詢”中繼續處理。

提示:

  • 異步APi使用回調函數來延緩處理代價高昂的操作以避免阻塞主應用程式。
  • JavaScript并發地接收事件,但會使用一個事件隊列按序地處理事件處理程式。
  • 在應用程式事件隊列中絕不要使用阻塞的I/O。

在異步序列中使用嵌套或命名的回調函數

了解操作序列的最簡單的方式是異步API是發起操作而不是執行操作。異步操作完成後,在事件循環的某個單獨的輪次中,被注冊的事件處理程式才會執行。

如果你需要在發起一個操作後做一些事情,如何串聯已完成的異步操作。

  • 最簡單的答案是使用嵌套。
    db.lookupAsyc('url', function(url) {
        downloadAsyc(url, function(text) {
            console.log('contents of ' + url + ': ' + text);
        });
    });
               
    嵌套的異步操作很容易,但當擴充到更長的序列時會很快變得笨拙。
    db.lookupAsync('url', function(url) {
        downloadAsync(url, function(file) {
            downloadAsync('a.txt', function(a) {
                downladAsync('b.txt', function(b) {
                    downloadAsync('c.txt', function(c) {
                        // ...
                    });
                })
            });
        });
    });
               
  • 減少過多嵌套的方法之一是将嵌套的回調函數作為命名的函數,并将它們需要的附加資料作為額外的參數傳遞。
    db.lookupAsync('url', downloadURL);
    
    function downloadURL(url) {
        downloadAsync(url, function(text) { // still nested
            showContents(url, text);
        });
    }
    
    function showContents(url, text) {
        console.log('contents of ' + url + ': ' + text);
    }
               
    上述代碼仍然使用了嵌套的回調函數,可以使用bind方法消除最深層的嵌套回調函數。
    db.lookupAsync('url', downloadURL);
    
    function downloadURL(url) {
        downloadAsync(url, showContents.bind(null, url)); // => window.showContents(url) = function(url, text) { ... } 
    }
    
    function showContents(url, text) {
        console.log('contents of ' + url + ': ' + text);
    }   
               
    這種做法導緻了代碼看起來根據順序性,但需要為操作序列的每個中間步驟命名,并且一步步地使用綁定,這可能導緻尴尬的情況。
  • 更勝一籌的方法是使用一個額外的抽象來簡化。
    function downloadFiles(url, file) {
        downloadAllAsync(['a.txt', 'b.txt', 'c.txt'], function(all) {
            var a = all[0],
                b = all[1],
                c = all[2];
            
            // ...
        });
    }
               

提示:

  • 使用嵌套或命名的回調函數按順序地執行多個異步操作。
  • 嘗試在過多的嵌套的回調函數和尴尬的命名的非嵌套回調函數之間取得平衡。
  • 避免将可被并行執行的操作順序化。

當心丢棄錯誤

對于同步的代碼,通過使用try語句塊包裝一段代碼很容易一下子處理所有的錯誤。

try {
    f();
    g();
    h();
} catch (e) {
    // handle any error that occurred...
}
           

異步的API傾向于将錯誤表示為回調函數的特定參數,或使用一個附加的錯誤處理回調函數(有時被稱為errbacks)。

downloadAsync('a.txt', function(a) {
    downloadAsync('b.txt', function(b) {
        downloadAsync('c.txt', function(c) {
            console.log('Content: ' + a + b + c);   
        }, function(error) {
            console.log('Error: ' + error);
        })
    }, function(error) { // repeated error-handling logic
        console.log('Error: ' + error);
    })
}, function(error) { // repeated error-handling logic
    console.log('Error: ' + error);
})
           

上述代碼中,每一步的處理都使用了相同的錯誤處理邏輯,我們可以在一個共享的作用域中定義一個錯誤處理的函數,将重複代碼抽象出來。

function onError(error) {
    console.log('Error: ' + error);
}

downloadAsync('a.txt', function(a) {
    downloadAsync('b.txt', function(b) {
        downloadAsync('c.txt', function(c) {
            console.log('Content: ' + a + b + c);   
        }, onError)
    }, onError)
}, onError)
           

另一種錯誤處理API的風格隻需要一個回調函數,該回調函數的第一個參數如果有錯誤發生那就表示為一個錯誤,否則就位一個假值,比如null。

function onError(error) {
    console.log('Error: ' + error);
}

downloadAsync('a.txt', function(error, a) {
    if (error) return onError(error);

    downloadAsync('b.txt', function(error, b) {
        if (error) return onError(error);

        downloadAsync(url13, function(error, c) {
            if (error) return onError(error);

            console.log('Content: ' + a + b + c);
        });
    });
});
           

提示:

  • 通過編寫共享的錯誤處理函數來避免複制和粘貼錯誤處理代碼。
  • 確定明确地處理所有的錯誤條件以避免丢棄錯誤。

對異步循環使用遞歸

設想有一個函數接收一個URL的數組并嘗試依次下載下傳每個檔案。

function downloadOneSync(urls) {
    for (var i = 0, n = urls.length; i < n; i++) {
        downloadAsync(urls[i], onsuccess, function(error) {
            // ?
        });

        // loop continues
    }

    throw new Error('all downloads failed');
}
           

上述代碼将啟動所有的下載下傳,而不是等待一個完成再試下一個。

解決方案是将循環實作為一個函數,是以我們可以決定何時開始每次疊代。

function downloadOneAsync(urls, onsuccess, onfailure) {
    var n = urls.length;

    function tryNextURL(i) {
        if (i >= n) {
            onfailure('all downloads failed');
            return;
        }

        downloadAsync(urls[i], onsuccess, function() {
            tryNextURL(i + 1);
        });
    }

    tryNextURL(0);
}
           

局部函數tryNextURL是一個遞歸函數。它的實作調用了其自身。目前典型的JavaScript環境中一個遞歸函數同步調用自身過多次(例如10萬次)會導緻失敗。

JavaScript環境通常在記憶體中儲存一塊固定的區域,稱為調用棧,用于記錄函數調用傳回前下一步該做什麼。

function negative(x) {
    return abs(x) * -1;
}

function abs(x) {
    return Math.abs(x);
}

console.log(negative(42));
           

當程式使用參數42調用

Math.abs

方法時,有好幾個其他的函數調用也在進行,每個都在等待另一個的調用傳回。

最新的函數調用将資訊推入棧(被表示為棧的最底層的幀),該資訊也将首先從棧中彈出。當

Math.abs

執行完畢,将會傳回給abs函數,其将傳回給negative函數,然後将傳回到最外面的腳本。

當一個程式執行中有太多的函數調用,它會耗盡棧空間,最終抛出異常,這種情況被稱為棧溢出。

downloadOneAsync函數,不是直到遞歸調用傳回後才被傳回,downloadOneAsync隻在異步回調函數中調用自身。記住異步API在其回調函數被調用前會立即傳回。是以downloadOneAsync傳回,導緻其棧幀在任何遞歸調用将新的棧幀推入棧前,會從調用棧中彈出。事實上,回調函數總在事件循環的單獨輪次中被調用,事件循環的每個輪次中調用其事件處理程式的調用棧最初是空的。是以無論downloadOneAsync需要多少次疊代,都不會耗盡棧空間。

提示:

  • 循環不能是異步的。
  • 使用遞歸函數在事件循環的單獨輪次中執行疊代。
  • 在事件循環的單獨輪次中執行遞歸,并不會導緻調用棧溢出。

不要在計算時阻塞事件隊列

如果你的應用程式需要執行代價高昂的算法你該怎麼辦呢?

也許最簡單的方法是使用像Web用戶端平台的Worker API這樣的并發機制。

但是不是所有的JavaScript平台都提供了類似Worker這樣的API,另一種方法是算法分解為多個步驟,每個步驟組成一個可管理的工作塊。

Member.prototype.inNetwork = function(other){
    var visited = {},
        worklist = [this];

    while (worklist.length > 0) {
        var member = worklist.pop();

        // ...

        if (member === other) { // found?
            return true;
        }

        // ...
    }

    return false;
};
           

如果這段程式核心的while循環代價太過高昂,搜尋工作很可能會以不可接受的時間運作而阻塞應用程式事件隊列。

幸運的是,這種算法被定義為一個步驟集的序列——while循環的疊代。我們可以通過增加一個回調參數将inNetwork轉換為一個匿名函數,将while循環替換為一個匿名的遞歸函數。

Member.prototype.inNetwork = function(other, callback) {
    var visited = {},
        worklist = [this];

    function next() {
        if (worklist.length === 0) {
            callback(false);
            return;
        }

        var number = worklist.pop();

        // ...

        if (member === other) { // found?
            callback(true);
            return;
        }

        // ...

        setTimeout(next, 0); // schedule the next iteration
    }

    setTimeout(next, 0); // schedule the next iteration
};
           

局部的next函數執行循環中的單個疊代然後排程應用程式事件隊列來異步運作下一次疊代。這使得在此期間已經發生的其他事件被處理後才繼續下一次疊代。當搜尋完成後,通過找到一個比對或周遊完整個工作表,我們使用結果值調用回調函數并通過調用沒有排程任何疊代的next來傳回,進而有效地完成循環。

要排程疊代,我們使用多數JavaScript平台都可用的、通用的setTimeout API來注冊next函數,是next函數經過一段最少時間(0毫秒)後運作。這具有幾乎立刻将回調函數添加到事件隊列上的作用。

提示:

  • 避免在主事件隊列中執行代價高昂的算法。
  • 在支援Worker API的平台,該API可以用來在一個獨立的事件隊列中運作長計算程式。
  • 在Worker API不可用或代價昂貴的環境中,考慮将計算程式分解到事件循環的多個輪次中。

使用計數器來執行并行操作

function downloadAllAsync(urls, onsuccess, onerror) {
    var result = [],
        length = urls.length;

    if (length === 0) {
        setTimeout(onsuccess.bind(null, result), 0);
        return;
    }

    urls.forEach(function(url) {
        downloadAsync(url, function(text) {
            if (result) {
                // race condition
                reuslt.push(text);

                if (result.length === urls.length) {
                    onsuccess(result);
                }
            }
        }, function(error) {
            if (result) {
                result = null;
                onerror(error);
            }
        });
    });
}
           

上述代碼有錯誤。

當一個應用程式依賴于特定的事件順序才能正常工作時,這個程式會遭受資料競争(data race)。資料競争是指多個并發操作可以修改共享的資料結構,這取決于它們發生的順序。

var filenames = [
    'huge.txt',
    'tiny.txt',
    'medium.txt'
];

downloadAllAsync(filenames, function(files) {
    console.log('Huge file: ' + files[0].length); // tiny
    console.log('Tiny file: ' + files[1].length); // medium
    console.log('Medium file: ' + files[2].length); // huge
}, function(error) {
    console.log('Error: ' + error);
});
           

由于這些檔案是并行下載下傳的,事件可以以任意的順序發生。例如,如果tiny.txt先下載下傳完成,接下來是medium.txt檔案,最後是huge.txt檔案,則注冊到downloadAllAsync的回調函數并不會按照它們被建立的順序進行調用。但downloadAllAsync的實作是一旦下載下傳完成就立即将中間結果儲存在result數組的末尾。是以downloadAllAsync函數提供的儲存下載下傳檔案内容的數組的順序是未知的。

下面的方式可以實作downloadAllAsync不依賴不可預期的事件執行順序而總能提供預期結果。我們不将每個結果放置到數組末尾,而是存儲在其原始的索引位置中。

function downloadAsync(urls, onsuccess, onerror) {
    var length = urls.length,
        result = [];
    
    if (length === 0) {
        setTimeout(onsuccess.bind(null, result), 0);
        return;
    }

    urls.forEach(function(url, i) {
        downloadAsync(url, function(text) {
            if (result) {
                result[i] = text; // store at fixed index

                // race condition
                if (result.length === urls.length) {
                    onsuccess(result);
                }
            }
        }, function(error) {
            if (result) {
                result = null;
                onerror(error);
            }
        });
    });
}
           

上述代碼依然是不正确的。

假如我們有如下的一個請求。

downloadAllAsync(['huge.txt', 'medium.txt', 'tiny.txt']);
           

根據數組更新的契約,即設定一個索引屬性,總是確定數組的length屬性值大于索引。

如果tiny.txt檔案最先被下載下傳,結果數組将擷取索引未2的屬性,這将導緻

result.length

被更新為3。使用者的success回調函數被過早地調用,其參數為一個不完整的結果數組。

正确地實作應該是使用一個計數器來追蹤正在進行的操作數量。

function downloadAsync(urls, onsuccess, onerror) {
    var pending = urls.length,
        result = [];
    
    if (pending === 0) {
        setTimeout(onsuccess.bind(null, result), 0);
        return;
    }

    urls.forEach(function(url, i) {
        downloadAsync(url, function(text) {
            if (result) {
                result[i] = text; // store at fixed index
                pending--; // register the success

                // race condition
                if (pedding === 0) {
                    onsuccess(result);
                }
            }
        }, function(error) {
            if (result) {
                result = null;
                onerror(error);
            }
        });
    });
}
           

提示:

  • JavaScript應用程式中的事件發生是不确定的,即順序是不可預測的。
  • 使用計數器避免并行操作中的資料競争。

絕不要同步地調用異步的回調函數

設想有downloadAsync函數的一個變種,它持有一個緩存來避免多次下載下傳同一個檔案。

var cache = new Dict();

function downloadCachingAsync(url, onsuccess, onerror) {
    if (cache.has(url)) {
        onsuccess(cache.get(url)); // synchronous call
        return;
    }

    return downloadAsync(url, function(file) {
        cache.set(url, file);
        onsuccess(file);
    }, onerror);
};
           

通常情況下,如果可以,它似乎會立即提供資料,但這以微妙的方式違反了異步API用戶端的期望。

  • 首先,它改變了操作的預期順序。
    downloadCachingAsync('file.txt', function(file) {
        console.log('finished'); // might happen first
    });
    
    console.log('starting');
               
  • 其次,異步API的目的是維持事件循環中每輪的嚴格分離。這簡化了并發,通過減輕每輪事件循環的代碼量而不必擔心其他代碼并發地修改共享的資料結構。同步地調用異步的回調函數違反了這一分離,導緻在目前輪完成之前,代碼用于執行一輪隔離的事件循環。
    downloadCachingAsync(remaining[0], function(file) {
        remaing.shift();
    
        // ...
    });
    
    status.display('Downloading ' + remaining[0] + '...');
               
    如果同步地調用該回調函數,那麼将顯示錯誤的檔案名的消息(或者更糟糕的是,如果隊列為空會顯示undefined)。

同步地調用異步的回調函數甚至可能會導緻一些微妙的問題。

  • 異步的回調函數本質上是以空的調用棧來調用,是以将異步的循環實作為遞歸函數是安全的,完全沒有累計超越調用棧空間的危險。同步的調用不能保障這一點,因而使得一個表面上的異步循環很可能會耗盡調用棧空間。
  • 另一個問題是異常。對于上面的downloadCachingAsync實作,如果回調函數抛出一個異常,它将會在每輪的事件循環中,也就是開始下載下傳時而不是期望的一個分離的回合抛出該異常。

為了確定總是異步地調用回調函數,我們可以使用通用的庫函數setTimeout在每隔一個最小的時間的逾時時間後給事件隊列增加一個回調函數。

var cache = new Dict();

function downloadCachingAsync(url, onsuccess, onerror) {
    if (cache.has(url)) {
        var cached = cache.get(url);
        setTimeout(onsuccess.bind(null, cached), 0);
        return;
    }

    return downloadAsync(url, function(file) {
        cache.set(url, file);
        onsuccess(file);
    }, onerror);
};
           

提示:

  • 即使可以立即得到資料,也絕不要同步地調用異步回調函數。
  • 同步地調用異步的回調函數擾亂了預期的操作序列,并可能導緻意想不到的交錯代碼。
  • 同步地調用異步的回調函數可能導緻棧溢出或錯誤地處理異常。
  • 使用異步的API,比如setTimeout函數來排程異步回調函數,使其運作于另一個回合。

使用promise模式清潔異步邏輯

建構異步API的一種流行的替代方式是使用promise(有時也被稱為deferred或future)模式。

基于promise的API不接收回調函數作為參數,相反,它傳回一個promise對象,該對象通過其自身的then方法接收回調函數。

var p = downloadP('file.txt');

p.then(function(file) {
    console.log('file: ' + file);
});
           

promise的力量在于它們的組合性。傳遞給then的回調函數不僅産生影響,也可以産生結果。通過回調函數傳回一個值,可以構造一個新的promise。

var fileP = downloadP('file.txt');

var lengthP = fileP.then(function(file) {
    return file.length;
});

lengthP.then(function(length) {
    console.log('length: ' + length);
});
           

promise可以非常容易地構造一個實用程式來拼接多個promise的結果。

var filesP = join(downloadP('file1.txt'), downloadP('file2.txt'), downloadP('file3.txt'));

filesP.then(function(files) {
    console.log('file1: ' + files[0]);
    console.log('file2: ' + files[1]);
    console.log('file3: ' + files[2]);
});
           

promise庫也經常提供一個叫做when的工具函數。

var fileP1 = downloadP('file1.txt'), 
    fileP2 = downloadP('file2.txt'), 
    fileP3 = downloadP('file3.txt');

when([fileP1, fileP2, fileP3], function(files) {
    console.log('file1: ' + files[0]);
    console.log('file2: ' + files[1]);
    console.log('file3: ' + files[2]);
});
           

使promise成為卓越的抽象層級的部分原因是通過then方法的傳回值來聯系結果,或者通過工具函數如join來構成promise,而不是在并行的回調函數間共享資料結構。這本質上是安全的,因為它避免了資料競争。

有時故意建立某種類的資料競争是有用的。promise為此提供了一個很好的機制。例如,一個應用程式可能需要嘗試從多個不同的伺服器上同時下載下傳同一份檔案,而選擇最先完成的那個檔案。

var fileP = select(downloadP('http://example1.com/file.txt'), 
                    downloadP('http://example1.com/file.txt'),
                    downloadP('http://example1.com/file.txt'));

fileP.then(function(file) {
    console.log('file: ' + file);
});
           

select函數的另一個用途是提供逾時來終止長時間的操作。

var fileP = select(downloadP('file.txt'), timeoutErrorP(2000));

fileP.then(function(file) {
    console.log('file: ' + file);
}, function(error) {
    console.log('I/O error or timeout: ' + error);
});
           

提示:

  • promise代表最終值,即并行完成時最終産生的結果。
  • 使用promise組合不同的并行操作。
  • 使用promise模式的API避免資料競争。
  • 在要求有意的競争條件時使用select(也被稱為choose)。