在編寫代碼時,我們應該有一些方法將程式像連接水管一樣連接起來 當我們需要獲取一些數據時,可以去通過"擰"其他的部分來達到目的。這也應該是IO應有的方式。 Doug McIlroy. October 11, 1964 為什麼應該使用stream? 在node中,I/O都是非同步的,所以在和硬碟以及網路的 ...
在編寫代碼時,我們應該有一些方法將程式像連接水管一樣連接起來 -- 當我們需要獲取一些數據時,可以去通過"擰"其他的部分來達到目的。這也應該是IO應有的方式。 -- Doug McIlroy. October 11, 1964
為什麼應該使用stream?
在node中,I/O都是非同步的,所以在和硬碟以及網路的交互過程中會涉及到傳遞迴調函數的過程。你之前可能會寫出這樣的代碼:
var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
fs.readFile(__dirname + '/data.txt', function (err, data) {
res.end(data);
});});
server.listen(8000);
上面的這段代碼並沒有什麼問題,但是在每次請求時,我們都會把整個data.txt文件讀入到記憶體中,然後再把結果返回給客戶端。想想看,如果data.txt文件非常大,在響應大量用戶的併發請求時,程式可能會消耗大量的記憶體,這樣很可能會造成用戶連接緩慢的問題。其次,上面的代碼可能會造成很不好的用戶體驗,因為用戶在接收到任何的內容之前首先需要等待程式將文件內容完全讀入到記憶體中。所幸的是,(req,res)
參數都是流對象,這意味著我們可以使用一種更好的方法來實現上面的需求:
var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(res);
});
server.listen(8000);
在這裡,.pipe()方法會自動幫助我們監聽data和end事件。上面的這段代碼不僅簡潔,而且data.txt文件中每一小段數據都將源源不斷的發送到客戶端。
除此之外,使用.pipe()方法還有別的好處,比如說它可以自動控制後端壓力,以便在客戶端連接緩慢的時候node可以將儘可能少的緩存放到記憶體中。
認識NodeJS中的stream
流(stream)是 Node.js 中處理流式數據的抽象介面。·stream
模塊用於構建實現了流介面的對象。
我們用到的很多核心模塊都是stream
的實例。 例如:http.clientRequest, process.stdout。
流可以是可讀的、可寫的、或者可讀可寫的。
所有的流都是 EventEmitter 的實例。
雖然我們平時開發過程中平常不會直接用到stream
模塊,但是也需要瞭解其運行機制。
對於想要實現自定義stream實例的開發者來說,就得好好研究stream的擴展API了,比如gulp的內部實現就大量用到了自定義的stream類型。
stream的類型
Node.js 中有四種基本的流類型:
- Writable - 可寫入數據的流(例如 fs.createWriteStream())。
- Readable - 可讀取數據的流(例如 fs.createReadStream())。
- Duplex - 可讀又可寫的流(例如 net.Socket)。
- Transform - 在讀寫過程中可以修改或轉換數據的 Duplex 流(例如 zlib.createDeflate())。
使用Stream可實現數據的流式處理,如:
var fs = require('fs')
// `fs.createReadStream`創建一個`Readable`對象以讀取`bigFile`的內容,並輸出到標準輸出
// 如果使用`fs.readFile`則可能由於文件過大而失敗
fs.createReadStream(bigFile).pipe(process.stdout)
Readable
Readable流可以產出數據,你可以將這些數據傳送到一個writable,transform或者duplex流中,只需要調用pipe()方法:
創建個readable流
var Readable = require('stream').Readable;
var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);
rs.pipe(process.stdout);
下麵運行代碼
$ node read.js
beep boop
在上面的代碼中rs.push(null)的作用是告訴rs輸出數據應該結束了。
需要註意的一點是我們在將數據輸出到process.stdout之前已經將內容推送進readable流rs中,但是所有的數據依然是可寫的。這是因為在你使用.push()將數據推進一個readable流中時,一直要到另一個東西來消耗數據之前,數據都會存在一個緩存中。然而,在更多的情況下,我們想要的是當需要數據時數據才會產生,以此來避免大量的緩存數據。
流式消耗迭代器中的數據
我們可以通過定義一個._read函數來實現按需推送數據:
const Readable = require('stream').Readable
class ToReadable extends Readable {
constructor(iterator) {
super()
this.iterator = iterator
}
// 子類需要實現該方法
// 這是生產數據的邏輯
_read() {
const res = this.iterator.next()
if (res.done) {
// 數據源已枯竭,調用`push(null)`通知流
return this.push(null)
}
setTimeout(() => {
// 通過`push`方法將數據添加到流中
this.push(res.value + '\n')
}, 0)
}
}
module.exports = ToReadable
使用時,new ToReadable(iterator)會返回一個可讀流,下游可以流式的消耗迭代器中的數據。
const iterator = function (limit) {
return {
next: function () {
if (limit--) {
return { done: false, value: limit + Math.random() }
}
return { done: true }
}
}
}(1e10)
const readable = new ToReadable(iterator)
// 監聽`data`事件,一次獲取一個數據
readable.on('data', data => process.stdout.write(data))
// 所有數據均已讀完
readable.on('end', () => process.stdout.write('DONE'))
執行上述代碼,將會有100億個隨機數源源不斷地寫進標準輸出流。
創建可讀流時,需要繼承Readable,並實現_read方法。 * _read方法是從底層系統讀取具體數據的邏輯,即生產數據的邏輯。 * 在_read方法中,通過調用push(data)將數據放入可讀流中供下游消耗。 * 在_read方法中,可以同步調用push(data),也可以非同步調用。 * 當全部數據都生產出來後,必須調用push(null)來結束可讀流。 * 流一旦結束,便不能再調用push(data)添加數據。
可以通過監聽data事件的方式消耗可讀流。 * 在首次監聽其data事件後,readable便會持續不斷地調用_read(),通過觸發data事件將數據輸出。 * 第一次data事件會在下一個tick中觸發,所以,可以安全地將數據輸出前的邏輯放在事件監聽後(同一個tick中)。 * 當數據全部被消耗時,會觸發end事件。
上面的例子中,process.stdout代表標準輸出流,實際是一個可寫流。
Writable
一個writable流指的是只能流進不能流出的流:
src.pipe(writableStream)
創建一個writable流
只需要定義一個._write(chunk,enc,next)函數,你就可以將一個readable流的數據釋放到其中:
const Writable = require('stream').Writable
const writable = Writable()
// 實現`_write`方法
// 這是將數據寫入底層的邏輯
writable._write = function (data, enc, next) {
// 將流中的數據寫入底層
process.stdout.write(data.toString().toUpperCase())
// 寫入完成時,調用`next()`方法通知流傳入下一個數據
process.nextTick(next)
}
// 所有數據均已寫入底層
writable.on('finish', () => process.stdout.write('DONE'))
// 將一個數據寫入流中
writable.write('a' + '\n')
writable.write('b' + '\n')
writable.write('c' + '\n')
// 再無數據寫入流時,需要調用`end`方法
writable.end()
運行結果如下:
$ node 1.js
A
B
C
DONE
- 上游通過調用writable.write(data)將數據寫入可寫流中。write()方法會調用_write()將data寫入底層。
- _write中,當數據成功寫入底層後,必須調用next(err)告訴流開始處理下一個數據。
- 在從一個readable流向一個writable流傳數據的過程中,數據會自動被轉換為
Buffer
對象,除非你在創建writable流的時候制定了decodeStrings
參數為false
:Writable({decodeStrings: false})
。 - 如果你需要傳遞對象,需要指定
objectMode
參數為true
,Writable({ objectMode: true })
。 - 在end方法調用後,當所有底層的寫操作均完成時,會觸發finish事件。
- 上游必須調用writable.end(data)來結束可寫流,data是可選的。此後,不能再調用write新增數據。
- next的調用既可以是同步的,也可以是非同步的.
_write的參數:
- 第一個參數,
chunk
表寫進來的數據。 - 第二個參數
enc
代表編碼的字元串,但是只有在opts.decodeString
為false
的時候你才可以寫一個字元串。 - 第三個參數,
next(err)
是一個回調函數,使用這個回調函數你可以告訴數據消耗者可以寫更多的數據。你可以有選擇性的傳遞一個錯誤對象error
,這時會在流實體上觸發一個emit
事件。
向一個writable流中寫東西
如果你需要向一個writable流中寫東西,只需要調用.write(data)即可。
process.stdout.write('beep boop\n');
為了告訴一個writable流你已經寫完畢了,只需要調用.end()方法。你也可以使用.end(data)在結束前再寫一些數據。
var fs = require('fs');
var ws = fs.createWriteStream('message.txt');
ws.write('beep ');
setTimeout(function () {
ws.end('boop\n');
},1000);
運行結果如下所示:
$ node writing.js
$ cat message.txt
beep boop
如果你在創建writable流時指定了highWaterMark參數,那麼當沒有更多數據寫入時,調用.write()方法將會返回false。如果你想要等待緩存情況,可以監聽drain事件。
Duplex
Duplex流是一個可讀也可寫的流,就好像一個電話,可以接收也可以發送語音。一個rpc交換是一個duplex流的最好的例子。如果你看到過下麵這樣的代碼:
a.pipe(b).pipe(a)
那麼你需要處理的就是一個duplex流對象。
實現一個Duplex
var Duplex = require('stream').Duplex
var duplex = Duplex()
// 可讀端底層讀取邏輯
duplex._read = function () {
this._readNum = this._readNum || 0
if (this._readNum > 1) {
this.push(null)
} else {
this.push('' + (this._readNum++))
}
}
// 可寫端底層寫邏輯
duplex._write = function (buf, enc, next) {
// a, b
process.stdout.write('_write ' + buf.toString() + '\n')
next()
}
// 0, 1
duplex.on('data', data => console.log('ondata', data.toString()))
duplex.write('a')
duplex.write('b')
duplex.end()
上面的代碼中實現了_read方法,所以可以監聽data事件來消耗Duplex產生的數據。 同時,又實現了_write方法,可作為下游去消耗數據。
因為它既可讀又可寫,所以稱它有兩端:可寫端和可讀端。 可寫端的介面與Writable一致,作為下游來使用;可讀端的介面與Readable一致,作為上游來使用。
Transform
Transform stream是Duplex stream的特例,也就是說,Transform stream也同時可讀可寫。跟Duplex stream的區別點在於,Transform stream的輸出與輸入是存在相關性的。
const Transform = require('stream').Transform
class Rotate extends Transform {
constructor(n) {
super()
// 將字母旋轉`n`個位置
this.offset = (n || 13) % 26
}
// 將可寫端寫入的數據變換後添加到可讀端
_transform (buf, enc, next) {
var res = buf.toString().split('').map(c => {
var code = c.charCodeAt(0)
if (c >= 'a' && c <= 'z') {
code += this.offset
if (code > 'z'.charCodeAt(0)) {
code -= 26
}
} else if (c >= 'A' && c <= 'Z') {
code += this.offset
if (code > 'Z'.charCodeAt(0)) {
code -= 26
}
}
return String.fromCharCode(code)
}).join('')
// 調用push方法將變換後的數據添加到可讀端
this.push(res)
// 調用next方法準備處理下一個
next()
}
}
var transform = new Rotate(3)
transform.on('data', data => process.stdout.write(data))
transform.write('hello, ')
transform.write('world!')
transform.end()
執行結果如下:
$ node 1.js
khoor, zruog!
Tranform繼承自Duplex,並已經實現了_read和_write方法,同時要求用戶實現一個_transform方法。
相關鏈接
https://nodejs.org/api/stream.html