本文介紹了NodeJS中流(Stream)的概念、類型和應用。流通過將數據分成小塊進行處理,優化了記憶體使用和數據處理效率。文章涵蓋了四種基本流類型:可讀流、可寫流、雙工流和轉換流,並通過實例代碼演示瞭如何使用流進行高效的數據傳輸和處理。 ...
在現代應用開發中,數據處理的效率和資源管理尤為重要。NodeJS作為一種高效的JavaScript運行時環境,通過其強大的流(Stream)功能,提供了處理大規模數據的便捷方式。流式數據處理不僅能夠優化記憶體使用,還可以提高數據處理的實時性和效率。下文將介紹NodeJS中的流概念、流的類型以及如何利用流來進行數據傳輸和處理。
流的基本概念
流式數據的特點是將數據分成一個一個的chunk,每次操作只針對其中的一小部分。
因此流式數據的讀寫操作不需要將整個數據保存在記憶體中(處理完就丟掉)。
常用於視頻這種包含大量數據的應用場景,也可以在時間和空間角度上更有效地處理數據:
- 時間:從開始讀到流就可以處理數據並反饋給用戶了,不需要等待全部數據到達,例如:ChatGPT的回答,就是流式數據傳輸,一個字一個字地顯示出來;
- 空間:如上文所說,在某些場景下不需要將整個數據保存在記憶體中。
NodeJS提供的API
NodeJS中的node:stream
模塊提供了對流數據進行處理的抽象介面。
NodeJS中的所有流對象都可以監聽和觸發事件,都是EventEmitter
的實例對象。
下麵的表格列出了每一種基本流常用且重要的事件
NodeJS中有四種基本的流類型:可讀流、可寫流、雙工流和轉換流。
描述 | 案例 | 事件 | 方法 | |
---|---|---|---|---|
可讀流 Readable Streams | 可用於讀(消費)數據 | 1. http request2. fs read streams |
data end |
pipe() read |
可寫流 Writable Streams | 可用於寫(生產)數據 | 1. http responses2. fs write streams |
drain finish |
write() end() |
雙工流 Duplex Streams | 可讀可寫 | net 網路套接字 |
||
轉換流 Transform Streams | 雙工流,在讀寫的時候可修改 | zlib Gzip creation |
流式數據傳輸案例
簡介:創建一個比較大的文本文件,使用NodeJS啟動一個服務,介面分別以三種方法返迴文件內容。
代碼:
方法一 不使用流
讀取整個文件的內容之後再返回;
讀取大文件的時候不推薦這樣寫,因為整個文件會先被完整地從磁碟讀取到記憶體中,再返回給客戶端。
import fs from 'node:fs';
import http from 'node:http';
const server = http.createServer();
server.on('request', (req, res)=>{
// CORS
res.setHeader('Access-Control-Allow-Origin', '*');
// Solution 1
fs.readFile('test.txt', (err, data)=>{
if(err)console.log(err);
res.end(data);
});
});
server.listen(3000, ()=>{
console.log('listening...');
});
方法二 可讀流
使用可讀流,優點是邊讀文件邊返回,只有當前處理的chunk會占據記憶體;
import fs from 'node:fs';
import http from 'node:http';
const server = http.createServer();
server.on('request', (req, res)=>{
// CORS
res.setHeader('Access-Control-Allow-Origin', '*');
// Solution 2: Streams
const readable = fs.createReadStream('test.txt');
readable.on('data', (chunk)=>{
res.write(chunk);
});
readable.on('end', ()=>{
res.end();
});
readable.on('error', (err)=>{
console.log(err);
res.statusCode = 500;
res.end('File reading error!');
});
});
server.listen(3000, ()=>{
console.log('listening...');
});
backpressure
這裡介紹一下流控(Flow Controll)領域中的一個名詞:Backpressure(翻譯為 反壓/背壓)。
在Node.js和其他流處理系統中,backpressure(反壓/背壓)是指生產者生成數據的速度超過消費者處理數據的速度時產生的一種控制機制。
當可讀流(Readable Stream)讀取數據的速度快於可寫流(Writable Stream)寫入數據的速度時,就會產生backpressure。為了防止這種情況,可讀流會根據可寫流的消費能力進行控制,暫停或減慢讀取數據的速度。
具體機制:
- 可寫流的緩衝區:可寫流內部有一個緩衝區,用於暫存數據。如果這個緩衝區被填滿,流會返回
false
,表示消費者已經無法及時處理更多的數據。 - 暫停和恢復:當可寫流返回
false
時,可讀流會暫停讀取數據。只有在可寫流的緩衝區有足夠的空間後,可讀流才會恢復讀取。 - 事件驅動:Node.js 流通過事件驅動的方式處理backpressure。當可寫流的緩衝區有空間時,會觸發
drain
事件,通知可讀流繼續讀取數據。
示例代碼:通過手動暫停和恢複合理利用緩衝區,避免數據丟失、記憶體溢出和資源耗盡。
import fs from 'node:fs';
const readableStream = fs.createReadStream('input.txt');
const writableStream = fs.createWriteStream('output.txt');
readableStream.on('data', (chunk)=>{
const canWrite = writableStream.write(chunk);
// 可寫流的緩衝區空間不夠了,暫停讀數據(生產)
if(!canWrite){
readableStream.pause();
}
});
// 當可寫流的緩衝區空間足夠,會觸發`drain`事件
// 可以繼續讀數據
writableStream.on('drain', ()=>{
readableStream.resume();
});
// 讀取結束,停止寫入
readableStream.on('end', ()=>{
writableStream.end();
console.log('done.');
});
pipe
在 Node.js 中,pipe
方法提供了一種更簡單和自動化的方式來處理流之間的 backpressure 問題。pipe
方法可以連接可讀流和可寫流,並自動處理 backpressure,無需手動暫停和恢復流。
示例代碼:
import fs from 'node:fs';
const readableStream = fs.createReadStream('input.txt');
const writableStream = fs.createWriteStream('output.txt');
// 統一錯誤處理函數
function handleError(err) {
console.error('發生錯誤:', err);
}
// 使用 pipe 連接可讀流和可寫流,並處理錯誤
readableStream.pipe(writableStream)
.on('error', handleError);
// 處理可讀流和可寫流的錯誤
readableStream.on('error', handleError);
writableStream.on('error', handleError);
語法是:
readableSource.pipe(writableDestination);
接下來回到上文的關於流式數據網路傳輸的案例。
方法三 pipe
使用pipe
可以簡化許多代碼,核心代碼就是
readable.pipe(res);
示例代碼:
import fs from 'node:fs';
import http from 'node:http';
const server = http.createServer();
server.on('request', (req, res)=>{
// CORS
res.setHeader('Access-Control-Allow-Origin', '*');
// Solution 3: Pipe
const readable = fs.createReadStream('test.txt');
readable.pipe(res).on('error', ()=>{
res.statusCode = 500;
res.end('File reading error!');
});
});
server.listen(3000, ()=>{
console.log('listening...');
});
總結
- 流(Stream)在NodeJS中的工作原理是將數據分成一個個小塊進行處理,這樣無需將整個數據載入到記憶體中,從而優化了記憶體使用和數據處理效率。
- 流在NodeJS中有四種基本類型:可讀流、可寫流、雙工流和轉換流,每種類型都有其特定的應用場景和事件機制。
- 流的應用場景主要包括視頻播放、文件處理、實時數據傳輸等。在這些場景中,流通過邊讀邊寫、邊處理邊傳輸的方式,可以有效地提高數據處理的實時性和系統的性能。
參考
[1] B站 - NodeJS教程
[2] 知乎 - 如何形象的描述反應式編程中的背壓(Backpressure)機制?