每天學點node系列-stream

来源:https://www.cnblogs.com/jingh/archive/2019/07/05/11140651.html
-Advertisement-
Play Games

在編寫代碼時,我們應該有一些方法將程式像連接水管一樣連接起來 當我們需要獲取一些數據時,可以去通過"擰"其他的部分來達到目的。這也應該是IO應有的方式。 Doug McIlroy. October 11, 1964 為什麼應該使用stream? 在node中,I/O都是非同步的,所以在和硬碟以及網路的 ...


在編寫代碼時,我們應該有一些方法將程式像連接水管一樣連接起來 -- 當我們需要獲取一些數據時,可以去通過"擰"其他的部分來達到目的。這也應該是IO應有的方式。 -- Doug McIlroy. October 11, 1964

為什麼應該使用stream?

在node中,I/O都是非同步的,所以在和硬碟以及網路的交互過程中會涉及到傳遞迴調函數的過程。你之前可能會寫出這樣的代碼:

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        res.end(data);
    });});
server.listen(8000);

上面的這段代碼並沒有什麼問題,但是在每次請求時,我們都會把整個data.txt文件讀入到記憶體中,然後再把結果返回給客戶端。想想看,如果data.txt文件非常大,在響應大量用戶的併發請求時,程式可能會消耗大量的記憶體,這樣很可能會造成用戶連接緩慢的問題。其次,上面的代碼可能會造成很不好的用戶體驗,因為用戶在接收到任何的內容之前首先需要等待程式將文件內容完全讀入到記憶體中。所幸的是,(req,res) 參數都是流對象,這意味著我們可以使用一種更好的方法來實現上面的需求:

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server.listen(8000);

在這裡,.pipe()方法會自動幫助我們監聽data和end事件。上面的這段代碼不僅簡潔,而且data.txt文件中每一小段數據都將源源不斷的發送到客戶端。
除此之外,使用.pipe()方法還有別的好處,比如說它可以自動控制後端壓力,以便在客戶端連接緩慢的時候node可以將儘可能少的緩存放到記憶體中。

認識NodeJS中的stream

流(stream)是 Node.js 中處理流式數據的抽象介面。·stream 模塊用於構建實現了流介面的對象。

我們用到的很多核心模塊都是stream的實例。 例如:http.clientRequest, process.stdout。

流可以是可讀的、可寫的、或者可讀可寫的。

所有的流都是 EventEmitter 的實例。

雖然我們平時開發過程中平常不會直接用到stream模塊,但是也需要瞭解其運行機制。

對於想要實現自定義stream實例的開發者來說,就得好好研究stream的擴展API了,比如gulp的內部實現就大量用到了自定義的stream類型。

stream的類型

Node.js 中有四種基本的流類型:

  • Writable - 可寫入數據的流(例如 fs.createWriteStream())。
  • Readable - 可讀取數據的流(例如 fs.createReadStream())。
  • Duplex - 可讀又可寫的流(例如 net.Socket)。
  • Transform - 在讀寫過程中可以修改或轉換數據的 Duplex 流(例如 zlib.createDeflate())。

使用Stream可實現數據的流式處理,如:

var fs = require('fs') 
// `fs.createReadStream`創建一個`Readable`對象以讀取`bigFile`的內容,並輸出到標準輸出 
// 如果使用`fs.readFile`則可能由於文件過大而失敗 
fs.createReadStream(bigFile).pipe(process.stdout)

Readable

Readable流可以產出數據,你可以將這些數據傳送到一個writable,transform或者duplex流中,只需要調用pipe()方法:

創建個readable流

var Readable = require('stream').Readable;

var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);

rs.pipe(process.stdout);

下麵運行代碼

$ node read.js
beep boop

在上面的代碼中rs.push(null)的作用是告訴rs輸出數據應該結束了。
需要註意的一點是我們在將數據輸出到process.stdout之前已經將內容推送進readable流rs中,但是所有的數據依然是可寫的。這是因為在你使用.push()將數據推進一個readable流中時,一直要到另一個東西來消耗數據之前,數據都會存在一個緩存中。然而,在更多的情況下,我們想要的是當需要數據時數據才會產生,以此來避免大量的緩存數據。

流式消耗迭代器中的數據

我們可以通過定義一個._read函數來實現按需推送數據:

const Readable = require('stream').Readable
class ToReadable extends Readable {
    constructor(iterator) {
        super()
        this.iterator = iterator
    }
    // 子類需要實現該方法
    // 這是生產數據的邏輯
    _read() {
        const res = this.iterator.next()
        if (res.done) {
            // 數據源已枯竭,調用`push(null)`通知流
            return this.push(null)
        }
        setTimeout(() => {
        // 通過`push`方法將數據添加到流中
            this.push(res.value + '\n')
        }, 0)
    }
}
module.exports = ToReadable

