Scalaz(46)- scalaz-stream 基礎介紹

来源:http://www.cnblogs.com/tiger-xc/archive/2016/07/07/5651014.html
-Advertisement-
Play Games

scalaz-stream是一個泛函數據流配件庫(functional stream combinator library),特別適用於函數式編程。scalar-stream是由一個以上各種狀態的Process串聯組成。stream代表一連串的元素,可能是自動產生或者由外部的源頭輸入,如:一連串滑鼠 ...


    scalaz-stream是一個泛函數據流配件庫(functional stream combinator library),特別適用於函數式編程。scalar-stream是由一個以上各種狀態的Process串聯組成。stream代表一連串的元素,可能是自動產生或者由外部的源頭輸入,如:一連串滑鼠位置;文件中的文字行;資料庫記錄;又或者一連串的HTTP請求等。Process就是stream轉換器(transducer),它可以把一種stream轉換成另一種stream。Process的類型款式如下:

sealed trait Process[+F[_], +O]

其中F是個高階類,是一種演算法,O是Process的運算值。從類型款式上看Process是個對O類型值進行F運算的節點,那麼scalaz-stream就應該是個運算流了。Process包含以下幾種狀態:

case class Emit[+O](seq: Seq[O]) extends HaltEmitOrAwait[Nothing, O] with EmitOrAwait[Nothing, O]

case class Await[+F[_], A, +O](
    req: F[A]
    , rcv: (EarlyCause \/ A) => Trampoline[Process[F, O]] @uncheckedVariance
    , preempt : A => Trampoline[Process[F,Nothing]] @uncheckedVariance = (_:A) => Trampoline.delay(halt:Process[F,Nothing])
    ) extends HaltEmitOrAwait[F, O] with EmitOrAwait[F, O] {
...
}
case class Halt(cause: Cause) extends HaltEmitOrAwait[Nothing, Nothing] with HaltOrStep[Nothing, Nothing]

case class Append[+F[_], +O](
    head: HaltEmitOrAwait[F, O]
    , stack: Vector[Cause => Trampoline[Process[F, O]]] @uncheckedVariance
    ) extends Process[F, O] {
...
}   

scalaz-stream是個主動讀取模式的流(pull model stream),Process轉換stream的方式不是以Stream[I] => Stream[O]這種函數方式,而是一種狀態轉換方式進行(state transition),所以這些狀態就等於向一個驅動程式發出的請求:

Emit[+O]:請求發一個O值

Await[+F[_],A,+O]:要求運算F[A],得出F[A]的結果A後輸入函數rcv再運算得出下一個Process狀態。這個是flatMap函數的結構化版本

Halt:停止發送

Append:連接前後兩個Process

可以看到Emit,Await,Halt,Append都是Process類型的結構化狀態。其中Await就是flatMap函數的結構化,Emit就像Return,所以Process就是一個Free Monad。

Emit的作用是發出一個O值,Await的作用是運算F然後連接下一個Process, Append的作用則是把前一個Process的信息傳遞到下一個Process。Await和Append分別是不同方式的Process連接方式。

