每天學點node系列-stream

来源:https://www.cnblogs.com/jingh/archive/2019/07/05/11140651.html
-Advertisement-
Play Games

在編寫代碼時,我們應該有一些方法將程式像連接水管一樣連接起來 當我們需要獲取一些數據時,可以去通過"擰"其他的部分來達到目的。這也應該是IO應有的方式。 Doug McIlroy. October 11, 1964 為什麼應該使用stream? 在node中,I/O都是非同步的,所以在和硬碟以及網路的 ...


在編寫代碼時,我們應該有一些方法將程式像連接水管一樣連接起來 -- 當我們需要獲取一些數據時,可以去通過"擰"其他的部分來達到目的。這也應該是IO應有的方式。 -- Doug McIlroy. October 11, 1964

為什麼應該使用stream?

在node中,I/O都是非同步的,所以在和硬碟以及網路的交互過程中會涉及到傳遞迴調函數的過程。你之前可能會寫出這樣的代碼:

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        res.end(data);
    });});
server.listen(8000);

上面的這段代碼並沒有什麼問題,但是在每次請求時,我們都會把整個data.txt文件讀入到記憶體中,然後再把結果返回給客戶端。想想看,如果data.txt文件非常大,在響應大量用戶的併發請求時,程式可能會消耗大量的記憶體,這樣很可能會造成用戶連接緩慢的問題。其次,上面的代碼可能會造成很不好的用戶體驗,因為用戶在接收到任何的內容之前首先需要等待程式將文件內容完全讀入到記憶體中。所幸的是,(req,res) 參數都是流對象,這意味著我們可以使用一種更好的方法來實現上面的需求:

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server.listen(8000);

在這裡,.pipe()方法會自動幫助我們監聽data和end事件。上面的這段代碼不僅簡潔,而且data.txt文件中每一小段數據都將源源不斷的發送到客戶端。
除此之外,使用.pipe()方法還有別的好處,比如說它可以自動控制後端壓力,以便在客戶端連接緩慢的時候node可以將儘可能少的緩存放到記憶體中。

認識NodeJS中的stream

流(stream)是 Node.js 中處理流式數據的抽象介面。·stream 模塊用於構建實現了流介面的對象。

我們用到的很多核心模塊都是stream的實例。 例如:http.clientRequest, process.stdout。

流可以是可讀的、可寫的、或者可讀可寫的。

所有的流都是 EventEmitter 的實例。

雖然我們平時開發過程中平常不會直接用到stream模塊,但是也需要瞭解其運行機制。

對於想要實現自定義stream實例的開發者來說,就得好好研究stream的擴展API了,比如gulp的內部實現就大量用到了自定義的stream類型。

stream的類型

Node.js 中有四種基本的流類型:

  • Writable - 可寫入數據的流(例如 fs.createWriteStream())。
  • Readable - 可讀取數據的流(例如 fs.createReadStream())。
  • Duplex - 可讀又可寫的流(例如 net.Socket)。
  • Transform - 在讀寫過程中可以修改或轉換數據的 Duplex 流(例如 zlib.createDeflate())。

使用Stream可實現數據的流式處理,如:

var fs = require('fs') 
// `fs.createReadStream`創建一個`Readable`對象以讀取`bigFile`的內容,並輸出到標準輸出 
// 如果使用`fs.readFile`則可能由於文件過大而失敗 
fs.createReadStream(bigFile).pipe(process.stdout)

Readable

Readable流可以產出數據,你可以將這些數據傳送到一個writable,transform或者duplex流中,只需要調用pipe()方法:

創建個readable流

var Readable = require('stream').Readable;

var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);

rs.pipe(process.stdout);

下麵運行代碼

$ node read.js
beep boop

在上面的代碼中rs.push(null)的作用是告訴rs輸出數據應該結束了。
需要註意的一點是我們在將數據輸出到process.stdout之前已經將內容推送進readable流rs中,但是所有的數據依然是可寫的。這是因為在你使用.push()將數據推進一個readable流中時,一直要到另一個東西來消耗數據之前,數據都會存在一個緩存中。然而,在更多的情況下,我們想要的是當需要數據時數據才會產生,以此來避免大量的緩存數據。

流式消耗迭代器中的數據

我們可以通過定義一個._read函數來實現按需推送數據:

