The primary design goal of the scalaz-stream library is functional I/O: users compose complete I/O programs out of small, single-purpose I/O functions. Another goal is resource safety: I/O programs written with scalaz-stream guarantee the safe use of resources — in particular, once an I/O task completes, all resources it held (file handles, memory, and so on) are released automatically. In the previous post we gave a broad overview of scalaz-stream's core type, Process, though we spent most of the time on the Process1 channel type. In this post we look at the design principles and purpose of the whole scalaz-stream chain from a practical angle. We mentioned that a Process has three states — Emit/Await/Halt — and that Append is an important type for linking stream nodes. Here is how these types are defined in scalaz-stream:
case class Emit[+O](seq: Seq[O]) extends HaltEmitOrAwait[Nothing, O] with EmitOrAwait[Nothing, O]
case class Await[+F[_], A, +O](
req: F[A]
, rcv: (EarlyCause \/ A) => Trampoline[Process[F, O]] @uncheckedVariance
, preempt : A => Trampoline[Process[F,Nothing]] @uncheckedVariance = (_:A) => Trampoline.delay(halt:Process[F,Nothing])
) extends HaltEmitOrAwait[F, O] with EmitOrAwait[F, O]
case class Halt(cause: Cause) extends HaltEmitOrAwait[Nothing, Nothing] with HaltOrStep[Nothing, Nothing]
case class Append[+F[_], +O](
head: HaltEmitOrAwait[F, O]
, stack: Vector[Cause => Trampoline[Process[F, O]]] @uncheckedVariance
) extends Process[F, O]
Notice that Process[F,O] is wrapped inside the Trampoline type: Process uses trampolining to structure its computation, which effectively prevents StackOverflowError in long stream computations. Setting Trampoline and the other syntactic complexity aside, the types above can be reduced to the following conceptual structure:
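To see why trampolining matters here, the idea can be sketched in a few lines of plain Scala (a simplification I am assuming for illustration, not scalaz's actual Trampoline): each recursive step is returned as a data value instead of a nested call, so a tail-recursive interpreter loop can run arbitrarily deep "recursion" in constant stack space.

```scala
// Minimal trampoline sketch (assumed simplification of scalaz's Trampoline).
sealed trait Trampoline[A]
case class Done[A](a: A) extends Trampoline[A]
case class More[A](k: () => Trampoline[A]) extends Trampoline[A]

// The interpreter is a tail-recursive loop: no JVM stack growth.
@annotation.tailrec
def run[A](t: Trampoline[A]): A = t match {
  case Done(a) => a
  case More(k) => run(k())
}

// 100,000 deferred steps would overflow the stack as plain recursion,
// but run fine here, because each step returns a value immediately:
def countDown(n: Int): Trampoline[Int] =
  if (n <= 0) Done(0) else More(() => countDown(n - 1))
```

This is why Await's `rcv` and Append's `stack` return `Trampoline[Process[F,O]]` rather than `Process[F,O]` directly.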
trait Process[+F[_],+O]
case object Cause

case class Emit[O](out: O) extends Process[Nothing, O]

case class Halt(cause: Cause) extends Process[Nothing,Nothing]

case class Await[+F[_],E,+O](
  req: F[E],
  rcv: E => Process[F,O],
  preempt: E => Process[F,Nothing] = Halt) extends Process[F,O]

case class Append[+F[_],+O](
  head: Process[F,O],
  stack: Vector[Cause => Process[F,O]]) extends Process[F,O]
Let's walk through these:
Process[F[_],O]: from the type we can tell that scalaz-stream may perform possibly side-effecting computations of type F while emitting elements of type O.
Emit[O](out: O): emits a single element of type O; it cannot perform any additional computation.
Halt(cause: Cause): stops emission; cause is the reason for stopping: End for normal completion, Err for termination on error, Kill for forced termination.
Await[+F[_],E,+O]: this is the core Process state of a computation stream. It first runs the F computation req; the result of type E is fed into the function rcv to transition to the next Process state, and afterwards the cleanup function preempt is executed. This is essentially flatMap reified as a data structure. Note that E is an internal type: it is produced by the F computation, passed into rcv, and never referenced again. We can release resources inside the preempt function. So if we want to build a computation stream, Await is the type to use.
Append[+F[_],+O]: Append links Process[F,O] nodes together. It not only carries the O elements along, but more importantly it also carries the previous node's F computation into the next node, so that a later node can run the previous node's finalizer. Append can combine several nodes into one larger node: head is the first node, and stack is a sequence of functions, each taking the completion status (Cause) of the previous node and computing the next node's state.
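The Emit/Await/Halt mechanics above can be made concrete with a toy runner for the simplified structure (an assumption for illustration only: the effect F is fixed to Function0, i.e. a deferred computation, and there is no trampolining or Append handling; the names `Proc`, `PEmit`, etc. are hypothetical):

```scala
// Toy version of the Process state machine, with F fixed to Function0.
sealed trait Proc[+O]
case class PEmit[O](out: O, next: Proc[O]) extends Proc[O]
case object PHalt extends Proc[Nothing]
case class PAwait[A, O](req: () => A, rcv: A => Proc[O]) extends Proc[O]

// Drive the state machine: run the effect in PAwait, feed its result to rcv,
// and collect everything PEmit produces until PHalt.
def runLog[O](p: Proc[O]): Vector[O] = p match {
  case PEmit(o, next)   => o +: runLog(next)
  case PHalt            => Vector()
  case PAwait(req, rcv) => runLog(rcv(req()))
}

// A two-element source whose values come from running effects:
val src: Proc[Int] =
  PAwait(() => 1, (a: Int) => PEmit(a,
    PAwait(() => a + 1, (b: Int) => PEmit(b, PHalt))))
```

Note how the intermediate type A in PAwait never escapes: it exists only between req and rcv, exactly as described for E above.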
A complete scalaz-stream consists of three kinds of nodes: Source / Transducer / Sink. Nodes are linked via Await or Append. Here are the type shapes of Source/Transducer/Sink:
Upstream: Source >>> Process0[O] >>> Process[F[_],O]
Midstream: Transducer >>> Process1[I,O]
Downstream: Sink >>> Process[F[_],O => F[Unit]], Channel >>> Process[F[_],I => F[O]]
We can use a file-processing flow to describe what a complete scalaz-stream chain does:
Process[F[_],O]: reads O values from a file via F[O]; here F carries the side effects
>>> Process1[I,O]: I is the raw data read from the file, O is the output produced after filtering and processing
>>> O => F[Unit]: a function returning no result; it runs an F computation on the input O, e.g. writing the O value out to another file
>>> I => F[O]: a function returning a result; it runs an F computation on the input I and returns an O, e.g. writing a record to a database and returning the write status
In short, the flow above is: read data from a file -> process the data -> write it to another file. That sounds simple, but our goal is resource safety: however the stream terminates — normal completion, forced early stop, or an error — scalaz-stream will close any opened files, stop the threads in use, and release memory and other resources. Seen that way, it is not so simple after all. Let's first examine what the Source/Transducer/Sink types do:
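The resource-safety contract just described can be sketched without scalaz-stream at all (this is a plain-Scala analogue, an assumption for illustration — the hypothetical `withResource` stands in for what preempt/finalizers guarantee): however the processing ends, normally or by exception, the release step must run.

```scala
// Plain-Scala sketch of the resource-safety contract: release always runs.
def withResource[R, A](acquire: => R)(release: R => Unit)(use: R => A): A = {
  val r = acquire
  try use(r)
  finally release(r) // runs on success AND on any exception, like a finalizer
}

var closed = false
val result =
  try withResource("file-handle")(_ => closed = true) { h =>
    sys.error(s"failed while reading $h") // simulate an error mid-stream
  }
  catch { case _: RuntimeException => "recovered" }
// even though `use` threw, the resource was released (closed == true)
```

scalaz-stream extends this guarantee across a whole chain of asynchronous nodes, which is what makes Await's preempt and Append's Cause-passing necessary.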
import Process._
emit(0)             //> res0: scalaz.stream.Process0[Int] = Emit(Vector(0))
emitAll(Seq(1,2,3)) //> res1: scalaz.stream.Process0[Int] = Emit(List(1, 2, 3))
Process(1,2,3)      //> res2: scalaz.stream.Process0[Int] = Emit(WrappedArray(1, 2, 3))
Process()           //> res3: scalaz.stream.Process0[Nothing] = Emit(List())
These are all ways of constructing a Process0 — data sources of a sort. But they merely represent a sequence of in-memory values, which is of little use to us: we want to obtain values from the outside world, e.g. by reading data from a file or a database, which means we need an F effect. Process0[O] >>> Process[Nothing,O], whereas what we need is Process[F,O]. So how about writing it like this?
val p: Process[Task,Int] = emitAll(Seq(1,2,3))
//> p : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Emit(List(1, 2, 3))

emitAll(Seq(1,2,3)).toSource
//> res4: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Emit(List(1, 2, 3))
The types match now, but there is no trace of Task anywhere in the Emit(...) expression, so this does not meet our need for a Source. It seems the only way is:
await(Task.delay{3})(emit)
//> res5: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@57855c9a,<function1>,<function1>)
eval(Task.delay{3})
//> res6: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@63e2203c,<function1>,<function1>)
Now not only do the types match, the expression also contains a Task computation. Through Task.delay we can perform side-effecting computations such as reading a file, because Await will run req: F[E] >>> Task[Int]. This is exactly the Source we need. Can we use such a Source to emit a sequence of values?
def emitSeq[A](xa: Seq[A]): Process[Task,A] =
  xa match {
    case h :: t => await(Task.delay {h})(emit) ++ emitSeq(t)
    case Nil => halt
  }                                  //> emitSeq: [A](xa: Seq[A])scalaz.stream.Process[scalaz.concurrent.Task,A]
val es1 = emitSeq(Seq(1,2,3))        //> es1 : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Append(Await(scalaz.concurrent.Task@2d6eabae,<function1>,<function1>),Vector(<function1>))
val es2 = emitSeq(Seq("a","b","c"))  //> es2 : scalaz.stream.Process[scalaz.concurrent.Task,String] = Append(Await(scalaz.concurrent.Task@4e7dc304,<function1>,<function1>),Vector(<function1>))
es1.runLog.run                       //> res7: Vector[Int] = Vector(1, 2, 3)
es2.runLog.run                       //> res8: Vector[String] = Vector(a, b, c)
In the example above we used await to run a Task and got back a Process[Task,?], a Source that may perform side-effecting computations. In practice we often need to fetch data from external sources with Task; typically those sources handle data retrieval asynchronously and deliver the data through callbacks. We can use the Task.async function to turn such callbacks into a Task, after which Process.eval or await lifts that Task into a Process[Task,?]. A simple example first: if we use scala.concurrent.Future for asynchronous reads, we can convert the Future into a Process like this:
def getData(dbName: String): Task[String] = Task.async { cb =>
  import scala.concurrent._
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.util.{Success,Failure}
  Future { s"got data from $dbName" }.onComplete {
    case Success(a) => cb(a.right)
    case Failure(e) => cb(e.left)
  }
}                                        //> getData: (dbName: String)scalaz.concurrent.Task[String]
val procGetData = eval(getData("MySQL")) //> procGetData : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@dd3b207,<function1>,<function1>)
procGetData.runLog.run                   //> res9: Vector[String] = Vector(got data from MySQL)
We can also turn a Java-style callback into a Task:
import com.ning.http.client._
val asyncHttpClient = new AsyncHttpClient() //> asyncHttpClient : com.ning.http.client.AsyncHttpClient = com.ning.http.client.AsyncHttpClient@245b4bdc
def get(s: String): Task[Response] = Task.async[Response] { callback =>
  asyncHttpClient.prepareGet(s).execute(
    new AsyncCompletionHandler[Unit] {
      def onCompleted(r: Response): Unit = callback(r.right)

      def onError(e: Throwable): Unit = callback(e.left)
    }
  )
}                                           //> get: (s: String)scalaz.concurrent.Task[com.ning.http.client.Response]
val prcGet = Process.eval(get("http://sina.com"))
//> prcGet : scalaz.stream.Process[scalaz.concurrent.Task,com.ning.http.client.Response] = Await(scalaz.concurrent.Task@222545dc,<function1>,<function1>)
prcGet.run.run //> 12:25:27.852 [New I/O worker #1] DEBUG c.n.h.c.p.n.r.NettyConnectListener -Request using non cached Channel '[id: 0x23fa1307, /192.168.200.3:50569 =>sina.com/66.102.251.33:80]'
Or, following the shape of the scalaz Task callback directly, def async(callback: (Throwable \/ Unit) => Unit):
def read(callback: (Throwable \/ Array[Byte]) => Unit): Unit = ???
//> read: (callback: scalaz.\/[Throwable,Array[Byte]] => Unit)Unit
val t: Task[Array[Byte]] = Task.async(read) //> t : scalaz.concurrent.Task[Array[Byte]] = scalaz.concurrent.Task@1a677343
val t2: Task[Array[Byte]] = for {
  bytes <- t
  moarBytes <- t
} yield (bytes ++ moarBytes)                //> t2 : scalaz.concurrent.Task[Array[Byte]] = scalaz.concurrent.Task@15de0b3c
val prct2 = Process.eval(t2)                //> prct2 : scalaz.stream.Process[scalaz.concurrent.Task,Array[Byte]] = Await(scalaz.concurrent.Task@15de0b3c,<function1>,<function1>)

def asyncRead(succ: Array[Byte] => Unit, fail: Throwable => Unit): Unit = ???
//> asyncRead: (succ: Array[Byte] => Unit, fail: Throwable => Unit)Unit
val t3: Task[Array[Byte]] = Task.async { callback =>
  asyncRead(b => callback(b.right), err => callback(err.left))
}                                           //> t3 : scalaz.concurrent.Task[Array[Byte]] = scalaz.concurrent.Task@489115ef
val t4: Task[Array[Byte]] = t3.flatMap(b => Task(b))
//> t4 : scalaz.concurrent.Task[Array[Byte]] = scalaz.concurrent.Task@3857f613
val prct4 = Process.eval(t4)                //> prct4 : scalaz.stream.Process[scalaz.concurrent.Task,Array[Byte]] = Await(scalaz.concurrent.Task@3857f613,<function1>,<function1>)
We can also use a timer to produce a Process[Task,A]:
import scala.concurrent.duration._
implicit val scheduler = java.util.concurrent.Executors.newScheduledThreadPool(3)
//> scheduler : java.util.concurrent.ScheduledExecutorService = java.util.concurrent.ScheduledThreadPoolExecutor@516be40f[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
val fizz = time.awakeEvery(3.seconds).map(_ => "fizz")
//> fizz : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@5762806e,<function1>,<function1>)
val fizz3 = fizz.take(3) //> fizz3 : scalaz.stream.Process[scalaz.concurrent.Task,String] = Append(Halt(End),Vector(<function1>))
fizz3.runLog.run         //> res9: Vector[String] = Vector(fizz, fizz, fizz)
Queue, Topic, and Signal can all serve as constructors of side-effecting data sources. Let's first see how a Queue produces a data source:
type BigStringResult = String
val qJobResult = async.unboundedQueue[BigStringResult]
//> qJobResult : scalaz.stream.async.mutable.Queue[demo.ws.blogStream.BigStringResult] = scalaz.stream.async.mutable.Queue$$anon$1@25d250c6
def longGet(jobnum: Int): BigStringResult = {
  Thread.sleep(2000)
  s"Some large data sets from job#${jobnum}"
}                                       //> longGet: (jobnum: Int)demo.ws.blogStream.BigStringResult

// multi-tasking
val start = System.currentTimeMillis()  //> start : Long = 1468556250797
Task.fork(qJobResult.enqueueOne(longGet(1))).unsafePerformAsync{case _ => ()}
Task.fork(qJobResult.enqueueOne(longGet(2))).unsafePerformAsync{case _ => ()}
Task.fork(qJobResult.enqueueOne(longGet(3))).unsafePerformAsync{case _ => ()}
val timemill = System.currentTimeMillis() - start
//> timemill : Long = 17
Thread.sleep(3000)
qJobResult.close.run
val bigData = {
  //multi-tasking
  val j1 = qJobResult.dequeue
  val j2 = qJobResult.dequeue
  val j3 = qJobResult.dequeue
  for {
    r1 <- j1
    r2 <- j2
    r3 <- j3
  } yield r1 + ","+ r2 + "," + r3
}                                       //> bigData : scalaz.stream.Process[[x]scalaz.concurrent.Task[x],String] = Await(scalaz.concurrent.Task@778d1062,<function1>,<function1>)

bigData.runLog.run                      //> res9: Vector[String] = Vector(Some large data sets from job#2,Some large data sets from job#3,Some large data sets from job#1)
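The fork-enqueue/dequeue pattern above has a direct analogue in plain java.util.concurrent (this is an assumed illustration, not the scalaz-stream API): three workers enqueue results concurrently, then a consumer dequeues them in whatever order the jobs finished.

```scala
// Plain-JVM analogue of the Queue pattern: concurrent producers, one consumer.
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

val q = new LinkedBlockingQueue[String]()
val pool = Executors.newFixedThreadPool(3)

// each worker simulates a slow job, then enqueues its result
(1 to 3).foreach { n =>
  pool.execute(new Runnable {
    def run(): Unit = { Thread.sleep(50); q.put(s"result of job#$n") }
  })
}
pool.shutdown()
pool.awaitTermination(5, TimeUnit.SECONDS)

// drain the queue; sorted because completion order is nondeterministic,
// just like the job#2/job#3/job#1 ordering in the runLog output above
val results = Vector(q.take(), q.take(), q.take()).sorted
```

What scalaz-stream's Queue adds on top of this is that dequeue is itself a Process[Task,A], so it composes with the rest of a stream and participates in resource cleanup.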
Now a Topic demonstration:
import scala.concurrent._
import scala.concurrent.duration._
import scalaz.stream.async.mutable._
import scala.concurrent.ExecutionContext.Implicits.global
val sharedData: Topic[BigStringResult] = async.topic()
//> sharedData : scalaz.stream.async.mutable.Topic[demo.ws.blogStream.BigStringResult] = scalaz.stream.async.package$$anon$1@797badd3
val subscriber = sharedData.subscribe.runLog //> subscriber : scalaz.concurrent.Task[Vector[demo.ws.blogStream.BigStringResult]] = scalaz.concurrent.Task@226a82c4
val otherThread = future {
  subscriber.run // Added this here - now subscriber is really attached to the topic
}                                            //> otherThread : scala.concurrent.Future[Vector[demo.ws.blogStream.BigStringResult]] = List()
// Need to give subscriber some time to start up.
// I doubt you'd do this in actual code.

// topics seem more useful for hooking up things like
// sensors that produce a continual stream of data,
// and where individual values can be dropped on the floor.
Thread.sleep(100)

sharedData.publishOne(longGet(1)).run // don't just call publishOne; need to run the resulting task
sharedData.close.run                  // don't just call close; need to run the resulting task

// Need to wait for the output
val result = Await.result(otherThread, Duration.Inf)
//> result : Vector[demo.ws.blogStream.BigStringResult] = Vector(Some large data sets from job#1)
The above explained and demonstrated the various ways of producing a possibly side-effecting Source. The other scalaz-stream node types will be covered in depth in the next post.