上節我們探討了通過scalaz-stream-fs2來驅動一套數據處理流程,用fs2的Pipe類型來實現對數據流的逐行操作。本篇討論準備在上節討論的基礎上對數據流的流動和元素操作進行優化完善。如數據流動中增加諸如next、skip、eof功能、內容控制中增加對行元素的append、insert、up ...
上節我們探討了通過scalaz-stream-fs2來驅動一套數據處理流程,用fs2的Pipe類型來實現對數據流的逐行操作。本篇討論準備在上節討論的基礎上對數據流的流動和元素操作進行優化完善。如數據流動中增加諸如next、skip、eof功能、內容控制中增加對行元素的append、insert、update、remove等操作方法。但是經過一番對fs2的再次解讀,發現這些操作模式並不像我所想象那樣的方式,實際上用fs2來實現數據行控制可能會更加簡單和直接。這是因為與傳統資料庫行瀏覽方式不同的是fs2是一種拖式流(pull-model stream),它的數據行集合是一種泛函不可變集合。每一行一旦讀取就等於直接消耗了斷(consumed),所以只支持一種向前逐行讀取模式。如果形象地描述的話,我們習慣的所謂數據集瀏覽可能是下麵這樣的場景:
讀取一行數據 >>> (使用或更新行欄位值)>>> 向下游發送新的一行數據。只有停止發送動作才代表終止運算。完成對上游的所有行數據讀取並不代表終止操作,因為我們還可以不斷向下游發送自定義產生的數據行。
我們用fs2模擬一套數據流管道FDAPipeLine,管道中間有不定數量的作業節點FDAWorkNode。作業方式包括從管道上游截取一個數據元素、對其進行處理、然後選擇是否向下游的管道介面(FDAPipeJoint)發送。下麵是這套模擬的類型:fdapipes/package.scala
1 package com.bayakala.funda {
2
3 import fs2._
4
5 package object fdapipes {
6 //數據行類型
7 trait FDAROW
8
9 //數據處理管道
10 type FDAPipeLine[ROW] = Stream[Task, ROW]
11 //數據作業節點
12 type FDAWorkNode[ROW] = Pipe[Task, ROW, ROW]
13 //數據管道開關閥門,從此處獲得管道內數據
14 type FDAValve[ROW] = Handle[Task, ROW]
15 //管道連接器
16 type FDAPipeJoint[ROW] = Pull[Task, ROW, Unit]
17
18 //作業類型
19 type FDATask[ROW] = ROW => Option[List[ROW]]
20
21 }
22
23 }
註意這個FDAROW類型:這是一種泛類型,因為在管道中流動的數據可能有多重類型,如數據行和QueryAction行。
流動控制方法:FDAValves.scala
1 package com.bayakala.funda.fdapipes
2 import fs2._
3 object FDAValves { //流動控制方法
4 //跳過本行(不向下游發送)
5 def fda_skip[ROW] = Some(List[ROW]())
6 //將本行發送至下游連接管道
7 def fda_next[ROW](r: ROW) = Some(List[ROW](r))
8 //終止流動
9 def fda_break = None
10
11 }
數據發送方法:FDAPipes.scala
1 package com.bayakala.funda.fdapipes
2 import fs2._
3 object FDAJoints { //數據發送方法
4 //write rows down the pipeline
5 def fda_pushRow[ROW](row: ROW) = Pull.output1(row)
6 def fda_pushRows[ROW](rows: List[ROW]) = Pull.output(Chunk.seq(rows))
7 }
作業節點工作方法:
1 package com.bayakala.funda.fdapipes
2 import FDAJoints._
3 object FDANodes { //作業節點工作方法
4 def fda_execUserTask[ROW](task: FDATask[ROW]): FDAWorkNode[ROW] = {
5 def go: FDAValve[ROW] => FDAPipeJoint[ROW] = h => {
6 h.receive1Option {
7 case Some((r, h)) => task(r) match {
8 case Some(xr) => xr match {
9 case Nil => go(h)
10 case _ => fda_pushRows(xr) >> go(h)
11 }
12 case None => fda_halt
13 }
14 case None => fda_halt
15 }
16 }
17 in => in.pull(go)
18 }
19
20 }
下麵我們就示範這個工具庫的具體使用方法:examples/Example1.scala
設置示範環境:
1 package com.bayakala.funda.fdapipes.examples
2 import fs2._
3 import com.bayakala.funda.fdapipes._
4 import FDANodes._
5 import FDAValves._
6 import Helpers._
7 object Example1 extends App {
8
9
10 case class Employee(id: Int, name: String, age: Int, salary: BigDecimal) extends FDAROW
11 // test data set
12 val r1 = Employee(1, "John", 23, 100.00)
13 val r2 = Employee(2, "Peter", 25,100.00)
14 val r3 = Employee(3, "Kay", 35,100.00)
15 val r4 = Employee(4, "Cain", 45,100.00)
16 val r5 = Employee(5, "Catty", 35,100.00)
17 val r6 = Employee(6, "Little", 19,80.00)
註意Employee是一種行類型,因為它extends FDAROW。
我們再寫一個跟蹤顯示當前流動數據行的函數:examples/Helpers.scala
1 package com.bayakala.funda.fdapipes.examples
2 import com.bayakala.funda.fdapipes._
3 import fs2.Task
4 object Helpers {
5 def log[ROW](prompt: String): FDAWorkNode[ROW] =
6 _.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}
7 }
下麵我們就用幾個有不同要求的例子來示範流動控制和數據處理功能,這些例子就是給最終用戶的標準編程示範版本,然後由用戶照版編寫:
1、根據每條數據狀態逐行進行處理:
1 // 20 - 30歲加10%, 30歲> 加20%,其它加 5%
2 def raisePay: FDATask[FDAROW] = row => {
3 row match {
4 case emp: Employee => {
5 val cur = emp.age match {
6 case a if ((a >= 20) && (a < 30)) => emp.copy(salary = emp.salary * 1.10)
7 case a if ((a >= 30)) => emp.copy(salary = emp.salary * 1.20)
8 case _ => emp.copy(salary = emp.salary * 1.05)
9 }
10 fda_next(cur)
11 }
12 case _ => fda_skip
13 }
14 }
用戶提供的功能函數類型必須是FDATask[FDAROW]。類型參數FDAROW代表數據行通用類型。如果用戶指定了FDATask[Employee]函數類型,那麼必須保證管道中流動的數據行只有Employee一種類型。完成對當前行數據的處理後用fda_next(emp)把它發送到下一節連接管道。我們用下麵的組合函數來進行運算:
Stream(r1,r2,r3,r4,r5,r6)
.through(log("加薪前>"))
.through(fda_execUserTask[FDAROW](raisePay))
.through(log("加薪後>"))
.run.unsafeRun
-----
運算結果:
加薪前>> Employee(1,John,23,100.0)
加薪後>> Employee(1,John,23,110.00)
加薪前>> Employee(2,Peter,25,100.0)
加薪後>> Employee(2,Peter,25,110.00)
加薪前>> Employee(3,Kay,35,100.0)
加薪後>> Employee(3,Kay,35,120.00)
加薪前>> Employee(4,Cain,45,100.0)
加薪後>> Employee(4,Cain,45,120.00)
加薪前>> Employee(5,Catty,35,100.0)
加薪後>> Employee(5,Catty,35,120.00)
加薪前>> Employee(6,Little,19,80.0)
加薪後>> Employee(6,Little,19,84.000)
2、在一組數據行內根據每條數據狀態進行篩選:
// 篩選40歲以上員工
def filter40: FDATask[FDAROW] = row => {
row match {
case emp: Employee => {
if (emp.age > 40)
Some(List(emp))
else fda_skip[Employee]
}
case _ => fda_break
}
}
println("---------")
Stream(r1,r2,r3,r4,r5,r6)
.through(log("年齡>"))
.through(fda_execUserTask[FDAROW](filter40))
.through(log("合格>"))
.run.unsafeRun
---
運算結果:
年齡>> Employee(1,John,23,100.0)
年齡>> Employee(2,Peter,25,100.0)
年齡>> Employee(3,Kay,35,100.0)
年齡>> Employee(4,Cain,45,100.0)
合格>> Employee(4,Cain,45,100.0)
年齡>> Employee(5,Catty,35,100.0)
年齡>> Employee(6,Little,19,80.0)
-
3、根據當前數據行狀態終止作業:
1 // 瀏覽至第一個30歲以上員工,跳出
2 def stopOn30: FDATask[Employee] = emp => {
3 if (emp.age > 30)
4 fda_break
5 else
6 Some(List(emp))
7 }
8 println("---------")
9 Stream(r1,r2,r3,r4,r5,r6)
10 .through(log("當前員工>"))
11 .through(fda_execUserTask[Employee](stopOn30))
12 .through(log("選入名單>"))
13 .run.unsafeRun
14 ---
15 運算結果:
16 當前員工>> Employee(1,John,23,100.0)
17 選入名單>> Employee(1,John,23,100.0)
18 當前員工>> Employee(2,Peter,25,100.0)
19 選入名單>> Employee(2,Peter,25,100.0)
20 當前員工>> Employee(3,Kay,35,100.0)
在這個例子里用戶指定了行類型統一為Employee。
我們還可以把多個功能串接起來。像下麵這樣把1和2兩個功能連起來:
Stream(r1,r2,r3,r4,r5,r6)
.through(log("加薪前>"))
.through(fda_execUserTask[FDAROW](raisePay))
.through(log("加薪後>"))
.through(log("年齡>"))
.through(fda_execUserTask[FDAROW](filter40))
.through(log("合格>"))
.run.unsafeRun
---
運算結果:
加薪前>> Employee(1,John,23,100.0)
加薪後>> Employee(1,John,23,110.00)
年齡>> Employee(1,John,23,110.00)
加薪前>> Employee(2,Peter,25,100.0)
加薪後>> Employee(2,Peter,25,110.00)
年齡>> Employee(2,Peter,25,110.00)
加薪前>> Employee(3,Kay,35,100.0)
加薪後>> Employee(3,Kay,35,120.00)
年齡>> Employee(3,Kay,35,120.00)
加薪前>> Employee(4,Cain,45,100.0)
加薪後>> Employee(4,Cain,45,120.00)
年齡>> Employee(4,Cain,45,120.00)
合格>> Employee(4,Cain,45,120.00)
加薪前>> Employee(5,Catty,35,100.0)
加薪後>> Employee(5,Catty,35,120.00)
年齡>> Employee(5,Catty,35,120.00)
加薪前>> Employee(6,Little,19,80.0)
加薪後>> Employee(6,Little,19,84.000)
年齡>> Employee(6,Little,19,84.000)
下麵我把完整的示範代碼提供給大家:
package com.bayakala.funda.fdapipes.examples import fs2._ import com.bayakala.funda.fdapipes._ import FDANodes._ import FDAValves._ import Helpers._ object Example1 extends App { case class Employee(id: Int, name: String, age: Int, salary: BigDecimal) extends FDAROW // test data set val r1 = Employee(1, "John", 23, 100.00) val r2 = Employee(2, "Peter", 25,100.00) val r3 = Employee(3, "Kay", 35,100.00) val r4 = Employee(4, "Cain", 45,100.00) val r5 = Employee(5, "Catty", 35,100.00) val r6 = Employee(6, "Little", 19,80.00) // 20 - 30歲加10%, 30歲> 加20%,其它加 5% def raisePay: FDATask[FDAROW] = row => { row match { case emp: Employee => { val cur = emp.age match { case a if ((a >= 20) && (a < 30)) => emp.copy(salary = emp.salary * 1.10) case a if ((a >= 30)) => emp.copy(salary = emp.salary * 1.20) case _ => emp.copy(salary = emp.salary * 1.05) } fda_next(cur) } case _ => fda_skip } } Stream(r1,r2,r3,r4,r5,r6) .through(log("加薪前>")) .through(fda_execUserTask[FDAROW](raisePay)) .through(log("加薪後>")) .run.unsafeRun // 篩選40歲以上員工 def filter40: FDATask[FDAROW] = row => { row match { case emp: Employee => { if (emp.age > 40) Some(List(emp)) else fda_skip[Employee] } case _ => fda_break } } println("---------") Stream(r1,r2,r3,r4,r5,r6) .through(log("年齡>")) .through(fda_execUserTask[FDAROW](filter40)) .through(log("合格>")) .run.unsafeRun // 瀏覽至第一個30歲以上員工,跳出 def stopOn30: FDATask[Employee] = emp => { if (emp.age > 30) fda_break else Some(List(emp)) } println("---------") Stream(r1,r2,r3,r4,r5,r6) .through(log("當前員工>")) .through(fda_execUserTask[Employee](stopOn30)) .through(log("選入名單>")) .run.unsafeRun println("---------") Stream(r1,r2,r3,r4,r5,r6) .through(log("加薪前>")) .through(fda_execUserTask[FDAROW](raisePay)) .through(log("加薪後>")) .through(log("年齡>")) .through(fda_execUserTask[FDAROW](filter40)) .through(log("合格>")) .run.unsafeRun }