const Readable = require('stream').Readable
class ToReadable extends Readable {
    constructor(iterator) {
        super()
        this.iterator = iterator
    }
    // 子類需要實現該方法
    // 這是生產數據的邏輯
    _read() {
        const res = this.iterator.next()
        if (res.done) {
            // 數據源已枯竭,調用`push(null)`通知流
            return this.push(null)
        }
        setTimeout(() => {
        // 通過`push`方法將數據添加到流中
            this.push(res.value + '\n')
        }, 0)
    }
}
module.exports = ToReadable

使用時,new ToReadable(iterator)會返回一個可讀流,下游可以流式的消耗迭代器中的數據。

const iterator = function (limit) {
    return {
        next: function () {
            if (limit--) {
                return { done: false, value: limit + Math.random() }
            }
            return { done: true }
        }
    }
}(1e10)
const readable = new ToReadable(iterator)
// 監聽`data`事件,一次獲取一個數據
readable.on('data', data => process.stdout.write(data))
// 所有數據均已讀完
readable.on('end', () => process.stdout.write('DONE'))

執行上述代碼,將會有100億個隨機數源源不斷地寫進標準輸出流。

創建可讀流時,需要繼承Readable,並實現_read方法。 * _read方法是從底層系統讀取具體數據的邏輯,即生產數據的邏輯。 * 在_read方法中,通過調用push(data)將數據放入可讀流中供下游消耗。 * 在_read方法中,可以同步調用push(data),也可以非同步調用。 * 當全部數據都生產出來後,必須調用push(null)來結束可讀流。 * 流一旦結束,便不能再調用push(data)添加數據。

可以通過監聽data事件的方式消耗可讀流。 * 在首次監聽其data事件後,readable便會持續不斷地調用_read(),通過觸發data事件將數據輸出。 * 第一次data事件會在下一個tick中觸發,所以,可以安全地將數據輸出前的邏輯放在事件監聽後(同一個tick中)。 * 當數據全部被消耗時,會觸發end事件。

上面的例子中,process.stdout代表標準輸出流,實際是一個可寫流。

Writable

一個writable流指的是只能流進不能流出的流:

src.pipe(writableStream)

創建一個writable流

只需要定義一個._write(chunk,enc,next)函數,你就可以將一個readable流的數據釋放到其中:

const Writable = require('stream').Writable

const writable = Writable()
// 實現`_write`方法
// 這是將數據寫入底層的邏輯
writable._write = function (data, enc, next) {
  // 將流中的數據寫入底層
  process.stdout.write(data.toString().toUpperCase())
  // 寫入完成時,調用`next()`方法通知流傳入下一個數據
  process.nextTick(next)
}

// 所有數據均已寫入底層
writable.on('finish', () => process.stdout.write('DONE'))

// 將一個數據寫入流中
writable.write('a' + '\n')
writable.write('b' + '\n')
writable.write('c' + '\n')

// 再無數據寫入流時,需要調用`end`方法
writable.end()

運行結果如下:

$ node 1.js
A
B
C
DONE
  • 上游通過調用writable.write(data)將數據寫入可寫流中。write()方法會調用_write()將data寫入底層。
  • _write中,當數據成功寫入底層後,必須調用next(err)告訴流開始處理下一個數據。
  • 在從一個readable流向一個writable流傳數據的過程中,數據會自動被轉換為Buffer對象,除非你在創建writable流的時候制定了decodeStrings參數為false:Writable({decodeStrings: false})
  • 如果你需要傳遞對象,需要指定objectMode參數為trueWritable({ objectMode: true })
  • 在end方法調用後,當所有底層的寫操作均完成時,會觸發finish事件。
  • 上游必須調用writable.end(data)來結束可寫流,data是可選的。此後,不能再調用write新增數據。
  • next的調用既可以是同步的,也可以是非同步的.

_write的參數:

  • 第一個參數,chunk表寫進來的數據。
  • 第二個參數 enc 代表編碼的字元串,但是只有在opts.decodeStringfalse的時候你才可以寫一個字元串。
  • 第三個參數,next(err)是一個回調函數,使用這個回調函數你可以告訴數據消耗者可以寫更多的數據。你可以有選擇性的傳遞一個錯誤對象error,這時會在流實體上觸發一個emit事件。

向一個writable流中寫東西

如果你需要向一個writable流中寫東西,只需要調用.write(data)即可。

    process.stdout.write('beep boop\n');

為了告訴一個writable流你已經寫完畢了,只需要調用.end()方法。你也可以使用.end(data)在結束前再寫一些數據。

var fs = require('fs');
var ws = fs.createWriteStream('message.txt');

ws.write('beep ');

setTimeout(function () {
    ws.end('boop\n');
},1000);

運行結果如下所示:

