Scalaz(47)- scalaz-stream: 深入瞭解-Source

来源:http://www.cnblogs.com/tiger-xc/archive/2016/07/15/5674366.html
-Advertisement-
Play Games

scalaz-stream庫的主要設計目標是實現函數式的I/O編程(functional I/O)。這樣用戶就能使用功能單一的基礎I/O函數組合成為功能完整的I/O程式。還有一個目標就是保證資源的安全使用(resource safety):使用scalaz-stream編寫的I/O程式能確保資源的安 ...


   scalaz-stream庫的主要設計目標是實現函數式的I/O編程(functional I/O)。這樣用戶就能使用功能單一的基礎I/O函數組合成為功能完整的I/O程式。還有一個目標就是保證資源的安全使用(resource safety):使用scalaz-stream編寫的I/O程式能確保資源的安全使用,特別是在完成一項I/O任務後自動釋放所有占用的資源包括file handle、memory等等。我們在上一篇的討論里籠統地解釋了一下scalaz-stream核心類型Process的基本情況,不過大部分時間都用在了介紹Process1這個通道類型。在這篇討論里我們會從實際應用的角度來介紹整個scalaz-stream鏈條的設計原理及應用目的。我們提到過Process具有Emit/Await/Halt三個狀態,而Append是一個鏈接stream節點的重要類型。先看看這幾個類型在scalaz-stream里的定義:

case class Emit[+O](seq: Seq[O]) extends HaltEmitOrAwait[Nothing, O] with EmitOrAwait[Nothing, O]

case class Await[+F[_], A, +O](
    req: F[A]
    , rcv: (EarlyCause \/ A) => Trampoline[Process[F, O]] @uncheckedVariance
    , preempt : A => Trampoline[Process[F,Nothing]] @uncheckedVariance = (_:A) => Trampoline.delay(halt:Process[F,Nothing])
    ) extends HaltEmitOrAwait[F, O] with EmitOrAwait[F, O] 

case class Halt(cause: Cause) extends HaltEmitOrAwait[Nothing, Nothing] with HaltOrStep[Nothing, Nothing]

case class Append[+F[_], +O](
    head: HaltEmitOrAwait[F, O]
    , stack: Vector[Cause => Trampoline[Process[F, O]]] @uncheckedVariance
    ) extends Process[F, O] 

我們看到Process[F,O]被包嵌在Trampoline類型里,所以Process是通過Trampoline來實現函數結構化的,可以有效解決大量stream運算堆棧溢出問題(StackOverflowError)。撇開Trampoline等複雜的語法,以上類型可以簡化成以下理論結構:

 1 rait Process[+F[_],+O]
 2 case object Cause
 3 
 4 case class Emit[O](out: O) extends Process[Nothing, O] 
 5 
 6 case class Halt(cause: Cause) extends Process[Nothing,Nothing]
 7 
 8 case class Await[+F[_],E,+O](
 9   req: F[E],
10   rcv: E => Process[F,O],
11   preempt: E => Process[F,Nothing] = Halt) extends Process[F,O]
12 
13 case class Append[+F[_],+O](
14   head: Process[F,O],
15   stack: Vector[Cause => Process[F,O]]) extends Process[F,O]  

我們來說明一下:

Process[F[_],O]:從它的類型可以推斷出scalaz-stream可以在輸出O類型元素的過程中進行可能含副作用的F類型運算。

Emit[O](out: O):發送一個O類型元素;不可能進行任何附加運算

Halt(cause: Cause):停止發送;cause是停止的原因:End-完成發送,Err-出錯終止,Kill-強行終止

Await[+F[_],E,+O]:這個是運算流的核心Process狀態。先進行F運算req,得出結果E後輸入函數rcv轉換到下一個Process狀態,完成後執行preempt這個事後清理函數。這不就是個flatMap函數結構版嘛。值得註意的是E類型是個內部類型,由F運算產生後輸入rcv後就不再引用了。我們可以在preepmt函數里進行資源釋放。如果我們需要構建一個運算流,看來就只有使用這個Await類型了

