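scalaz-stream is a functional stream combinator library that is particularly well suited to functional programming. A scalaz-stream program is built by chaining together one or more Processes, each of which is in one of several states. A stream represents a sequence of elements that may be generated automatically or fed in from an external source, for example a sequence of mouse positions, the lines of text in a file, database records, or a series of HTTP requests. A Process is a stream transducer: it converts one kind of stream into another. The type signature of Process is: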
sealed trait Process[+F[_], +O]
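Here F is a higher-kinded type that represents a kind of computation (an effect), and O is the type of the values the Process outputs. Judging from the signature, a Process is a node that performs F computations yielding values of type O, so a scalaz-stream is in effect a stream of computations. A Process can be in one of the following states: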
case class Emit[+O](seq: Seq[O]) extends HaltEmitOrAwait[Nothing, O] with EmitOrAwait[Nothing, O]
case class Await[+F[_], A, +O](
  req: F[A]
  , rcv: (EarlyCause \/ A) => Trampoline[Process[F, O]] @uncheckedVariance
  , preempt : A => Trampoline[Process[F,Nothing]] @uncheckedVariance = (_:A) => Trampoline.delay(halt:Process[F,Nothing])
) extends HaltEmitOrAwait[F, O] with EmitOrAwait[F, O] { ... }
case class Halt(cause: Cause) extends HaltEmitOrAwait[Nothing, Nothing] with HaltOrStep[Nothing, Nothing]
case class Append[+F[_], +O](
  head: HaltEmitOrAwait[F, O]
  , stack: Vector[Cause => Trampoline[Process[F, O]]] @uncheckedVariance
) extends Process[F, O] { ... }
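scalaz-stream is a pull-model stream. A Process does not transform a stream as a function Stream[I] => Stream[O]; it proceeds by state transitions, so each state amounts to a request issued to a driver:
Emit[+O]: a request to emit values of type O (carried in a Seq[O])
Await[+F[_],A,+O]: a request to evaluate F[A]; the resulting A is passed to the function rcv, which computes the next Process state. This is a reified (data-structure) version of flatMap
Halt: stop emitting
Append: join a Process to the Processes that follow it
As we can see, Emit, Await, Halt and Append are all reified states of the Process type. Await is the reification of flatMap and Emit plays the role of Return, so Process is in effect a Free Monad.
Emit emits O values; Await evaluates F and then continues with the next Process; Append passes the outcome of the preceding Process on to the next one. Await and Append are two different ways of connecting Processes.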
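To make the idea of "states as requests to a driver" concrete, here is a minimal toy model. It is not the scalaz-stream source: the names Proc, PullModelSketch and runLog are hypothetical, the effect F is simplified to a plain thunk, and Append, Trampoline and Cause are left out entirely.

```scala
// A toy model only; every name here is hypothetical and simplified.
object PullModelSketch {
  sealed trait Proc[+O]
  // Emit: "please output these values, then continue with `next`"
  case class Emit[+O](values: Seq[O], next: () => Proc[O]) extends Proc[O]
  // Await: "please evaluate `req`, then give the result to `rcv`"
  case class Await[A, +O](req: () => A, rcv: A => Proc[O]) extends Proc[O]
  // Halt: "nothing more to output"
  case object Halt extends Proc[Nothing]

  // The driver: inspects the current state, serves the request, moves to the next state.
  def runLog[O](p: Proc[O]): Vector[O] = {
    @annotation.tailrec
    def go(cur: Proc[O], acc: Vector[O]): Vector[O] = cur match {
      case Emit(vs, next)  => go(next(), acc ++ vs)
      case Await(req, rcv) => go(rcv(req()), acc)
      case Halt            => acc
    }
    go(p, Vector.empty)
  }

  // Await a computed value, emit it and its double, then halt.
  val example: Proc[Int] =
    Await[Int, Int](() => 20 + 1, a => Emit[Int](Seq(a, a * 2), () => Halt))

  val result: Vector[Int] = runLog(example) // Vector(21, 42)
}
```

Nothing happens until the driver asks for the next state, which is what makes this a pull model: the Process only describes what it wants done.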
Process comes in the following variants:
type Process0[+O] = Process[Nothing,O]
/**
* A single input stream transducer. Accepts input of type `I`,
* and emits values of type `O`.
*/
type Process1[-I,+O] = Process[Env[I,Any]#Is, O]
/**
* A stream transducer that can read from one of two inputs,
* the 'left' (of type `I`) or the 'right' (of type `I2`).
* `Process1[I,O] <: Tee[I,I2,O]`.
*/
type Tee[-I,-I2,+O] = Process[Env[I,I2]#T, O]
/**
* A stream transducer that can read from one of two inputs,
* non-deterministically.
*/
type Wye[-I,-I2,+O] = Process[Env[I,I2]#Y, O]
/**
* An effectful sink, to which we can send values. Modeled
* as a source of effectful functions.
*/
type Sink[+F[_],-O] = Process[F, O => F[Unit]]
/**
* An effectful channel, to which we can send values and
* get back responses. Modeled as a source of effectful
* functions.
*/
type Channel[+F[_],-I,O] = Process[F, I => F[O]]
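Process[F[_],O]: source: an effectful stream source that emits O values produced by F computations
Process0[+O]: alias for Process[Nothing,O]: source: a pure stream source that emits elements of type O
Process1[-I,+O]: a one-to-one transforming node: accepts input of type I, processes it, and outputs values of type O
Tee[-I,-I2,+O]: a two-to-one transforming node with ordered input: reads I and I2 values from the left and right in a deterministic order (for example alternating left, right) and outputs values of type O
Wye[-I,-I2,+O]: a two-to-one transforming node with unordered input: reads I and I2 values in whatever order the upstream sources deliver them, regardless of side, and outputs values of type O
Sink[+F[_],-O]: an endpoint of the computation that applies an F computation to each O value and returns nothing: O => F[Unit]
Channel[+F[_],-I,O]: an effectful node that accepts input of type I, runs an F computation, and returns the response as F[O]: I => F[O]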
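The following sketch shows how three of these node kinds might be used. It assumes a scalaz-stream 0.7.x-style API with scalaz 7.1 (where Task has a run method); the object name NodeKindsSketch and the value names are made up for illustration, and building the Sink with Process.constant is just one possible approach, since helper constructors differ between versions.

```scala
import scalaz.concurrent.Task
import scalaz.stream._

// Illustrative only; assumes scalaz-stream 0.7.x and scalaz 7.1.
object NodeKindsSketch {
  val src: Process[Task, Int] = Process.emitAll(Seq(1, 2, 3))
  val names: Process[Task, String] = Process.emitAll(Seq("a", "b", "c"))

  // Process1: one input, one output. `pipe` feeds the source through it.
  val doubler: Process1[Int, Int] = process1.lift((i: Int) => i * 2)
  def doubled: Vector[Int] = src.pipe(doubler).runLog.run

  // Tee: two inputs read in a fixed order. `tee.zip` reads one element
  // from the left, then one from the right, and emits the pair.
  def zipped: Vector[(Int, String)] =
    src.tee(names)(tee.zip[Int, String]).runLog.run

  // Sink: a stream of effectful functions Int => Task[Unit].
  def collected: List[Int] = {
    val buffer = scala.collection.mutable.ListBuffer.empty[Int]
    val sink: Sink[Task, Int] =
      Process.constant((i: Int) => Task.delay { buffer += i; () })
    src.to(sink).run.run // run purely for the side effects
    buffer.toList
  }
}
```

The Sink here is literally a Process whose elements are functions, which matches the alias Process[F, O => F[Unit]] shown above.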
Here are some simple ways to construct a Process:
Process.emit(1) //> res0: scalaz.stream.Process0[Int] = Emit(Vector(1))
Process.emitAll(Seq(1,2,3)) //> res1: scalaz.stream.Process0[Int] = Emit(List(1, 2, 3))
Process.halt //> res2: scalaz.stream.Process0[Nothing] = Halt(End)
Process.range(1,2,3) //> res3: scalaz.stream.Process0[Int] = Append(Halt(End),Vector(<function1>))
These construct pure streams. scalaz-stream usually uses Task as the F effect; the following construct, or convert to, Task-based streams:
val p: Process[Task,Int] = Process.emitAll(Seq(1,2,3)) //> p : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Append(Halt(End),Vector(<function1>))
Process.range(1,2,3).toSource //> res4: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Append(Halt(End),Vector(<function1>))
// lift an F[A] into a Process[F,A]
Process.eval(Task.delay {5 * 8}) //> res5: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@56aac163,<function1>,<function1>)
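A point worth illustrating is that building a Task-based Process only describes the computation; the Task is not evaluated until the stream is run. The sketch below assumes scalaz-stream 0.7.x with scalaz 7.1; EvalSketch, demo and the mutable counter are hypothetical names used only to make the effect observable.

```scala
import scalaz.concurrent.Task
import scalaz.stream._

// Illustrative only; assumes scalaz-stream 0.7.x and scalaz 7.1.
object EvalSketch {
  def demo(): (Int, Vector[Int]) = {
    var calls = 0 // mutable only so that we can observe when the Task runs

    // repeatEval re-evaluates the Task every time an element is pulled;
    // take(3) stops the stream after three elements.
    val ticks: Process[Task, Int] =
      Process.repeatEval(Task.delay { calls += 1; calls }).take(3)

    val before = calls         // the Task has not been evaluated yet
    val out = ticks.runLog.run // running the stream evaluates the Task
    (before, out)
  }
}
```

Calling EvalSketch.demo() should show the counter still at 0 before the run, followed by the three values produced once the stream is actually driven.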
To run the Process of a stream, there are several run methods:
/**
* Collect the outputs of this `Process[F,O]` into a Monoid `B`, given a `Monad[F]` in
* which we can catch exceptions. This function is not tail recursive and
* relies on the `Monad[F]` to ensure stack safety.
*/
final def runFoldMap[F2[x] >: F[x], B](f: O => B)(implicit F: Monad[F2], C: Catchable[F2], B: Monoid[B]): F2[B] = {
...}
/**
* Collect the outputs of this `Process[F,O]`, given a `Monad[F]` in
* which we can catch exceptions. This function is not tail recursive and
* relies on the `Monad[F]` to ensure stack safety.
*/
final def runLog[F2[x] >: F[x], O2 >: O](implicit F: Monad[F2], C: Catchable[F2]): F2[Vector[O2]] = {
...}
/** Run this `Process` solely for its final emitted value, if one exists. */
final def runLast[F2[x] >: F[x], O2 >: O](implicit F: Monad[F2], C: Catchable[F2]): F2[Option[O2]] = {
...}
/** Run this `Process` solely for its final emitted value, if one exists, using `o2` otherwise. */
final def runLastOr[F2[x] >: F[x], O2 >: O](o2: => O2)(implicit F: Monad[F2], C: Catchable[F2]): F2[O2] =
runLast[F2, O2] map { _ getOrElse o2 }
/** Run this `Process`, purely for its effects. */
final def run[F2[x] >: F[x]](implicit F: Monad[F2], C: Catchable[F2]): F2[Unit] =
F.void(drain.runLog(F, C))
These functions all return an F2 computation. If F2 is Task, we can call Task.run to obtain the result:
// runFoldMap is like summing with a Monoid
p.runFoldMap(identity).run //> res6: Int = 6
p.runFoldMap(i => i * 2).run //> res7: Int = 12
p.runFoldMap(_.toString).run //> res8: String = 123
// runLog collects the received elements into a Vector
p.runLog.run //> res9: Vector[Int] = Vector(1, 2, 3)
// runLast takes the last element and returns an Option
p.runLast.run //> res10: Option[Int] = Some(3)
Process.halt.toSource.runLast.run //> res11: Option[Nothing] = None
Process.halt.toSource.runLastOr(65).run //> res12: Int = 65
// run performs only the F effects and discards all elements
p.run //> res13: scalaz.concurrent.Task[Unit] = scalaz.concurrent.Task@26b3fd41
p.run.run // running the Task[Unit] returns Unit
Process.emit(print("haha")).toSource.run.run //> haha
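Since runFoldMap combines the mapped elements with whatever Monoid is in implicit scope for the result type, choosing a different result type changes how the stream is summarized. The sketch below assumes scalaz-stream 0.7.x with scalaz 7.1 and imports the Monoid instances explicitly from scalaz.std; FoldMapSketch and its member names are hypothetical.

```scala
import scalaz.concurrent.Task
import scalaz.stream._
import scalaz.std.anyVal._ // Monoid[Int]: addition
import scalaz.std.list._   // Monoid[List[A]]: concatenation
import scalaz.std.tuple._  // Monoid for tuples, combined component-wise

// Illustrative only; assumes scalaz-stream 0.7.x and scalaz 7.1.
object FoldMapSketch {
  val p: Process[Task, Int] = Process.emitAll(Seq(1, 2, 3))

  // Int monoid: the elements are added up.
  def sum: Int = p.runFoldMap(i => i).run

  // List monoid: the one-element lists are concatenated in order.
  def asList: List[Int] = p.runFoldMap(i => List(i)).run

  // Tuple monoid: counts the elements and sums them in a single pass.
  def countAndSum: (Int, Int) = p.runFoldMap(i => (1, i)).run
}
```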
As with List and Stream, we can apply the same kinds of operations, that is, stream transformation functions, to a scalaz-stream Process:
p.take(2).runLog.run //> res14: Vector[Int] = Vector(1, 2)
p.filter {_ > 2}.runLog.run //> res15: Vector[Int] = Vector(3)
p.last.runLog.run //> res16: Vector[Int] = Vector(3)
p.drop(1).runLog.run //> res17: Vector[Int] = Vector(2, 3)
p.exists{_ > 5}.runLog.run //> res18: Vector[Boolean] = Vector(false)
These functions closely resemble those of the Scala standard library's Stream. Now let's look at map and flatMap:
p.map{i => s"Int:$i"}.runLog.run //> res19: Vector[String] = Vector(Int:1, Int:2, Int:3)
p.flatMap{i => Process(i,i-1)}.runLog.run //> res20: Vector[Int] = Vector(1, 0, 2, 1, 3, 2)
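On closer inspection, all of the transformations above act like a Process1: each element is transformed as it flows through the stream. In the next post we will look at more complex Process operations such as Sink, Tee and Wye, followed by practical applications of scalaz-stream.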
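One way to see the Process1 connection is to write a few of these transformations explicitly as Process1 values and attach them with pipe (or its alias |>). This is a sketch assuming scalaz-stream 0.7.x with scalaz 7.1; PipeSketch and its member names are hypothetical, and it does not claim that every method is implemented this way internally.

```scala
import scalaz.concurrent.Task
import scalaz.stream._

// Illustrative only; assumes scalaz-stream 0.7.x and scalaz 7.1.
object PipeSketch {
  val p: Process[Task, Int] = Process.emitAll(Seq(1, 2, 3))

  // The method form used in the listings above ...
  def viaMethod: Vector[Int] = p.take(2).runLog.run

  // ... and the same transformation as an explicit Process1 value.
  def viaPipe: Vector[Int] = p.pipe(process1.take[Int](2)).runLog.run

  // `|>` is an alias for `pipe`.
  def filtered: Vector[Int] = (p |> process1.filter[Int](_ > 2)).runLog.run

  // A plain function lifted into a Process1 behaves like map.
  def labelled: Vector[String] =
    p.pipe(process1.lift((i: Int) => s"Int:$i")).runLog.run
}
```

Because a Process1 is an ordinary value, transformations written this way can be named, reused, and composed before any source is attached.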