$ node writing.js 
$ cat message.txt
beep boop

如果你在創建writable流時指定了highWaterMark參數,那麼當沒有更多數據寫入時,調用.write()方法將會返回false。如果你想要等待緩存情況,可以監聽drain事件。

Duplex

Duplex流是一個可讀也可寫的流,就好像一個電話,可以接收也可以發送語音。一個rpc交換是一個duplex流的最好的例子。如果你看到過下麵這樣的代碼:

a.pipe(b).pipe(a)

那麼你需要處理的就是一個duplex流對象。

實現一個Duplex

var Duplex = require('stream').Duplex
var duplex = Duplex()
// 可讀端底層讀取邏輯
duplex._read = function () {
    this._readNum = this._readNum || 0
    if (this._readNum > 1) {
        this.push(null)
    } else {
        this.push('' + (this._readNum++))
    }
}
// 可寫端底層寫邏輯
duplex._write = function (buf, enc, next) {
    // a, b
    process.stdout.write('_write ' + buf.toString() + '\n')
    next()
}
// 0, 1
duplex.on('data', data => console.log('ondata', data.toString()))
duplex.write('a')
duplex.write('b')
duplex.end()

上面的代碼中實現了_read方法,所以可以監聽data事件來消耗Duplex產生的數據。 同時,又實現了_write方法,可作為下游去消耗數據。
因為它既可讀又可寫,所以稱它有兩端:可寫端和可讀端。 可寫端的介面與Writable一致,作為下游來使用;可讀端的介面與Readable一致,作為上游來使用。

Transform

Transform stream是Duplex stream的特例,也就是說,Transform stream也同時可讀可寫。跟Duplex stream的區別點在於,Transform stream的輸出與輸入是存在相關性的。

const Transform = require('stream').Transform
class Rotate extends Transform {
    constructor(n) {
        super()
        // 將字母旋轉`n`個位置
        this.offset = (n || 13) % 26
    }
    // 將可寫端寫入的數據變換後添加到可讀端
    _transform (buf, enc, next) {
        var res = buf.toString().split('').map(c => {
            var code = c.charCodeAt(0)
            if (c >= 'a' && c <= 'z') {
                code += this.offset
                if (code > 'z'.charCodeAt(0)) {
                    code -= 26
                }
            } else if (c >= 'A' && c <= 'Z') {
                code += this.offset
                if (code > 'Z'.charCodeAt(0)) {
                    code -= 26
                }
            }
            return String.fromCharCode(code)
        }).join('')
        // 調用push方法將變換後的數據添加到可讀端
        this.push(res)
        // 調用next方法準備處理下一個
        next()
    }
}
var transform = new Rotate(3)
transform.on('data', data => process.stdout.write(data))
transform.write('hello, ')
transform.write('world!')
transform.end()

執行結果如下:

$ node 1.js
khoor, zruog!

Tranform繼承自Duplex,並已經實現了_read和_write方法,同時要求用戶實現一個_transform方法。

相關鏈接