Append[+F[_],+O]:Append是一個Process[F,O]鏈接類型。首先它不但擔負了元素O的傳送,更重要的是它還可以把上一節點的F運算傳到下一個節點。這樣才能在下麵節點時運行對上一個節點的事後處置函數(finalizer)。Append可以把多個節點結成一個大節點:head是第一個節點,stack是一串函數,每個函數接受上一個節點完成狀態後運算出下一個節點狀態

一個完整的scalaz-stream由三個類型的節點組成Source(源點)/Transducer(傳換點)/Sink(終點)。節點間通過Await或者Append來鏈接。我們再來看看Source/Transducer/Sink的類型款式:

上游:Source       >>> Process0[O]   >>> Process[F[_],O]

中游:Transduce    >>> Process1[I,O] 

下游:Sink/Channel >>> Process[F[_],O => F[Unit]], Channel >>> Process[F[_],I => F[O]]

我們可以用一個文件處理流程來描述完整scalaz-stream鏈條的作用:

Process[F[_],O],用F[O]方式讀取文件中的O值,這時F是有副作用的 

>>> Process[I,O],I代表從文件中讀取的原始數據,O代表經過篩選、處理產生的輸出數據

>>> O => F[Unit]是一個不返回結果的函數,代表對輸入的O類型數據進行F運算,如把O類型數據存寫入一個文件

/>> I => F[O]是個返回結果的函數,對輸入I進行F運算後返回O,如把一條記錄寫入資料庫後返回寫入狀態

以上流程簡單描述:從文件中讀出數據->加工處理讀出數據->寫入另一個文件。雖然從描述上看起來很簡單,但我們的目的是資源安全使用:無論在任何終止情況下:正常讀寫、中途強行停止、出錯終止,scalaz-stream都會主動關閉開啟的文件、停止使用的線程、釋放占用的記憶體等其它資源。這樣看來到不是那麼簡單了。我們先試著分析Source/Transducer/Sink這幾種類型的作用:

1 import Process._
2 emit(0)                        //> res0: scalaz.stream.Process0[Int] = Emit(Vector(0))
3 emitAll(Seq(1,2,3))            //> res1: scalaz.stream.Process0[Int] = Emit(List(1, 2, 3))
4 Process(1,2,3)                 //> res2: scalaz.stream.Process0[Int] = Emit(WrappedArray(1, 2, 3))
5 Process()                      //> res3: scalaz.stream.Process0[Nothing] = Emit(List())

以上都是Process0的構建方式,也算是數據源。但它們只是代表了記憶體中的一串值,對我們來說沒什麼意義,因為我們希望從外設獲取這些值,比如從文件或者資料庫里讀取數據,也就是說需要F運算效果。Process0[O] >>> Process[Nothing,O],而我們需要的是Process[F,O]。那麼我們這樣寫如何呢?

1 val p: Process[Task,Int] = emitAll(Seq(1,2,3))    
2    //> p  : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Emit(List(1, 2, 3))
3 
4 emitAll(Seq(1,2,3)).toSource
5    //> res4: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Emit(List(1, 2, 3))
6                                                   

類型倒是匹配了,但表達式Emit(...)里沒有任何Task的影子,這個無法滿足我們對Source的需要。看來只有以下這種方式了:

1 await(Task.delay{3})(emit)                        
2 //> res5: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@57855c9a,<function1>,<function1>)
3 eval(Task.delay{3})                               
4 //> res6: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@63e2203c,<function1>,<function1>)

現在不但類型匹配,而且表達式里還包含了Task運算。我們通過Task.delay可以進行文件讀取等帶有副作用的運算,這是因為Await將會運行req:F[E] >>> Task[Int]。這正是我們需要的Source。那我們能不能用這個Source來發出一串數據呢?

 1 def emitSeq[A](xa: Seq[A]):Process[Task,A] =
 2   xa match {
 3     case h :: t => await(Task.delay {h})(emit) ++ emitSeq(t)
 4     case Nil => halt
 5   }                                     //> emitSeq: [A](xa: Seq[A])scalaz.stream.Process[scalaz.concurrent.Task,A]
 6 val es1 = emitSeq(Seq(1,2,3))           //> es1  : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Append(Await(scalaz.concurrent.Task@2d6eabae,<function1>,<function1>),Vector(<function1>))
 7 val es2 = emitSeq(Seq("a","b","c"))     //> es2  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Append(Await(
 8 scalaz.concurrent.Task@4e7dc304,<function1>,<function1>),Vector(<function1>))
 9 es1.runLog.run                          //> res7: Vector[Int] = Vector(1, 2, 3)