使用時,new ToReadable(iterator)會返回一個可讀流,下游可以流式的消耗迭代器中的數據。

const iterator = function (limit) {
    return {
        next: function () {
            if (limit--) {
                return { done: false, value: limit + Math.random() }
            }
            return { done: true }
        }
    }
}(1e10)
const readable = new ToReadable(iterator)
// 監聽`data`事件,一次獲取一個數據
readable.on('data', data => process.stdout.write(data))
// 所有數據均已讀完
readable.on('end', () => process.stdout.write('DONE'))

執行上述代碼,將會有100億個隨機數源源不斷地寫進標準輸出流。

創建可讀流時,需要繼承Readable,並實現_read方法。 * _read方法是從底層系統讀取具體數據的邏輯,即生產數據的邏輯。 * 在_read方法中,通過調用push(data)將數據放入可讀流中供下游消耗。 * 在_read方法中,可以同步調用push(data),也可以非同步調用。 * 當全部數據都生產出來後,必須調用push(null)來結束可讀流。 * 流一旦結束,便不能再調用push(data)添加數據。

可以通過監聽data事件的方式消耗可讀流。 * 在首次監聽其data事件後,readable便會持續不斷地調用_read(),通過觸發data事件將數據輸出。 * 第一次data事件會在下一個tick中觸發,所以,可以安全地將數據輸出前的邏輯放在事件監聽後(同一個tick中)。 * 當數據全部被消耗時,會觸發end事件。

上面的例子中,process.stdout代表標準輸出流,實際是一個可寫流。

Writable

一個writable流指的是只能流進不能流出的流:

src.pipe(writableStream)

創建一個writable流

只需要定義一個._write(chunk,enc,next)函數,你就可以將一個readable流的數據釋放到其中:

const Writable = require('stream').Writable

const writable = Writable()
// 實現`_write`方法
// 這是將數據寫入底層的邏輯
writable._write = function (data, enc, next) {
  // 將流中的數據寫入底層
  process.stdout.write(data.toString().toUpperCase())
  // 寫入完成時,調用`next()`方法通知流傳入下一個數據
  process.nextTick(next)
}

// 所有數據均已寫入底層
writable.on('finish', () => process.stdout.write('DONE'))

// 將一個數據寫入流中
writable.write('a' + '\n')
writable.write('b' + '\n')
writable.write('c' + '\n')

// 再無數據寫入流時,需要調用`end`方法
writable.end()

運行結果如下:

$ node 1.js
A
B
C
DONE
  • 上游通過調用writable.write(data)將數據寫入可寫流中。write()方法會調用_write()將data寫入底層。
  • _write中,當數據成功寫入底層後,必須調用next(err)告訴流開始處理下一個數據。
  • 在從一個readable流向一個writable流傳數據的過程中,數據會自動被轉換為Buffer對象,除非你在創建writable流的時候制定了decodeStrings參數為false:Writable({decodeStrings: false})
  • 如果你需要傳遞對象,需要指定objectMode參數為trueWritable({ objectMode: true })
  • 在end方法調用後,當所有底層的寫操作均完成時,會觸發finish事件。
  • 上游必須調用writable.end(data)來結束可寫流,data是可選的。此後,不能再調用write新增數據。
  • next的調用既可以是同步的,也可以是非同步的.

_write的參數:

  • 第一個參數,chunk表寫進來的數據。
  • 第二個參數 enc 代表編碼的字元串,但是只有在opts.decodeStringfalse的時候你才可以寫一個字元串。
  • 第三個參數,next(err)是一個回調函數,使用這個回調函數你可以告訴數據消耗者可以寫更多的數據。你可以有選擇性的傳遞一個錯誤對象error,這時會在流實體上觸發一個emit事件。

向一個writable流中寫東西

如果你需要向一個writable流中寫東西,只需要調用.write(data)即可。

    process.stdout.write('beep boop\n');

為了告訴一個writable流你已經寫完畢了,只需要調用.end()方法。你也可以使用.end(data)在結束前再寫一些數據。

var fs = require('fs');
var ws = fs.createWriteStream('message.txt');

ws.write('beep ');

setTimeout(function () {
    ws.end('boop\n');
},1000);

運行結果如下所示:

$ node writing.js 
$ cat message.txt
beep boop

如果你在創建writable流時指定了highWaterMark參數,那麼當沒有更多數據寫入時,調用.write()方法將會返回false。如果你想要等待緩存情況,可以監聽drain事件。

Duplex

Duplex流是一個可讀也可寫的流,就好像一個電話,可以接收也可以發送語音。一個rpc交換是一個duplex流的最好的例子。如果你看到過下麵這樣的代碼:

a.pipe(b).pipe(a)

那麼你需要處理的就是一個duplex流對象。

實現一個Duplex