https://nodejs.org/api/stream.html


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1.Redis單進程: 單進程模型來處理客戶端的請求。對讀寫等事件的響應是通過對epoll函數的包裝來做到的。Redis的實際處理速度完全依靠主進程的執行效率。epoll是Linux內核為處理大批量文件描述符而作了改進的epoll,是Linux下多路復用IO介面select/poll的增強版本,它能 ...
  • [TOC] 1.while迴圈 死迴圈 打斷死迴圈: 關鍵字: 2.字元串格式化: 3.運算符 4.編碼 四種(重要) 單位轉換 ...
  • 一. 安全性問題 線程安全的本質是正確性,而正確性的含義是程式按照預期執行 理論上線程安全的程式,應該要避免出現可見性問題(CPU緩存)、原子性問題(線程切換)和有序性問題(編譯優化) 需要分析是否存線上程安全問題的場景:存在共用數據且數據會發生變化,即有多個線程會同時讀寫同一個數據 針對該理論的解 ...
  • this 註意 public class ThisDemo { public static void main(String[] args) { } } class Person{ public String name; public int age; public boolean gender; ...
  • 1.TP框架基礎 1.1目錄結構 1.2配置文件 1.框架主配置文件(慣例配置文件) thinkphp/convention.php 2. 應用公共配置文件 application/config.php, application/database.php 對整個應用生效 3.模塊配置文件 appli ...
  • 一、數字 整數 Python可以處理任意大小的整數,當然包括負整數,在程式中的表示方法和數學上的寫法一模一樣,例如:1,100,-8080,0,等等。 電腦由於使用二進位,所以,有時候用十六進位表示整數比較方便,十六進位用0x首碼和0-9,a-f表示,例如:0xff00,0xa5b4c3d2,等等 ...
  • 7.6 多態性 1 什麼是多態性 多態指的是同一種事物多種形態,在程式中用繼承可以表現出多態。多態性:可以在不用考慮對象具體類型的前提下而直接使用對象下的方法 2、為什要用多態 用基類創建一套統一的規則,強制子類去遵循(使用抽象類實現),可以在不考慮對象具體的類的情況下直接參考基類的標準使用對象 7 ...
  • 一、二叉樹回憶 上一篇我們對數據結構中常用的樹做了介紹,本篇博客主要以二叉樹為例,講解一下樹的數據結構和代碼實現。回顧二叉樹:二叉樹是每個節點最多有兩個子樹的樹結構。通常子樹被稱作“左子樹”(left subtree)和“右子樹”(right subtree) 二、二叉樹比鏈表好在哪裡? 看看如下的 ...
一周排行
    -Advertisement-
    Play Games
  • 示例項目結構 在 Visual Studio 中創建一個 WinForms 應用程式後,項目結構如下所示: MyWinFormsApp/ │ ├───Properties/ │ └───Settings.settings │ ├───bin/ │ ├───Debug/ │ └───Release/ ...
  • [STAThread] 特性用於需要與 COM 組件交互的應用程式,尤其是依賴單線程模型(如 Windows Forms 應用程式)的組件。在 STA 模式下,線程擁有自己的消息迴圈,這對於處理用戶界面和某些 COM 組件是必要的。 [STAThread] static void Main(stri ...
  • 在WinForm中使用全局異常捕獲處理 在WinForm應用程式中,全局異常捕獲是確保程式穩定性的關鍵。通過在Program類的Main方法中設置全局異常處理,可以有效地捕獲並處理未預見的異常,從而避免程式崩潰。 註冊全局異常事件 [STAThread] static void Main() { / ...
  • 前言 給大家推薦一款開源的 Winform 控制項庫,可以幫助我們開發更加美觀、漂亮的 WinForm 界面。 項目介紹 SunnyUI.NET 是一個基於 .NET Framework 4.0+、.NET 6、.NET 7 和 .NET 8 的 WinForm 開源控制項庫,同時也提供了工具類庫、擴展 ...
  • 說明 該文章是屬於OverallAuth2.0系列文章,每周更新一篇該系列文章(從0到1完成系統開發)。 該系統文章,我會儘量說的非常詳細,做到不管新手、老手都能看懂。 說明:OverallAuth2.0 是一個簡單、易懂、功能強大的許可權+可視化流程管理系統。 有興趣的朋友,請關註我吧(*^▽^*) ...
  • 一、下載安裝 1.下載git 必須先下載並安裝git,再TortoiseGit下載安裝 git安裝參考教程:https://blog.csdn.net/mukes/article/details/115693833 2.TortoiseGit下載與安裝 TortoiseGit,Git客戶端,32/6 ...
  • 前言 在項目開發過程中,理解數據結構和演算法如同掌握蓋房子的秘訣。演算法不僅能幫助我們編寫高效、優質的代碼,還能解決項目中遇到的各種難題。 給大家推薦一個支持C#的開源免費、新手友好的數據結構與演算法入門教程:Hello演算法。 項目介紹 《Hello Algo》是一本開源免費、新手友好的數據結構與演算法入門 ...
  • 1.生成單個Proto.bat內容 @rem Copyright 2016, Google Inc. @rem All rights reserved. @rem @rem Redistribution and use in source and binary forms, with or with ...
  • 一:背景 1. 講故事 前段時間有位朋友找到我,說他的窗體程式在客戶這邊出現了卡死,讓我幫忙看下怎麼回事?dump也生成了,既然有dump了那就上 windbg 分析吧。 二:WinDbg 分析 1. 為什麼會卡死 窗體程式的卡死,入口門檻很低,後續往下分析就不一定了,不管怎麼說先用 !clrsta ...
  • 前言 人工智慧時代,人臉識別技術已成為安全驗證、身份識別和用戶交互的關鍵工具。 給大家推薦一款.NET 開源提供了強大的人臉識別 API,工具不僅易於集成,還具備高效處理能力。 本文將介紹一款如何利用這些API,為我們的項目添加智能識別的亮點。 項目介紹 GitHub 上擁有 1.2k 星標的 C# ...