10 es2.runLog.run                          //> res8: Vector[String] = Vector(a, b, c)

以上示範中我們用await運算了Task,然後返回了Process[Task,?],一個可能帶副作用運算的Source。實際上我們在很多情況下都需要從外部的源頭用Task來獲取一些數據,通常這些數據源都對數據獲取進行了非同步(asynchronous)運算處理,然後通過callback方式來提供這些數據。我們可以用Task.async函數來把這些callback函數轉變成Task,下一步我們只需要用Process.eval或者await就可以把這個Task升格成Process[Task,?]。我們先看個簡單的例子:假如我們用scala.concurrent.Future來進行非同步數據讀取,可以這樣把Future轉換成Process:

 1 def getData(dbName: String): Task[String] = Task.async { cb =>
 2    import scala.concurrent._
 3    import scala.concurrent.ExecutionContext.Implicits.global
 4    import scala.util.{Success,Failure}
 5    Future { s"got data from $dbName" }.onComplete {
 6      case Success(a) => cb(a.right)
 7      case Failure(e) => cb(e.left)
 8    }
 9 }                                        //> getData: (dbName: String)scalaz.concurrent.Task[String]
10 val procGetData = eval(getData("MySQL")) //> procGetData  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@dd3b207,<function1>,<function1>)
11 procGetData.runLog.run                   //> res9: Vector[String] = Vector(got data from MySQL)

我們也可以把java的callback轉變成Task:

 1   import com.ning.http.client._
 2   val asyncHttpClient = new AsyncHttpClient()     //> asyncHttpClient  : com.ning.http.client.AsyncHttpClient = com.ning.http.client.AsyncHttpClient@245b4bdc
 3   def get(s: String): Task[Response] = Task.async[Response] { callback =>
 4     asyncHttpClient.prepareGet(s).execute(
 5       new AsyncCompletionHandler[Unit] {
 6         def onCompleted(r: Response): Unit = callback(r.right)
 7 
 8         def onError(e: Throwable): Unit = callback(e.left)
 9       }
10     )
11   }                 //> get: (s: String)scalaz.concurrent.Task[com.ning.http.client.Response]
12   val prcGet = Process.eval(get("http://sina.com"))
13                     //> prcGet  : scalaz.stream.Process[scalaz.concurrent.Task,com.ning.http.client.Response] = Await(scalaz.concurrent.Task@222545dc,<function1>,<function1>)
14   prcGet.run.run    //> 12:25:27.852 [New I/O worker #1] DEBUG c.n.h.c.p.n.r.NettyConnectListener -Request using non cached Channel '[id: 0x23fa1307, /192.168.200.3:50569 =>sina.com/66.102.251.33:80]':

如果直接按照scalaz Task callback的類型款式 def async(callback:(Throwable \/ Unit) => Unit):

 1   def read(callback: (Throwable \/ Array[Byte]) => Unit): Unit = ???
 2                                  //> read: (callback: scalaz.\/[Throwable,Array[Byte]] => Unit)Unit
 3   val t: Task[Array[Byte]] = Task.async(read)     //> t  : scalaz.concurrent.Task[Array[Byte]] = scalaz.concurrent.Task@1a677343
 4   val t2: Task[Array[Byte]] = for {
 5     bytes <- t
 6     moarBytes <- t
 7   } yield (bytes ++ moarBytes)          //> t2  : scalaz.concurrent.Task[Array[Byte]] = scalaz.concurrent.Task@15de0b3c
 8   val prct2 = Process.eval(t2)          //> prct2  : scalaz.stream.Process[scalaz.concurrent.Task,Array[Byte]] = Await(scalaz.concurrent.Task@15de0b3c,<function1>,<function1>)
 9 
