本文是我翻譯《JavaScript Concurrency》書籍的第七章 抽取併發邏輯,該書主要以Promises、Generator、Web workers等技術來講解JavaScript併發編程方面的實踐。 完整書籍翻譯地址: "https://github.com/yzsunlei/javas ...
本文是我翻譯《JavaScript Concurrency》書籍的第七章 抽取併發邏輯,該書主要以Promises、Generator、Web workers等技術來講解JavaScript併發編程方面的實踐。
完整書籍翻譯地址:https://github.com/yzsunlei/javascript_concurrency_translation 。由於能力有限,肯定存在翻譯不清楚甚至翻譯錯誤的地方,歡迎朋友們提issue指出,感謝。
到本書這裡,我們已經在代碼中明確地模擬了併發問題。使用promises,我們同步化了兩個或更多非同步操作。使用生成器,我們按需創建數據,避免不必要的記憶體分配。最後,我們瞭解到Web worker是利用多個CPU內核的主要工具。
在本章中,我們將採用所有這些方法並將它們放入應用程式代碼的上下文中。也就是說,如果併發是預設的,那麼我們需要使併發儘可能不那麼明顯。我們將首先探索各種技術,這些技術將幫助我們在使用的組件中封裝併發機制。然後,我們將通過使用promises來幫助worker通信,直接改進前兩章的代碼。
一旦我們能夠使用promises抽象worker通信,我們將嘗試在生成器的幫助下實現惰性的worker。我們還將使用Parallel.js庫來介紹worker抽象,然後是worker線程池的概念。
編寫併發代碼
併發編程很難做到。即使是人為的示例應用程式,大部分複雜性來自併發代碼。我們顯然希望我們的代碼可讀性好,同時保持併發的好處。我們希望充分利用系統上的每個CPU。我們只想在需要時計算我們需要的東西。我們不希望義大利麵條式的代碼將多個非同步操作混在一起。在開發應用程式的同時關註併發編程的所有這些方面會削弱我們應該關註的內容 - 提供應用程式有價值的功能。
在本節中,我們將介紹可能用於將我們的應用程式的其餘部分與棘手的併發隔離的方法。這通常意味著將併發作為預設模式 - 即使在引擎下沒有發生真正的併發時也是如此。最後,我們不希望我們的代碼包含90%的併發處理技巧,而只有10%的功能。
隱藏併發機制
在我們所有的代碼中暴露併發機制的難度是,他們每一個都稍微不同於另一個。這擴大了我們可能已經發現所在的回調地獄的情況。例如,不是所有的併發操作都是從一些遠程資源獲取數據的網路請求。非同步數據可能來自一個worker或一些本身就是非同步的回調。想象一下場景我們使用了三個不同的數據源來計算一個我們需要的值,所有的這些都是非同步的。這裡是這個問題的示圖:
此圖中的數據是我們在應用程式代碼中關註的內容。從我們正在構建的功能的角度來看,我們並不關心上述任何事情。因此,我們的前端架構需要封裝與併發相關的複雜性。這意味著我們的每個組件都應該能夠以相同的方式訪問數據。除了我們所有的非同步數據源之外,還有另一個要考慮的複雜因素 - 當數據不是非同步的並且來自本地數據源呢?那麼同步本地數據源和HTTP請求呢?我們將在下一節中介紹這些。
沒有併發性
僅僅因為我們正在編寫併發JavaScript應用程式,並非每個操作本身都是併發的。例如,如果一個組件向另一個組件詢問它已經在記憶體中的數據,則它不是非同步操作並會立即返回。我們的應用程式可能到處都是這些操作,其中併發性根本就沒有意義。其中存在的挑戰 - 我們如何將非同步操作與同步操作無縫混合?
簡單的答案是我們在每處做出併發的預設假設。promise使這個問題易於處理。以下是使用promise來封裝非同步和同步操作的示圖說明:
這看起來很像前面的那個圖,但有兩個重要區別。我們添加了一個synchronous()操作; 這沒有回調函數,因為它不需要回調函數。它不是在等待其他任何東西,所以它會直接地返回。其他兩個函數就像在上圖中一樣;兩者都依賴回調函數將數據提供給我們的應用程式。第二個重要區別是有一個promise對象。這取代了sync()操作和數據概念。或者更確切地說,它將它們融合到同一個概念中。
這是promise的關鍵作用 - 它們為我們抽象同步問題提供能力。這不僅適用於網路請求,還適用於Web worker消息或依賴於回調的任何其他非同步操作。它需要一些調整來考慮下我們的數據,因為我們得保證它最終會到達這裡。但是,一旦我們消除了這種心理差距,預設情況下就會啟用併發。就我們的功能而言,併發是預設的,而我們在操作背後所做的事情並不是最具破壞性的。
現在讓我們看一些代碼。我們將創建兩個函數:一個是非同步的,另一個是簡單返回值的普通函數。這裡的目標是使運行這些函數的代碼相同,儘管生成值的方式有很大不同:
//一個非同步“fetch”函數。我們使用“setTimeout()”
//在1秒後通過“callback()”返回一些數據。
function fetchAsync(callback) {
setTimeout(() => {
callback({hello: 'world'});
}, 1000);
}
//同步操作只簡單地返回數據。
function fetchSync() {
return {hello: 'world'};
}
//對“fetchAsync()”調用的promise。
//我們通過了“resolve”函數作為回調。
var asyncPromise = new Promise((resolve, reject) => {
fetchAsync(resolve);
});
//對“fetchSync()”調用的promise。
//這個promise立即完成使用返回值。
var syncPromise = new Promise((resolve, reject) => {
resolve(fetchSync());
});
//創建一個等待兩個promise完成的promise。
//這讓我們無縫混合同步值和非同步值。
Promise.all([
asyncPromise,
syncPromise
]).then((results) => {
var [asyncResult, syncResult] = results;
console.log('async', asyncResult);
//→async {hello: 'world'}
});
console.log('sync', syncResult);
//→sync {hello:'world'}
在這裡權衡的是增加promise的複雜性,包裹它們而不是讓簡單的返回值函數馬上返回。但在現實中,封裝promise的複雜性中,如果我們不是寫一個併發應用,我們顯然需要關心這類問題本身。這些的好處是巨大的。當一切都是promise的值時,我們可以安全地排除令人討厭的導致不一致的併發錯誤。
worker與promise通信
我們現在已經知道了為什麼將原始值視為promise有益於我們的代碼。是時候將這個概念應用於web workers了。在前兩章中,我們的代碼同步來自Web worker的響應看起來有點棘手。這是因為我們基本上試圖模仿許多promise善於處理的樣板工作。我們首先嘗試通過創建輔助函數來解決這些問題,這些函數為我們包裝worker通信,返回promise。然後我們將嘗試另一種涉及在較低級別擴展Web worker的方法。最後,我們將介紹一些涉及多個worker的更複雜的同步方案,例如上一章中的那些worker方案。
輔助函數
如果我們能夠以promise解決的形式獲得Web worker響應,那將是理想的。但是,我們需要首先創造promise - 我們該怎麼做這個?好吧,我們可以手動創建promise,其中發送給worker的消息是從promise executor函數中發送的。但是,如果我們採用這種方法,我們就不會比引入promise之前好多少了。
技巧是在單個輔助函數中封裝發佈到worker的消息和從worker接收的任何消息,如下所示:
我們來看一個實現這種模式的輔助函數示例。首先,我們需要一個執行某項任務的worker - 我們將從這開始:
//吃掉一些CPU迴圈...
//源自http://adambom.github.io/parallel.js/
function work(n) {
var i = 0;
while (++i < n * n) {}
return i;
}
//當我們收到消息時,我們會發佈一條消息id,
//以及在“number”上執行“work()”的結果。
addEventListener('message', (e) => {
postMessage({
id: e.data.id,
result: work(e.data.number)
});
});
在這裡,我們有一個worker,它會對我們傳遞的任何數字進行平方。這個work()函數特意很慢,以便我們可以看到我們的應用程式作為一個整體在Web worker花費比平時更長的時間來完成任務時的表現。它還使用我們在之前的Web worker示例中看到的id,因此它可以與發送消息的代碼協調。讓我們現在實現使用此worker的輔助函數:
//這將生成唯一ID。
//我們需要它們將Web worker執行的任務
//映射到更大的創建它們的操作。
function* genID() {
var id = 0;
while (true) {
yield id++;
}
}
//創建全局“id”生成器。
var id = genID();
//這個對象包含promises的解析器函數,
//當結果從worker那裡返回時,我們通過id在這裡查看。
var resolvers = {};
//開始我們的worker...
var worker = new Worker('worker.js');
worker.addEventListener('message', (e) => {
//找到合適的解析器函數。
var resolver = resolvers[e.data.id];
//從“resolvers”對象中刪除它。
delete resolvers[e.data.id];
//通過調用解析器函數將worker數據傳遞給promise。
resolver(e.data.result);
});
//這是我們的輔助函數。
//它處理向worker發送消息,
//並將promise綁定到worker的響應。
function square(number) {
return new Promise((resolve, reject) => {
//用於將Web worker響應和解析器函數綁定在一起的id。
var msgId = id.next().value;
//存儲解析器以便以後在Web worker消息回調中可以使用。
resolvers[msgId] = resolve;
//發佈消息 - id和number參數
worker.postMessage({
id: msgId,
number: number
});
});
}
square(10).then((result) => {
console.log('square(10)', result);
//→square(10) 100
});
square(100).then((result) => {
console.log('square(100)', result);
//→square(100) 10000
});
square(1000).then((result) => {
console.log('square(1000)', result);
//→square(1000) 1000000
});
如果我們關註square()函數的使用方式,傳遞一個數字參數並將一個promise作為返回值,我們可以看到這符合我們之前關於預設情況下使代碼併發的討論。例如,我們可以從這個場景中完全刪除worker,只需更改輔助函數解析它返回的promise的方式,我們的其餘代碼將繼續保持不變。
輔助函數策略只是一種使用promises簡化worker通信的方法。也許我們可以決定我們不一定要維護一堆輔助函數。接下來,我們將看一個比輔助函數更細粒度的方法。
擴展postMessage()
我們可以採用更通用的方法,而不是積聚大量輔助功能。輔助函數本身沒有什麼問題;他們是直接而且重要的。如果我們達到了數百個這樣的函數,它們的作用就會開始大打折扣了。更通用的方法是繼續使用worker.postMessage()。
所以讓我們看看是否可以使這個方法返回一個promise,就像我們上一節中的helper函數一樣。這樣,我們繼續使用細粒度postMessage()方法,但改進了我們的同步語義。首先,看看這裡的worker代碼:
addEventListener('message', (e) => {
//我們將發回主線程的結果,
//它應該始終包含消息id。
var result = {id: e.data.id};
//基於“action”,計算響應值“value”。
//選項是單獨保留文本,
//將其轉換為大寫,或將其轉換為小寫。
if (e.data.action === 'echo') {
result.value = e.data.value;
} else if (e.data.action === 'upper') {
result.value = e.data.value.toUpperCase();
} else if (e.data.action === 'lower') {
result.value = e.data.value.toLowerCase();
}
});
//通過等待延時模擬一個運行時間很長的worker,
//它在1秒後返回結果。
setTimeout(() => {
postMessage(result);
}, 1000);
這與我們迄今為止在Web worker代碼中看到的完全不同。現在,在主線程中,我們必須弄清楚如何改變Worker的介面。我們現在就這樣做。然後,我們將嘗試向此worker發佈一些消息並將處理promises作為響應:
//這個對象包含promises的解析器函數,
//當結果從worker那裡返回時,我們通過id在這裡查看。
var resolvers = {};
//保持“postMessage()”的原始實現,
//所以我們可以稍後在我們的自定義“postMessage()”中調用它。
var postMessage = Worker.prototype.postMessage;
//用我們的自定義實現替換“postMessage()”。
Worker.prototype.postMessage = function(data) {
return new Promise((resolve, reject) => {
//用於將Web worker響應和解析器函數綁定在一起的id。
var msgId = id.next().value;
//存儲解析器以便以後可以在Web worker消息回調使用。
resolvers[msgId] = resolve;
//運行原始的“Worker.postMessage()”實現,
//實際上負責將消息發佈到worker線程。
postMessage.call(this, Object.assign({
id: msgId
}, data));
});
};
//開始我們的worker...
var worker = new Worker('worker.js');
worker.addEventListener('message', (e) => {
//找到合適的解析器函數。
var resolver = resolvers[e.data.id];
//從“resolvers”對象中刪除它。
delete resolvers[e.data.id];
//通過調用解析器函數將worker數據傳遞給promise。
resolver(e.data.value);
});
worker.postMessage({
action: 'echo',
value: 'Hello World'
}).then((value) => {
console.log('echo', `"${value}"`);
//→echo “Hello World”
});
worker.postMessage({
action: 'upper',
value: 'Hello World'
}).then((value) => {
console.log('upper', `"${value}"`);
//→upper “HELLO WORLD”
});
worker.postMessage({
action: 'lower',
value: 'Hello World'
}).then((value) => {
console.log('lower',`"${value}"`);
//→lower “hello world”
});
嗯,這正是我們需要的,對吧?我們可以直接將消息數據發佈給worker,並通過promise解析將響應數據發送給我們。作為一個額外的好處,如果我們如此傾向,我們實際上可以圍繞這個新的postMessage()函數實現包裝輔助函數。主要參與完成這項工作的技巧是存儲對原始postMessage()的引用。然後,我們覆蓋web worker屬性postMessage,而不是函數本身。最後,我們可以復用它並添加必要的協調來保證好用。
同步worker結果
該代碼在最後2段已經充分降低了web workers回調地獄到可接受的水平。在事實上,現在我們已經有了一個方法處理如何封裝web workers通信由具有的postMessage()返回一個promise,我們準備要開始簡化一些未使用這種方法的混亂的worker代碼。我們已經瞭解了這些例子的,所以到目前為止,已經從promise中獲益良多,他們是簡單的; 沒有這些抽象不會是世界末日。
那麼我們映射數據集合然後映射和迭代集合的場景呢?我們可以回顧map/reduce代碼在“第6章,實用的並行”。這主要是由於所有worker通信模板代碼與嘗試執行map/reduce操作的代碼混合在一起。讓我們看看使用promise技術是否更好。首先,我們將創建一個非常基本的worker:
//返回一個輸入數組的映射,
//它通過平方數組中的每個數字。
addEventListener('message', (e) => {
postMessage({
id: e.data.id,
value: e.data.value.map(v => v * v)
});
});
我們可以使用此worker傳遞數組進行映射。因此,我們將創建其中兩個併在兩個worker之間拆分工作負載,如下所示:
function onMessage(e) {
//找到合適的解析器函數。
var resolver = resolvers[e.data.id];
//從“resolvers”對象中刪除它。
delete resolvers[e.data.id];
//通過調用解析器函數將worker數據傳遞給promise
resolver(e.data.value);
}
//開始我們的worker...
var worker1 = new Worker('worker.js'),
worker2 = new Worker('worker.js');
//創建一些要處理的數據。
var array = new Array(50000).fill(null).map((v, i) => i);
//當worker返回數據時,找到適當的解析器函數來調用。
worker1.addEventListener('message', onMessage);
worker2.addEventListener('message', onMessage);
//將輸入數據拆分為2,給出前半部分到第一個worker,
//給出後一部分到第二個worker。在這一點上,我們有兩個promises。
var promise1 = worker1.postMessage({
value: array.slice(0, Math.floor(array.length / 2))
});
var promise2 = worker2.postMessage({
value: array.slice(Math.floor(array.length / 2))
});
//使用“Promise.all()”來同步workers
//比手動嘗試協調整個worker回調函數要容易得多。
Promise.all([promise1, promise2]).then((values) => {
console.log('reduced', [].concat(...values).reduce((r, v) => r + v));
//→reduced 41665416675000
});
這就是我們需要向worker發佈數據以及同步來自兩個或更多worker的數據時,我們實際上就有動力編寫併發代碼 - 它看起來與現在的其他應用程式代碼相同。
惰性workers
現在是我們從不同角度看待web workers的時候了。我們使用worker的根本原因是我們想要在相同的時間內計算比過去更多的數據。正如我們現在所知,這樣做涉及消息傳遞錯綜複雜,可以說是分而治之的策略。我們必須通過將數據輸入和輸出worker,通常使用數組。
生成器幫助我們實現惰性地計算。也就是說,我們不想在記憶體中計算內容或分配數據,直到我們確實需要它。web workers難以實現這一目標嗎?或者我們可以利用生成器來惰性地並行計算嗎?
在本節中,我們將探討有關在Web worker中使用生成器的方法。首先,我們將研究與Web worker相關的開銷問題。然後,我們將編寫一些代碼通過使用生成器來將數據輸入或者輸出worker。最後,我們將看看我們是否可以惰性地通過一個生成器鏈在web worker上傳遞所有數據。
減少開銷
主線程可以拆分開銷大的Web workers操作,在另一個線程中運行它們。這意味著DOM能夠渲染掛起的更新並處理掛起的用戶事件。但是,我們仍然面臨分配大型數組的開銷和更新UI所需的時間。儘管與Web worker並行處理,但我們的用戶仍然可能面臨運行緩慢,因為在處理完整個數據集之前,UI沒有更新。這是常見的模式的示圖:
這是具有單個worker的數據所採用的通用路徑; 當有多個worker時,同樣的方法也適用。使用這種方法,我們無法避免需要將數據序列化兩次這一事實,我們必須分配兩次。這些開銷僅僅是為了促進worker的通信,而與我們試圖實現的應用程式功能幾乎沒有關係。
worker通信所需的數組和序列化開銷通常不是什麼大問題。但是,對於更大的集合,我們可能會面臨真正的性能問題,這源於我們用於提高性能的機制。因此,從另一個角度看worker通信不會受到損失,即使最初沒有必要。
這是大多數worker採用的通用路徑的變體。不是預先分配和序列化大量數據,而是將單個項傳入和傳出worker。這使得UI有機會在所有處理的數據到達之前使用已處理的數據進行更新。
在workers中生成值
如果我們想要在workers生成結果時更新UI,那麼他們無法將結果集打包為數組,以便在完成所有計算後發送回主線程。當發生這種情況時,UI就停在那裡而不響應用戶。我們希望一個惰性的方法,其中值是在一段時間產生一個,這樣UI就可以越快被更新。讓我們建立一個例子,將輸入發送到該web workers,然後將結果以一個比我們之前在這本書已經看到的更細微的水平發送回來:
首先,我們將創造一個worker; 它的代碼如下:
//消耗一些CPU迴圈...
//源自http://adambom.github.io/parallel.js/
function work(n) {
var i = 0;
while(++i < n * n) {}
return i;
}
//將調用“work()”的結果發回給主線程
addEventListener('message', (e) => {
postMessage(work(e.data));
});
這裡沒有什麼可大不了的。它與我們已經習慣的通過低效率地對數字進行減慢運行的代碼的work()函數相同。worker內部沒有使用實際的生成器。這是因為我們真的不需要,我們馬上就會明白為什麼:
//創建一個“update()”協程,
//在生成結果時持續更新UI。
var update = coroutine(function* () {
var input;
while (true) {
input = yield;
console.log('result', input.data);
}
});
//創建worker,並指定“update()”協程
//作為“message”回調處理程式。
var worker = new Worker('worker.js');
worker.addEventListener('message', update);
//一個數字逐漸變大的數組
var array = new Array(10).fill(null).map((v, i) => i * 10000);
//迭代數組,將每個數字作為私有消息傳遞給worker。
for(let item of array) {
worker.postMessage(item);
}
//→
//result 1
//result 100000000
//result 400000000
//result 900000000
//result 1600000000
//result 2500000000
//result 3600000000
//result 4900000000
//result 6400000000
//result 8100000000
傳遞給我們worker的每個數字的處理成本都比前一個數字要高。總的來說,在向用戶顯示任何內容之前處理整個輸入數組會覺得應用程式掛起或出錯了。但是,這不是這種情況,因為雖然每個數字的處理開銷很高,但我們會在結果可用時將結果發佈回來。
我們通過傳入一個數組來執行和將數組作為輸出返回來執行有著相同的工作量。然而,這種方法只是改變了事情發生的順序。我們在演示中引入了協作式多任務 - 在一個任務中計算一些數據併在另一個任務中更新UI。完成工作所花費的總時間是相同的,但對於用戶來說,感覺要快得多。總得說來,用戶可感知的應用程式性能是唯一重要的性能指標。
我們將輸入作為單獨的消息傳遞。我們可以將輸入作為數組傳遞,單獨發佈結果,並獲得相同的效果。但是,這可能
僅僅是不必要的複雜性。對於模式有自然的對應關係,因為它是 - 項目輸入,項目輸出。如果你不需要就不要改變它。
惰性worker鏈
正如我們在“第4章,使用Generator實現惰性計算”看到,我們可以組裝生成器鏈。這就是我們惰性地實現複雜函數的方式;一個項流經一系列生成器函數,這些函數在生成之前將項轉換為下一個生成器,直到它到達調用者。沒有生成器,我們必須分配大量的中間數據結構,只是為了將數據從一個函數傳遞到下一個函數。
在本文之前的部分中,我們看到Web worker可以使用類似於生成器的模式。由於我們在這裡面臨類似的問題,我們不希望分配大型數據結構。我們可以通過在更細粒度級別傳遞項目來避免這樣做。這具有保持UI響應的額外好處,因為我們能夠在最後一個項目從worker到達之前更新它。鑒於我們可以與worker做很多事情,我們難道不能基於在這個想法並組裝更複雜的worker處理節點鏈嗎?
例如,假設我們有一組數字和幾個轉換。我們在UI中顯示這些轉換之前,我們需要按特定順序進行這些轉換。理想情況下,我們會設置一個worker鏈,每個worker負責執行其指定的轉換,然後將輸出傳遞給下一個worker。最終,主線程獲得一個可以在DOM中顯示的值。
這個目標的問題在於它所涉及的很棘手的通信。由於專用worker只與創建它們的主線程進行通信,因此將結果發送回主線程,然後發送到鏈中的下一個worker線程,這幾乎沒有什麼益處。好吧,事實證明,專用worker可以直接通信而不涉及主線程。我們可以在這裡使用稱為頻道消息的東西。這個想法很簡單; 它涉及創建一個頻道,它有兩個埠 - 消息在一個埠上發佈併在另一個埠上接收。
我們一直在使用消息傳遞頻道和埠。他們被卷入web workers。這是消息事件和postMessage()方法模式的來源。以下是我們如何使用頻道和埠連接我們的Web worker的示圖:
我們可以看到,每個頻道使用兩個消息傳遞埠。第一埠是用於發佈消息,而所述第二埠被使用來接收消息事件。主線程唯一一次使用是當所述處理鏈首先被用於發佈一個消息給第一通道和當該消息從第三通道被接收到的消息。不要讓worker通信所需的六個埠嚇倒我們,讓我們寫一些代碼; 也許,那裡看起來會更易於理解。首先,我們將創建鏈中使用的worker。實際上,他們是同一個worker的兩個實例。下麵是代碼:
addEventListener('message', (e) => {
//獲取用於發送和接收消息的埠。
var [port1, port2] = e.ports;
//偵聽第一個埠的傳入消息。
port1.addEventListener('message', (e) => {
//在第二個埠上響應,結果為調用“work()”。
port2.postMessage(work(e.data));
});
//啟動兩個埠。
port1.start();
port2.start();
});
這是很有趣的。在這個worker中,我們有消息埠可以使用。第一個埠用於接收輸入,第二個埠用於發送輸出。該work()函數簡單地使用我們現在熟悉的平方數消耗CPU周期來看workers如何表現。我們在主線程中想要做的是創建這個worker的兩個實例,這樣我們就可以傳遞第一個平方數的實例。然後,在不將結果傳遞迴主線程的情況下,它將結果傳遞給下一個worker,並再次對數字進行平方。通信路線應該與前面的圖表非常相似。讓我們看一下使用消息傳遞通道連接worker的一些代碼:
//開始我們的worker...
var worker1 = new Worker('worker.js');
var worker2 = new Worker('worker.js');
//創建通信所需的在兩個worker之間的消息通道。
var channel1 = new MessageChannel();
var channel2 = new MessageChannel();
var channel3 = new MessageChannel();
//我們的“update()”協程會記錄worker的響應
var update = coroutine(function* () {
var input;
while (true) {
input = yield;
console.log('result', input.data);
}
});
//使用“worker1”連接“channel1”和“channel2”。
worker1.postMessage(null, [
channel1.port2,
channel2.port1
]);
//使用“worker2”連接“channel2”和“channel3”。
worker2.postMessage(null, [
channel2.port2,
channel3.port1
]);
//將我們的協程“update()”連接到收到“channel3”任何消息。
channel3.port2.addEventListener('message', update);
channel3.port2.start();
//我們的輸入數據 - 一組數字。
var array = new array(25)
.fill(null)
.map((v, i) => i*10);
//將每個數組項發佈到“channel1”。
for (let item of array) {
channel1.port1.postMessage(item);
}
除了我們要發送給worker的數據之外,我們還可以發送一個消息埠列表,我們希望將這些消息埠傳輸到worker上下文。這就是我們對發送給worker的前兩條消息的處理方式。消息數據為空,因為我們沒有對它做任何事情。實際上,這些是我們發送的唯一消息直接給worker。通信的其餘部分通過我們創建的消息通道進行。開銷大的計算發生在worker上,因為那是消息處理程式所在的位置。
使用Parallel.js
使用Parallel.js庫的目的是為了使與Web worker交互儘可能的無縫。在事實上,它完成了這本書的一個關鍵目標,它隱藏併發機制,並讓我們能夠專註於我們正在構建的應用程式。
在本節中,我們將介紹Parallel.js對worker通信採取的方法以及將代碼傳遞給worker的通用方法。然後,我們將介紹一些使用Parallel.js生成新worker線程的代碼。最後,我們將探索這個庫已經提供的內置map/reduce功能。
它怎麼工作的
在本書中到目前為止我們使用的所有worker都是我們自己創造的。我們在worker中實現了消息事件處理,計算某些值,然後發佈響應。使用Parallel.js,我們不實現worker。相反,我們實現函數,然後將函數傳遞給由庫管理的workers。
這給我們帶來了一些麻煩。我們所有的代碼都在主線程中實現,這意味著更容易使用在主線程中實現的函數,因為我們不需要使用importScripts()將它們導入到Web worker中。我們也不需要通過腳本目錄創建Web worker並手動啟動它們。相反,我們讓Parallel.js為我們生成新的worker,然後我們可以通過將函數和數據傳遞給他們來告訴worker該做什麼。那麼,這究竟是如何工作的呢?
workers需要一個腳本參數。沒有有效的腳本,worker根本無法工作。Parallel.js有一個簡單的eval腳本。這是傳遞給庫創建的worker的內容。然後,主線程中的API將在worker中進行評估代碼,併在需要與workers通信時將其發送。
這是可行的,因為Parallel.js的目的不是暴露worker支持的大量功能。相反,目標是使worker通信機制儘可能無縫,同時提供最小的功能。這樣可以輕鬆構建與我們的應用程式相關的併發功能,而不是我們永遠不會使用的許多其他功能。
以下是我們如何使用Parallel.js和它的eval腳本將數據和代碼傳遞給worker的說明:
生成workers
Parallel.js庫有一個作業的概念。作業的主要輸入是作業要處理的數據。作業的創建並不直接與後臺worker的創建聯繫在一起。workers與Parallel.js中的作業不同;使用庫時,我們不直接與worker交互。一旦我們有了作業實例,並且它提供了我們的數據,我們就會使用一個作業方法來調用workers。
最基本的方法是spawn(),它將一個函數作為參數併在Web worker中運行它。我們傳遞給它一個函數作為參數並且在web worker中運行。我們傳遞給它的函數可以返回結果,然後將它們解析為一個thenable對象被spawn()函數返回。讓我們看一下使用Parallel.js生成由一個web worker返回的新作業的代碼:
//一個數字輸入數組。
var array = new Array(2500)
.fill(null)
.map((v, i) => i);
//創建一個新的並行作業。
//在這裡沒有worker的創建 -
//我們只傳遞我們正在使用的構造數據。
var job = new Parallel(array);
//為我們的“spawn()”作業啟動一個定時器。
console.time(`${array.length} items`);
//創建一個新的Web worker,並將我們的數據和這個函數傳遞給它。
//我們正在慢慢映射數組的每個數字到它的平方。
job.spawn((coll) => {
return coll.map((n) => {
var i = 0;
while(++i < n*n) {}
return i;
});
//“spawn()”的返回值是thenable。含義
//我們可以分配一個“then()”回調函數,
//就像返回的promise那樣。
}).then((result) => {
console.timeEnd(`${array.length} items`);
//→2500 items:3408.078ms
});
那麼現在,這很不錯; 我們不必擔心任何單調的Web worker生命周期任務。我們有一些數據和一些我們想要應用於數據的函數,我們希望與頁面上發生的其他作業並行運行。最吸引人的是熟悉的thenable,從那裡返回的spawn()方法。它適用於我們的併發應用程式,其中所有其他應用程式都被視為promise。
我們記錄處理我們提供的輸入數據的函數所需的時間。我們只為這個任務生成一個Web worker,因此在主線程中計算得到的結果與原來的時間相同。除了釋放主線程來處理DOM事件和重繪之外,沒有實際的性能提升。我們將看看是否可以使用一個不同的方法來提升併發級別。
當我們完成後,spawn()創建的worker立即終止。這為我們釋放了記憶體。但是,沒有併發級別來管理
spawn()的使用,如果我們願意,我們可以連續調用它100次。
Mapping and reducing
在上一節中,我們使用spawn()方法生成了一個worker線程。Parallel.js還有一個map()方法和一個reduce()方法。這個方法是讓事情變得更輕鬆。通過傳遞map()函數,庫將自動將其應用於作業數據中的每個項。類似的語義適用於reduce()方法。讓我們通過編寫一些代碼來看看它是如何工作的:
//一個數字輸入數組。
var array = new Array(2500)
.fill(null)
.map((v, i) => i);
//創建一個新的並行作業。
//這裡不會創建workers - 我們只傳遞我們正在使用的構造數據。
var job1 = new Parallel(array);
//為我們的“spawn()”作業啟動一個計時器。
console.time('JOB1');
//這裡的問題是Parallel.js會為每個數組元素創建一個新的worker,
//導致並行減速。
job1.map((n) => {
var i = 0;
while (++i < n*n) {}
return i;
}).reduce((pair) => {
//將數組項reduce為一個總和。
return pair[0] + pair[1];
}).then((data) => {
console.log('job1 reduced', data);
//→job1 reduced 5205208751
console.timeEnd('job1');
//→job1:59443.863ms
});
哎喲! 這是一個非常重要的性能 - 這裡發生了什麼?我們在這裡看到的是一種稱為並行減速的現象。當並行通信開銷過多時,會發生這種減速。在這個特定示例中發生這種情況的原因是由於Parallel.js在map()中處理數組的方式。每個數組項都通過一個worker。這並不意味著創建了2500個worker - 一個worker用於數組中的每個元素。創建的worker數量最多只能達到4或者我們在本書前面看到的navigator.hardwareConcurrency值。
在真正的開銷來自於發送的消息並收到了worker-5000個消息!這顯然不是最優的,因為由代碼中的定時器給證明。讓我們看看是否能夠做出一個對這些數字的大幅改善,同時保持大致相同的代碼結構:
//更快的執行。
var job2 = new Parallel(array);
console.time('job2');
//在映射數組之前,將數組拆分為較小的數組塊。
//這樣,每個Parallel.js worker都是處理數組而不是數組項。
//這避免了發送數千個Web worker消息。
job2.spawn((data) => {
var index = 0,
size = 1000,
results = [];
while (true) {
let chunk = data.slice(index, index + size);
if (chunk.length) {
results.push(chunk);
index += size;
} else {
return result;
}
}
}).map((array) => {
//返回數組塊的映射。
return array.map((n) => {
var i = 0;
while(++i < n * n) {}
return i;
});
}).reduce((pair) => {
//將數組塊或數字reduce為一個總和。
return(Array.isArray(pair[0]) ?
pair[0].reduce((r, v) => r + v) :
pair[0]) + (Array.isArray(pair[1]) ?
pair[1].reduce((r, v) => r + v) :
pair[1]);
}).then((data) => {
console.log('job2 reduced', data);
//→job2 resuced 5205208751
});
console.timeEnd('job2');
//→job2:2723.661ms
在這裡,我們可以看到的是在同樣的結果被產生,並且快得多。不同之處在於我們開始工作之前將數組切片成的陣列較小的數組塊。這些數組就是傳遞給workers的項,而不是單個的數。所以映射作業略微有好的改變,而平方一個數字,它映射一個較小的數組到平方的數組。該reduce的邏輯是稍微複雜一些,但總體來說,我們的做法是仍然是相同的。最重要的是,我們已經刪除了大量的消息傳遞瓶頸,他們在第一次執行造成不可接受的性能缺陷。
就像spawn()方法在返回時清理worker一樣,Parallel.js中的map()和reduce()方法也是如此。
釋放worker的缺點是,無論何時調用這些方法,都需要重新創建它們。我們將在下一節討論這個挑戰。
worker線程池
本章的最後一節介紹了worker線程池的概念。在上一節關於Parallel.js的介紹中,我們遇到了經常創建和終止worker的問題。這需要很多開銷。如果知道我們能夠運行的併發級別,那麼為什麼不分配一個可以承擔工作的靜態大小的worker線程池?
創建worker線程池的第一個設計任務是分配worker。下一步是通過將作業分發給池中的可用worker來計劃作業。最後,當所有worker都在運行時,我們需要考慮忙碌狀態。讓我們開始吧。
分配池
在考慮分配worker線程池之前,我們需要查看總體worker抽象池。我們如何希望它的外觀和行為是怎樣的?理想情況下,我們希望抽象池的外觀和行為類似於普通的專用worker。我們可以向線程池發佈消息並獲得promise作為響應。因此,雖然我們無法直接擴展Worker原型,但我們可以創建一個與Worker API非常相似的新的抽象。
我們現在來看一些代碼吧。這是我們將使用的初始抽象:
//表示Web worker線程的“池”,
//隱藏在後面單個Web worker介面的介面。
function WorkerPool(script) {
//併發級別,或者Web worker要創造的數量。
//這使用了“hardwareConcurrency”屬性(如果存在)。
//否則,預設為4,
//因為這是對最常見的CPU結構進行的合理猜測。
var concurrency = navigator.hardwareConcurrency || 4;
//worker實例本身存儲在Map中,作為鍵。
//我們馬上就會明白為什麼。
var workers = this.workers = new Map();
//對於發佈的消息存在隊列,所有worker都很忙。
//所以這可能永遠不會被用到的。
var queue = this.queue = [];
//用於下麵創建worker程式實例,
//以及添加事件監聽器。
var worker;
for (var i = 0; i < concurrency; i++) {
worker = new Worker(script);
worker.addEventListener('message', function(e) {
//我們使用“get()”方法來查找promise的“resolve()”函數。
//該worker是關鍵。我們調用的從worker返回的數據的解析器
//並且可以將其重置為null。
//這個很重要,因為它表示worker是空閑的,
//可以承擔更多工作。
workers.get(this)(e.data);
workers.set(this, null);
//如果有排隊的數據,我們得到第一個
//隊列中的“data”和“resolver”。
//我們用數據調用“postMessage()”之前,
//我們使用新的“resolve()”函數更新“workers”映射。
if (queue.length) {
var [data, resolver] = queue.shift();
workers.set(this, resolver);
this.postMessage(data);
}
//這是worker的初始設置,作為在“worker”映射中的鍵。
//它的值為null,意味著沒有解析函數,它可以承擔工作。
this.workers.set(worker, null);
}.bind(worker));
}
}
創建新的WorkerPool時,給定的腳本用於生成線程池中的所有worker。該worker屬性是一個Map實例,worker實例本身是作為鍵。我們將worker存儲為映射鍵的原因是我們可以輕鬆地查找適當的解析器函數來調用。
當給定的worker程式響應時,調用我們添加到每個worker的消息事件處理程式,這就是我們找的等待調用的解析器函數的地方。我們不可能調用錯誤的解析器,因為給定的worker在完成當前任務之前不會接受新的任務。
調度任務
現在我們將實現postMessage()方法。這是調用者用於將消息發佈到池中的一個worker。調用者不知道哪個worker滿足了他們的要求,他們也不關心。他們將promise作為返回值,並以worker響應作為解析值:
WorkerPool.prototype.postMessage = function(data) {
//“workers”Map映射實例,其中包含所有存儲的Web worker。
var workers = this.workers;
//當所有worker都很忙時消息被放在“queue”隊列中
var queue = this.queue;
//嘗試找一個可用的worker。
var worker = this.getWorker();
//promise會立即返回給調用者,
//即使沒有worker可用。
return new Promise(function(resolve) {
//如果找到了worker,我們可以更新Map映射,
//使用worker作為鍵,並使用“resolve()”函數作為值。
//如果沒有worker,那麼消息數據以及“resolve()”函數被推送到“queue”隊列。
if (worker) {
workers.set(worker, resolve);
worker.postMessage(data);
} else {
queue.push([data, resolve]);
}
});
};
它是promise執行器函數,實際負責查找第一個可用的worker併在那裡發佈我們的消息。當找到可用的worker時,我們還在我們的worker映射中設置了worker的解析器函數。如果池中沒有可用的worker程式,已發佈的消息則將進入隊列。此隊列在消息事件處理程式中清空。這是因為當worker返回消息時,這意味著worker是空閑的可以承擔更多工作,並且在返回空閑狀態之前檢查是否有任何worker排隊。
該getWorker()方法是一個簡單的輔助函數為我們查找下一個可用的worker。我們知道如果一個worker在workers映射中將其解析器函數設置為null,則可以執行該任務。最後,讓我們看看這個worker線程池的應用:
//創建一個新的線程池和一個負載計數器。
var pool = new WorkerPool('worker.js');
var workload = 0;
document.getElementById('work').addEventListener('click', function(e) {
//獲取我們要傳遞給worker的數據,
//併為此負載創建計數器。
var amount = +document.getElementById('amount').value,
timer = 'Workload' + (++workload);
console.time(timer);
//將消息傳遞給線程池,併在promise完成時,停止計時器。
pool.postMessage(amount).then(function(result) {
console.timeEnd(timer);
});
//如果消息開始排隊,
//我們的線程池就是過載並顯示警告。
if (pool.queue.length) {
console.warn('worker pool is getting busy...');
}
});
在這種使用場景中,我們有幾個表單控制項將參數化工作發送給worker。數字越大,工作時間越長; 它使用標準的work()函數來緩慢地對數字作平方。如果我們使用大量數字並頻繁單擊按鈕將消息發佈到線程池中,那麼最終我們將耗盡線程池中可用的資源。如果是這種情況,我們將顯示警告。但是,這僅用於故障排除,當線程池繁忙時,發佈的消息不會丟失,它們只是排隊等候。
小結
本章的重點是從代碼中刪除突兀的併發語法。它只是提高了我們應用程式成功運行的可能性,因為我們將擁有易於維護和構建的代碼。我們解決的第一個問題是通過使所有內容都是併發的方式來編寫併發代碼。當沒有所涉及的猜測成分時,我們的代碼就是一致的,不易受併發錯誤的影響。
然後,我們研究了抽象Web worker通信可以採取的各種方法。輔助函數是一個選項,因此擴展了postMessage()方法。然後,當我們需要UI響應時,我們解決了Web workers的一些限制。即使我們的大型數據集處理速度更快,我們仍然存在更新UI的問題。這是通過將Web worker作為生成器處理來完成的。
我們不必自己編寫所有這些JavaScript併發工具方法。我們花了一些時間來研究Parallel.js庫的各種功能和限制。我們以介紹Web worker線程池結束了本章。這些消除了與worker創建和終止相關的大量開銷,並且它們極大地簡化了任務的分配和結果的協調。
這些都是適用於前端的併發話題。現在是時候切換一下,使用NodeJS查看後端的JavaScript併發性。
最後補充下書籍章節目錄
- 《JavaScript併發編程》第一章 JavaScript併發簡介
- 《JavaScript併發編程》第二章 JavaScript運行模型
- 《JavaScript併發編程》第三章 使用Promises實現同步
- 《JavaScript併發編程》第四章 使用Generators實現惰性計算
- 《JavaScript併發編程》第五章 使用Web Workers
- 《JavaScript併發編程》第六章 實用的併發
- 《JavaScript併發編程》第七章 抽取併發邏輯
另外還有講解兩章nodeJs後端併發方面的,和一章項目實戰方面的,這裡就不再貼了,有興趣可轉向https://github.com/yzsunlei/javascript_concurrency_translation查看。