Process又分以下幾類:

  type Process0[+O] = Process[Nothing,O]

  /**
   * A single input stream transducer. Accepts input of type `I`,
   * and emits values of type `O`.
   */
  type Process1[-I,+O] = Process[Env[I,Any]#Is, O]

  /**
   * A stream transducer that can read from one of two inputs,
   * the 'left' (of type `I`) or the 'right' (of type `I2`).
   * `Process1[I,O] <: Tee[I,I2,O]`.
   */
  type Tee[-I,-I2,+O] = Process[Env[I,I2]#T, O]

  /**
   * A stream transducer that can read from one of two inputs,
   * non-deterministically.
   */
  type Wye[-I,-I2,+O] = Process[Env[I,I2]#Y, O]

  /**
   * An effectful sink, to which we can send values. Modeled
   * as a source of effectful functions.
   */
  type Sink[+F[_],-O] = Process[F, O => F[Unit]]

  /**
   * An effectful channel, to which we can send values and
   * get back responses. Modeled as a source of effectful
   * functions.
   */
  type Channel[+F[_],-I,O] = Process[F, I => F[O]]

Process[F[_],O]:source:運算流源點,由此發送F[O]運算

Process0[+O]:>>>Process[Nothing,+O]:source:純數據流源點,發送O類型元素

Process1[-I,+O]:一對一的數據轉換節點:接收一個I類型輸入,經過處理轉換成O類型數據輸出

Tee[-I1,-I2,+O]:二對一的有序輸入數據轉換節點:從左右兩邊一左一右有順接受I1,I2類型輸入後轉換成O類型數據輸出

Wye[-I1,-I2,+O]:二對一的無序輸入數據轉換節點:不按左右順序,按上游數據發送情況接受I1,I2類型輸入後轉換成O類型數據輸出

Sink[+F[_],-O]:運算終點,在此對O類型數據進行F運算,不返回值:O => F[Unit]

Channel[+F[_],-I,O]:運算終點,接受I類型輸入,進行F運算後返回F[O]:I => F[O]

以下是一些簡單的Process構建方法:

1  Process.emit(1)                                  //> res0: scalaz.stream.Process0[Int] = Emit(Vector(1))
2  Process.emitAll(Seq(1,2,3))                      //> res1: scalaz.stream.Process0[Int] = Emit(List(1, 2, 3))
3  Process.halt                                     //> res2: scalaz.stream.Process0[Nothing] = Halt(End)
4  Process.range(1,2,3)           //> res3: scalaz.stream.Process0[Int] = Append(Halt(End),Vector(<function1>))

這些是純數據流的構建方法。scalaz-stream通常把Task作為F運算,下麵是Task運算流的構建或者轉換方法:

1 val p: Process[Task,Int] = Process.emitAll(Seq(1,2,3))  //> p  : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Append(Halt(End),Vector(<function1>))
2  Process.range(1,2,3).toSource                   //> res4: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Append(Halt(End),Vector(<function1>))
3  //把F[A]升格成Process[F,A]
4 Process.eval(Task.delay {5 * 8})                 //> res5: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@56aac163,<function1>,<function1>)

對stream的Process進行運算有下麵幾種run方法:

/**
   * Collect the outputs of this `Process[F,O]` into a Monoid `B`, given a `Monad[F]` in
   * which we can catch exceptions. This function is not tail recursive and
   * relies on the `Monad[F]` to ensure stack safety.
   */
final def runFoldMap[F2[x] >: F[x], B](f: O => B)(implicit F: Monad[F2], C: Catchable[F2], B: Monoid[B]): F2[B] = {
...}
/**
   * Collect the outputs of this `Process[F,O]`, given a `Monad[F]` in
   * which we can catch exceptions. This function is not tail recursive and
   * relies on the `Monad[F]` to ensure stack safety.
   */
final def runLog[F2[x] >: F[x], O2 >: O](implicit F: Monad[F2], C: Catchable[F2]): F2[Vector[O2]] = {
...}
/** Run this `Process` solely for its final emitted value, if one exists. */
final def runLast[F2[x] >: F[x], O2 >: O](implicit F: Monad[F2], C: Catchable[F2]): F2[Option[O2]] = {
...}
/** Run this `Process` solely for its final emitted value, if one exists, using `o2` otherwise. */
final def runLastOr[F2[x] >: F[x], O2 >: O](o2: => O2)(implicit F: Monad[F2], C: Catchable[F2]): F2[O2] =
    runLast[F2, O2] map { _ getOrElse o2 }
/** Run this `Process`, purely for its effects. */
final def run[F2[x] >: F[x]](implicit F: Monad[F2], C: Catchable[F2]): F2[Unit] =
    F.void(drain.runLog(F, C))

這幾個函數都返回F2運算,如果F2是Task的話那麼我們就可以用Task.run來獲取結果值: 

 1  //runFoldMap就好比Monoid的sum
 2  p.runFoldMap(identity).run                       //> res6: Int = 6
 3  p.runFoldMap(i => i * 2).run                     //> res7: Int = 12
 4  p.runFoldMap(_.toString).run                     //> res8: String = 123
 5  //runLog把收到的元素放入vector中
 6  p.runLog.run                                     //> res9: Vector[Int] = Vector(1, 2, 3)
 7  //runLast取最後一個元素,返回Option
 8  p.runLast.run                                    //> res10: Option[Int] = Some(3)
 9  Process.halt.toSource.runLast.run                //> res11: Option[Nothing] = None
10  Process.halt.toSource.runLastOr(65).run          //> res12: Int = 65
11  //run只進行F的運算,放棄所有元素
12  p.run      //> res13: scalaz.concurrent.Task[Unit] = scalaz.concurrent.Task@26b3fd41
13  p.run.run  //Task[Unit] 返回Unit
14  Process.emit(print("haha")).toSource.run.run     //> haha

與List和Stream操作相似,我們同樣可以對scalar-stream Process施用同樣的操作函數,也就是一些stream轉換函數:

1  p.take(2).runLog.run                             //> res14: Vector[Int] = Vector(1, 2)
2  p.filter {_ > 2}.runLog.run                      //> res15: Vector[Int] = Vector(3)
3  p.last.runLog.run                                //> res16: Vector[Int] = Vector(3)
4  p.drop(1).runLog.run                             //> res17: Vector[Int] = Vector(2, 3)
5  p.exists{_ > 5}.runLog.run                       //> res18: Vector[Boolean] = Vector(false)

以上這些函數與scala標準庫的stream很相似。再看看map,flatMap吧:

1  p.map{i => s"Int:$i"}.runLog.run                 //> res19: Vector[String] = Vector(Int:1, Int:2, Int:3)
2  p.flatMap{i => Process(i,i-1)}.runLog.run        //> res20: Vector[Int] = Vector(1, 0, 2, 1, 3, 2)

仔細檢查可以看出來上面的這些轉換操作都是針對Process1類型的,都是元素在流通過程中得到轉換。我們會在下篇討論中介紹一些更複雜的Process操作,如:Sink,Tee,Wyn...,然後是scalaz-stream的具體應用

 

 

 

 

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • jQuery訪問json文件 ajax訪問json文件 ...
  • 這兩天面試了一兩個公司,由於簡歷中的最近一個項目用到了JMS,然而面試官似乎對這個很感興趣,所以都被問到了,但可惜的是,我除了說我們使用了JMS外,面對他們提出的一些關於JMS的問題,我回答得相當差,直接結果就是面試失敗。同時我也深深的覺得自己對於技術的掌握是多麼的浮淺,本著從哪裡跌倒就從哪裡爬起來 ...
  • 一.流的分類 Java的流類大部分都是由InputStream、OutputStream、Reader和Writer這四個抽象類派生出來的 (1)按數據流向 輸入流(InputStream類和Reader類的子類) 輸出流(OutputStream類和Writer類的子類) (2)按數據類型 位元組流 ...
  • 向資料庫發送多條sql語句 create database batch use batch create table batch_table( id int primary key auto_increment, name varchar(20) ) insert into batch_table ...
  • N!!!java中無參無返回值方法的使用 1,定義方法 eg: public void show(){ System.out.println("HelloWorld!") } 方法要在一對大括弧中實現特定的操作 命名規範,第一個單詞字母小寫,其他單詞首字母大寫 調用方法,先創建對象,然後通過 對象名 ...
  • 上述代碼編譯運行皆沒有問題,但是用valgrind檢測會提示錯誤: Why? 此代碼可以實現功能要求,但是健壯性並不好,假設在map.erase之後再次使用map當前的iterator,即 代碼運行就會出現錯誤,因為it目前指向的對象已經被刪掉了。 為了避免程式出現這樣的錯誤,我們應該保證在iter ...
  • 徒手使用python和go語言搭建最簡單的web頁面-使用模板,無持久化 也許我們會接觸到很多語言的web應用,譬如php,java,包括今天介紹的python和go,實際上我們在使用這些語言構建web應用的時候,很多時候變成了單純的調用包和api,而忽略底層的原理。不過呢,所有的web應用,模型都 ...
  • 異常是什麼,它是怎麼發生的?Java系統是怎麼處理的,和正常return有什麼區別?應用程式能如何處理?Throwable是什麼,它有哪些屬性和方法?Java都有哪些異常類,有什麼區別,為什麼要那麼多類?... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...