10   def asyncRead(succ: Array[Byte] => Unit, fail: Throwable => Unit): Unit = ???
11                           //> asyncRead: (succ: Array[Byte] => Unit, fail: Throwable => Unit)Unit
12   val t3: Task[Array[Byte]] = Task.async { callback =>
13      asyncRead(b => callback(b.right), err => callback(err.left))
14   }                      //> t3  : scalaz.concurrent.Task[Array[Byte]] = scalaz.concurrent.Task@489115ef
15   val t4: Task[Array[Byte]] = t3.flatMap(b => Task(b))
16                          //> t4  : scalaz.concurrent.Task[Array[Byte]] = scalaz.concurrent.Task@3857f613
17   val prct4 = Process.eval(t4)      //> prct4  : scalaz.stream.Process[scalaz.concurrent.Task,Array[Byte]] = Await(scalaz.concurrent.Task@3857f613,<function1>,<function1>)

我們也可以用timer來產生Process[Task,A]:

1   import scala.concurrent.duration._
2   implicit val scheduler = java.util.concurrent.Executors.newScheduledThreadPool(3)
3                   //> scheduler  : java.util.concurrent.ScheduledExecutorService = java.util.concurrent.ScheduledThreadPoolExecutor@516be40f[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
4   val fizz = time.awakeEvery(3.seconds).map(_ => "fizz")
5                   //> fizz  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@5762806e,<function1>,<function1>)
6   val fizz3 = fizz.take(3)   //> fizz3  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Append(Halt(End),Vector(<function1>))
7   fizz3.runLog.run           //> res9: Vector[String] = Vector(fizz, fizz, fizz)

Queue、Top和Signal都可以作為帶副作用數據源的構建器。我們先看看Queue是如何產生數據源的:

 1   type BigStringResult = String
 2   val qJobResult = async.unboundedQueue[BigStringResult]
 3                          //> qJobResult  : scalaz.stream.async.mutable.Queue[demo.ws.blogStream.BigStringResult] = scalaz.stream.async.mutable.Queue$$anon$1@25d250c6
 4   def longGet(jobnum: Int): BigStringResult = {
 5     Thread.sleep(2000)
 6     s"Some large data sets from job#${jobnum}"
 7   }                      //> longGet: (jobnum: Int)demo.ws.blogStream.BigStringResult
 8       
 9 //  multi-tasking
10     val start = System.currentTimeMillis()        //> start  : Long = 1468556250797
11     Task.fork(qJobResult.enqueueOne(longGet(1))).unsafePerformAsync{case _ => ()}
12     Task.fork(qJobResult.enqueueOne(longGet(2))).unsafePerformAsync{case _ => ()}
13     Task.fork(qJobResult.enqueueOne(longGet(3))).unsafePerformAsync{case _ => ()}
14     val timemill = System.currentTimeMillis() - start
15                                                   //> timemill  : Long = 17
16     Thread.sleep(3000)
17     qJobResult.close.run
18  val bigData = {
19  //multi-tasking
20     val j1 = qJobResult.dequeue
21     val j2 = qJobResult.dequeue
22     val j3 = qJobResult.dequeue
23     for {
24      r1 <- j1
25      r2 <- j2
26      r3 <- j3
27     } yield r1 + ","+ r2 + "," + r3
28   }                       //> bigData  : scalaz.stream.Process[[x]scalaz.concurrent.Task[x],String] = Await(scalaz.concurrent.Task@778d1062,<function1>,<function1>)
29   
30   bigData.runLog.run      //> res9: Vector[String] = Vector(Some large data sets from job#2,Some large data sets from job#3,Some large data sets from job#1)