var Duplex = require('stream').Duplex
var duplex = Duplex()
// 可讀端底層讀取邏輯
duplex._read = function () {
    this._readNum = this._readNum || 0
    if (this._readNum > 1) {
        this.push(null)
    } else {
        this.push('' + (this._readNum++))
    }
}
// 可寫端底層寫邏輯
duplex._write = function (buf, enc, next) {
    // a, b
    process.stdout.write('_write ' + buf.toString() + '\n')
    next()
}
// 0, 1
duplex.on('data', data => console.log('ondata', data.toString()))
duplex.write('a')
duplex.write('b')
duplex.end()

上面的代碼中實現了_read方法,所以可以監聽data事件來消耗Duplex產生的數據。 同時,又實現了_write方法,可作為下游去消耗數據。
因為它既可讀又可寫,所以稱它有兩端:可寫端和可讀端。 可寫端的介面與Writable一致,作為下游來使用;可讀端的介面與Readable一致,作為上游來使用。

Transform

Transform stream是Duplex stream的特例,也就是說,Transform stream也同時可讀可寫。跟Duplex stream的區別點在於,Transform stream的輸出與輸入是存在相關性的。

const Transform = require('stream').Transform
class Rotate extends Transform {
    constructor(n) {
        super()
        // 將字母旋轉`n`個位置
        this.offset = (n || 13) % 26
    }
    // 將可寫端寫入的數據變換後添加到可讀端
    _transform (buf, enc, next) {
        var res = buf.toString().split('').map(c => {
            var code = c.charCodeAt(0)
            if (c >= 'a' && c <= 'z') {
                code += this.offset
                if (code > 'z'.charCodeAt(0)) {
                    code -= 26
                }
            } else if (c >= 'A' && c <= 'Z') {
                code += this.offset
                if (code > 'Z'.charCodeAt(0)) {
                    code -= 26
                }
            }
            return String.fromCharCode(code)
        }).join('')
        // 調用push方法將變換後的數據添加到可讀端
        this.push(res)
        // 調用next方法準備處理下一個
        next()
    }
}
var transform = new Rotate(3)
transform.on('data', data => process.stdout.write(data))
transform.write('hello, ')
transform.write('world!')
transform.end()

執行結果如下:

$ node 1.js
khoor, zruog!

Tranform繼承自Duplex,並已經實現了_read和_write方法,同時要求用戶實現一個_transform方法。

相關鏈接

https://nodejs.org/api/stream.html


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1.Redis單進程: 單進程模型來處理客戶端的請求。對讀寫等事件的響應是通過對epoll函數的包裝來做到的。Redis的實際處理速度完全依靠主進程的執行效率。epoll是Linux內核為處理大批量文件描述符而作了改進的epoll,是Linux下多路復用IO介面select/poll的增強版本,它能 ...
  • [TOC] 1.while迴圈 死迴圈 打斷死迴圈: 關鍵字: 2.字元串格式化: 3.運算符 4.編碼 四種(重要) 單位轉換 ...
  • 一. 安全性問題 線程安全的本質是正確性,而正確性的含義是程式按照預期執行 理論上線程安全的程式,應該要避免出現可見性問題(CPU緩存)、原子性問題(線程切換)和有序性問題(編譯優化) 需要分析是否存線上程安全問題的場景:存在共用數據且數據會發生變化,即有多個線程會同時讀寫同一個數據 針對該理論的解 ...
  • this 註意 public class ThisDemo { public static void main(String[] args) { } } class Person{ public String name; public int age; public boolean gender; ...
  • 1.TP框架基礎 1.1目錄結構 1.2配置文件 1.框架主配置文件(慣例配置文件) thinkphp/convention.php 2. 應用公共配置文件 application/config.php, application/database.php 對整個應用生效 3.模塊配置文件 appli ...
  • 一、數字 整數 Python可以處理任意大小的整數,當然包括負整數,在程式中的表示方法和數學上的寫法一模一樣,例如:1,100,-8080,0,等等。 電腦由於使用二進位,所以,有時候用十六進位表示整數比較方便,十六進位用0x首碼和0-9,a-f表示,例如:0xff00,0xa5b4c3d2,等等 ...
  • 7.6 多態性 1 什麼是多態性 多態指的是同一種事物多種形態,在程式中用繼承可以表現出多態。多態性:可以在不用考慮對象具體類型的前提下而直接使用對象下的方法 2、為什要用多態 用基類創建一套統一的規則,強制子類去遵循(使用抽象類實現),可以在不考慮對象具體的類的情況下直接參考基類的標準使用對象 7 ...
  • 一、二叉樹回憶 上一篇我們對數據結構中常用的樹做了介紹,本篇博客主要以二叉樹為例,講解一下樹的數據結構和代碼實現。回顧二叉樹:二叉樹是每個節點最多有兩個子樹的樹結構。通常子樹被稱作“左子樹”(left subtree)和“右子樹”(right subtree) 二、二叉樹比鏈表好在哪裡? 看看如下的 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...