再看看Topic示範:

 1 import scala.concurrent._
 2   import scala.concurrent.duration._
 3   import scalaz.stream.async.mutable._
 4   import scala.concurrent.ExecutionContext.Implicits.global
 5   val sharedData: Topic[BigStringResult] = async.topic()
 6        //> sharedData  : scalaz.stream.async.mutable.Topic[demo.ws.blogStream.BigStringResult] = scalaz.stream.async.package$$anon$1@797badd3
 7   val subscriber = sharedData.subscribe.runLog    //> subscriber  : scalaz.concurrent.Task[Vector[demo.ws.blogStream.BigStringResult]] = scalaz.concurrent.Task@226a82c4
 8   val otherThread = future {
 9     subscriber.run // Added this here - now subscriber is really attached to the topic
10   }                //> otherThread  : scala.concurrent.Future[Vector[demo.ws.blogStream.BigStringResult]] = List()
11   // Need to give subscriber some time to start up.
12   // I doubt you'd do this in actual code.
13 
14   // topics seem more useful for hooking up things like
15   // sensors that produce a continual stream of data,
16 
17   // and where individual values can be dropped on
18   // floor.
19   Thread.sleep(100)
20 
21   sharedData.publishOne(longGet(1)).run // don't just call publishOne; need to run the resulting task
22   sharedData.close.run // Don't just call close; need to run the resulting task
23 
24   // Need to wait for the output
25   val result = Await.result(otherThread, Duration.Inf)
26        //> result  : Vector[demo.ws.blogStream.BigStringResult] = Vector(Some large data sets from job#1)

以上對可能帶有副作用的Source的各種產生方法提供瞭解釋和示範。scalaz-stream的其他類型節點將在下麵的討論中深入介紹。

 

 

 

 

 

 

 

 

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • ...
  • 1.Yeoman? yeoman是一個自動化腳手架工具。它提供很多generator,generator相當於VisualStudio的模板,用來初始化項目。更多的就不多說了,寫一遍都寫不完,自己看吧。 http://yeoman.io/ 2.安裝 yeoman 安裝yeoman之前你需要先安裝np ...
  • 一、EasyUI 基本的拖動和放置 直接代碼看: 二、購物車 三、課程表 ...
  • 1.兩個拷貝之間主要是用於對象之間的拷貝! 2.區別 沒指針: 深拷貝和淺拷貝沒什麼區別; 有指針: 淺拷貝:即對象的預設拷貝函數,只是將指針的地址拷貝給對象,兩個變數同時指向一個地址,這樣在析構的時候必然會導致程式崩潰; 深拷貝:即要自定義拷貝函數,將指針new一段新記憶體保存起來;這樣析構也不會崩 ...
  • 這是本學期java課中實驗大綱里的第一個實驗,這裡簡單做了一個無用戶界面版本。 能看到判斷對錯的方法運用了直接運算符計算結果與函數定義後的運算結果相比較,相等得分,不相等則不得分。 編程中出現的問題:在計算結果為小數的除法時,一開始會出現結果怎麼算都不對的情況,他的正確答案也是一個向偶舍入的數值,那 ...
  • 直接插入排序 直接插入排序是一種簡單的插入排序法,其基本思想是:把待排序的紀錄按其關鍵碼值的大小逐個插入到一個已經排好序的有序序列中,直到所有的紀錄插入完為止,得到一個新的有序序列。[1] 例如,已知待排序的一組紀錄是: 60,71,49,11,24,3,66 假設在排序過程中,前3個紀錄已按關鍵碼 ...
  • 最近遇到的關於VS里編譯出現的“無法解析的外部符號”問題,在網上尋求解決辦=辦法時查到下麵的博客內容,作者講解的挺全面的,作為收藏以備將來查詢。 原文http://blog.csdn.net/shenyulv/article/details/6699836 VC++時經常會遇到鏈接錯誤LNK2001 ...
  • 從誕生至今,20多年過去,Java至今仍是使用最為廣泛的語言。這仰賴於Java提供的各種技術和特性,讓開發人員能優雅的編寫高效的程式。今天我們就來說說Java的一項基本但非常重要的技術記憶體管理 瞭解C語言的同學都知道,在C語言中記憶體的開闢和釋放都是由我們自己來管理的,每一個new操作都要對於一個de ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...