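In the previous installment we introduced and implemented strongly typed result rows. The main purpose of strong typing is to make selecting data fields more convenient, accurate and efficient once we move SQL batch operations out of the backend database into memory and turn them into row-by-row, stream-style processing. In that demo we used a collection's foreach to simulate the simplest possible data stream, turning a data set read in batch from the database into a continuous sequence of rows that are consumed one at a time. In general, a complete stream-processing flow reads data from the database and then, based on the state of each row read, writes back to the backend database: inserting new data, updating, deleting, and so on. So by adding one more row type, an action row, on top of the streaming operations implemented last time, we can complete the whole data-processing flow, as the following diagram shows: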
Database => Query -> Collection => Streaming -> DataRow => QueryAction(DataRow) -> ActionRow => execAction(ActionRow) -> Database
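As a rough, library-free illustration of this chain, the sketch below models each arrow as an ordinary Scala function over in-memory collections. The names `pipeline` and `StatusRow` are hypothetical and belong to neither FunDA nor Slick; in the real code the query is a Slick query, the action is a `DBIO` value, and execution goes through `db.run`.

```scala
object PipelineSketch {
  // Hypothetical helper: each stage of
  // Query -> DataRow -> ActionRow -> execAction as a plain function.
  def pipeline[Raw, Row, Act, Res](
      query: () => Seq[Raw],  // Database => Query -> Collection
      toRow: Raw => Row,      // Streaming -> DataRow (strongly typed row)
      toAction: Row => Act,   // QueryAction(DataRow) -> ActionRow
      exec: Act => Res        // execAction(ActionRow) -> Database
  ): Seq[Res] =
    query().map(toRow).map(toAction).map(exec)

  // Hypothetical typed row used only in this sketch.
  final case class StatusRow(id: Int, status: Int)

  def main(args: Array[String]): Unit = {
    // "Executing" an action here just wraps its description in a string.
    val results = pipeline[(Int, Int), StatusRow, String, String](
      () => Seq((1, 0), (2, 3)),
      raw => StatusRow(raw._1, raw._2),
      row => s"UPDATE id=${row.id} SET status=${row.status}",
      act => s"executed: $act"
    )
    results.foreach(println)
  }
}
```

Each stage only sees the output of the previous one, which is what later allows a stream library to replace the plain collection without changing the stage functions.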
If we keep Slick as the target FRM (Functional Relational Mapping) library, then the type of this ActionRow is Slick's DBIO[T]:
package com.bayakala.funda.rowtypes
import slick.dbio._
object ActionType {
  type FDAAction[T] = DBIO[T]
}
I once came across this question on a Scala forum: how do you update the status column of table a with the value of the status column in table b? Expressed in SQL:
update a,b set a.status=b.status where a.id=b.id
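The effect of this statement can be modeled in memory with two maps (id -> status) standing in for tables a and b: an inner join on id, followed by one status update per joined row. This is only a sketch under that assumption, not Slick code; `joinAB` and `syncStatus` are hypothetical names, and the sample values match the TA/TB test data used later in this article.

```scala
object JoinUpdateSketch {
  // Stand-ins for tables TA and TB: id -> status.
  val tableA: Map[Int, Int] = Map(1 -> 0, 2 -> 3, 3 -> 1, 4 -> 0, 16 -> 16)
  val tableB: Map[Int, Int] = Map(1 -> 1, 2 -> 2, 3 -> 3, 4 -> 4, 5 -> 5)

  // Inner join on id, ordered by id: (id, a.status, b.status).
  def joinAB(a: Map[Int, Int], b: Map[Int, Int]): Seq[(Int, Int, Int)] =
    a.toSeq.sortBy(_._1).collect { case (id, aSts) if b.contains(id) => (id, aSts, b(id)) }

  // One "set a.status = b.status where a.id = ?" per joined row.
  // Rows of a without a match in b (id 16 here) are left untouched.
  def syncStatus(a: Map[Int, Int], b: Map[Int, Int]): Map[Int, Int] =
    joinAB(a, b).foldLeft(a) { case (acc, (id, _, bSts)) => acc.updated(id, bSts) }

  def main(args: Array[String]): Unit =
    syncStatus(tableA, tableB).toSeq.sortBy(_._1).foreach { case (id, st) =>
      println(s"ID:$id status=$st")
    }
}
```

Reading the joined rows first and then issuing one small update per row is exactly the shape of the FunDA approach described next.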
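The poster wanted to do this update of table a in Slick without calling the SQL statement directly through sql"???" interpolation, probably because compile-time syntax checking was required. This is genuinely hard to do with a Slick Query (I didn't bother working out whether it can be done at all), because batch SQL processing is a weak spot of FRM. With FunDA's streaming approach it becomes easy: use a join Query to read b.status, then update a.status row by row where b.id = a.id. Conveniently, this is exactly what we can demonstrate with ActionRow. First, set up the test data with the following code: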
import slick.dbio.DBIO
import slick.driver.H2Driver.api._

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import scala.concurrent.ExecutionContext.Implicits.global
import slick.jdbc.meta.MTable

object ActionRowTest extends App {

  // Table TA: (id, aflds, status)
  class ATable(tag: Tag) extends Table[(Int,String,Int)](tag,"TA") {
    def id = column[Int]("id",O.PrimaryKey)
    def flds = column[String]("aflds")
    def status = column[Int]("status")
    def * = (id,flds,status)
  }
  val tableA = TableQuery[ATable]

  // Table TB: (id, bflds, status)
  class BTable(tag: Tag) extends Table[(Int,String,Int)](tag,"TB") {
    def id = column[Int]("id",O.PrimaryKey)
    def flds = column[String]("bflds")
    def status = column[Int]("status")
    def * = (id,flds,status)
  }
  val tableB = TableQuery[BTable]

  val insertAAction =
    tableA ++= Seq (
      (1,"aaa",0),
      (2,"bbb",3),
      (3,"ccc",1),
      (4,"ddd",0),
      (16,"kkk",16)
    )
  val insertBAction =
    tableB ++= Seq (
      (1,"aaa",1),
      (2,"bbb",2),
      (3,"ccc",3),
      (4,"ddd",4),
      (5,"kkk",5)
    )

  // Reads the "h2db" entry from application.conf
  val db = Database.forConfig("h2db")

  def tableExists(tables: Vector[MTable], tblname: String) =
    tables.exists {t =>t.name.toString.contains(tblname)}

  def createSchemaIfNotExists(): Future[Unit] = {
    db.run(MTable.getTables).flatMap {
      case tables if !tableExists(tables,".TA") && !tableExists(tables,".TB") =>
        println("Creating schemas for TA and TB...")
        db.run((tableA.schema ++ tableB.schema).create)
      case tables if !tableExists(tables,".TA") =>
        println("Creating schema for TA ...")
        db.run(tableA.schema.create)
      case tables if !tableExists(tables,".TB") =>
        println("Creating schema for TB ...")
        db.run(tableB.schema.create)
      case _ =>
        println("Schema for TA, TB already created.")
        Future.successful(())
    }
  }

  def insertInitialData(): Future[Unit] = {
    val cleanInsert = DBIO.seq(
      tableA.delete, tableB.delete,
      insertAAction,
      insertBAction)
    db.run(cleanInsert).andThen {
      case Success(_) => println("Data insert completed.")
      case Failure(e) => println(s"Data insert failed [${e.getMessage}]")
    }
  }

  // Start from a clean slate. Await.ready does not rethrow, so if the tables
  // do not exist yet (e.g. on the first run) the failed DROP is simply ignored.
  Await.ready(db.run(sql"DROP TABLE TA; DROP TABLE TB".as[String]),Duration.Inf)

  val initResult = createSchemaIfNotExists().flatMap {_ => insertInitialData()}
  Await.ready(initResult,Duration.Inf)

}
Use a join query to bring the related column values of the two tables into memory and convert them into strongly typed FDADataRow rows:
val selectAB = for {
  a <- tableA
  b <- tableB
  if (a.id === b.id)
} yield (a.id,b.id,a.status,b.status)

// Strongly typed row: id plus the status from TA (asts) and from TB (bsts)
case class ABRow (id: Int, asts: Int, bsts: Int)
def toABRow(raw: (Int,Int,Int,Int)) = ABRow(raw._1,raw._3,raw._4)

import com.bayakala.funda.rowtypes.DataRowType.FDADataRow

val loader = FDADataRow(slick.driver.H2Driver, toABRow _)
loader.getTypedRows(selectAB.result)(db).foreach {dataRow =>
  println(s"ID:${dataRow.id} Status A = ${dataRow.asts}, B = ${dataRow.bsts}")
}
The initial result:
ID:1 Status A = 0, B = 1
ID:2 Status A = 3, B = 2
ID:3 Status A = 1, B = 3
ID:4 Status A = 0, B = 4
Now we convert each DataRow into an ActionRow that replaces the row's asts value with its bsts value:
import com.bayakala.funda.rowtypes.ActionType.FDAAction
// Build (but do not run) an UPDATE that sets TA.status to the row's TB status
def updateAStatus(row: ABRow): FDAAction[Int] = {
  tableA.filter{r => r.id === row.id}
    .map(_.status)
    .update(row.bsts)
}

loader.getTypedRows(selectAB.result)(db).map(updateAStatus(_)).foreach {
  actionRow =>
    println(s"${actionRow.toString}")
}
The output:
slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@492691d7
slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@27216cd
slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@558bdf1f
slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@8576fa0
Each DataRow has now been converted into a JDBC action.
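What gets printed are action descriptions, not results: building a Slick `DBIO` does not touch the database, and nothing happens until `db.run` is called. The library-free sketch below illustrates that separation, assuming an action is modeled as a plain data value; `UpdateStatus` and `run` are hypothetical names, not Slick's API.

```scala
object ActionValueSketch {
  // Hypothetical stand-in for an ActionRow: a description of an update, not its effect.
  final case class UpdateStatus(id: Int, newStatus: Int)

  // "Interpreter": only here is the in-memory table changed. Returns the new
  // table and the number of affected rows, like a JDBC update count.
  def run(table: Map[Int, Int], act: UpdateStatus): (Map[Int, Int], Int) =
    if (table.contains(act.id)) (table.updated(act.id, act.newStatus), 1)
    else (table, 0)

  def main(args: Array[String]): Unit = {
    val table = Map(1 -> 0, 2 -> 3)
    val actions = Seq(UpdateStatus(1, 1), UpdateStatus(2, 2))
    actions.foreach(println) // prints the descriptions; the table is unchanged
    val (after, count) = actions.foldLeft((table, 0)) { case ((t, n), a) =>
      val (t2, c) = run(t, a)
      (t2, n + c)
    }
    println(s"before=$table after=$after rows=$count")
  }
}
```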
Next, we only need to run these ActionRows to complete the task:
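def execAction(act: FDAAction[Int]) = db.run(act)

val updates = loader.getTypedRows(selectAB.result)(db)
  .map(updateAStatus(_))
  .map(execAction(_))
// db.run is asynchronous: wait for every update before reading TA again
Await.ready(Future.sequence(updates), Duration.Inf)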
Now let's look at the state of table TA in the database again:
loader.getTypedRows(selectAB.result)(db).foreach {dataRow =>
  println(s"ID:${dataRow.id} Status A = ${dataRow.asts}, B = ${dataRow.bsts}")
}
Result:
ID:1 Status A = 1, B = 1
ID:2 Status A = 2, B = 2
ID:3 Status A = 3, B = 3
ID:4 Status A = 4, B = 4
We can see that the status column of TA has been updated correctly.
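The demo generates an update for every joined row, including rows whose A and B statuses may already agree. Within the collection-based simulation, a minimal sketch of avoiding those redundant updates is a filter between the DataRow and ActionRow stages; `ABRow` mirrors the article's row type, while `updatesNeeded` is a hypothetical helper that returns (id, newStatus) pairs instead of Slick actions.

```scala
object SkipUnchangedSketch {
  // Same shape as the article's ABRow: id, status in TA, status in TB.
  final case class ABRow(id: Int, asts: Int, bsts: Int)

  // Keep only rows that actually need an update, then describe one
  // (id, newStatus) update per remaining row.
  def updatesNeeded(rows: Seq[ABRow]): Seq[(Int, Int)] =
    rows.filter(r => r.asts != r.bsts).map(r => (r.id, r.bsts))

  def main(args: Array[String]): Unit = {
    // Hypothetical rows: id 3 is already in sync, so no update is produced for it.
    val rows = Seq(ABRow(1, 0, 1), ABRow(2, 3, 2), ABRow(3, 3, 3))
    updatesNeeded(rows).foreach { case (id, st) =>
      println(s"update TA set status=$st where id=$id")
    }
  }
}
```

In the article's code, the same filter would sit between `getTypedRows(...)` and `.map(updateAStatus(_))`.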
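This demo clearly has many shortcomings; for example, when a.status already equals b.status the update step should be skipped. This is because foreach can only simulate the most basic kind of data flow. With a powerful stream library such as scalaz-stream-fs2 we can control the flow of data elements much better. More importantly, scalaz-stream-fs2 supports parallel computation, so in the flow described above: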
Database => Query -> Collection => Streaming -> DataRow => QueryAction(DataRow) -> ActionRow => execAction(ActionRow) -> Database
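the stages joined by =>, namely Query, Streaming, QueryAction and execAction, could run in parallel, making full use of multi-core CPU hardware and improving processing efficiency. Below is the source code for this discussion: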
package com.bayakala.funda.rowtypes

import scala.concurrent.duration._
import scala.concurrent.Await
import slick.driver.JdbcProfile

object DataRowType {
  class FDADataRow[SOURCE, TARGET](slickProfile: JdbcProfile,convert: SOURCE => TARGET){
    import slickProfile.api._

    def getTypedRows(slickAction: DBIO[Iterable[SOURCE]])(slickDB: Database): Iterable[TARGET] =
      Await.result(slickDB.run(slickAction), Duration.Inf).map(raw => convert(raw))
  }

  object FDADataRow {
    def apply[SOURCE, TARGET](slickProfile: JdbcProfile, converter: SOURCE => TARGET): FDADataRow[SOURCE, TARGET] =
      new FDADataRow[SOURCE, TARGET](slickProfile, converter)
  }

}
package com.bayakala.funda.rowtypes
import slick.dbio._
object ActionType {
  type FDAAction[T] = DBIO[T]
}
import slick.dbio.DBIO
import slick.driver.H2Driver.api._

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import scala.concurrent.ExecutionContext.Implicits.global
import slick.jdbc.meta.MTable

object ActionRowTest extends App {

  // Table TA: (id, aflds, status)
  class ATable(tag: Tag) extends Table[(Int,String,Int)](tag,"TA") {
    def id = column[Int]("id",O.PrimaryKey)
    def flds = column[String]("aflds")
    def status = column[Int]("status")
    def * = (id,flds,status)
  }
  val tableA = TableQuery[ATable]

  // Table TB: (id, bflds, status)
  class BTable(tag: Tag) extends Table[(Int,String,Int)](tag,"TB") {
    def id = column[Int]("id",O.PrimaryKey)
    def flds = column[String]("bflds")
    def status = column[Int]("status")
    def * = (id,flds,status)
  }
  val tableB = TableQuery[BTable]

  val insertAAction =
    tableA ++= Seq (
      (1,"aaa",0),
      (2,"bbb",3),
      (3,"ccc",1),
      (4,"ddd",0),
      (16,"kkk",16)
    )
  val insertBAction =
    tableB ++= Seq (
      (1,"aaa",1),
      (2,"bbb",2),
      (3,"ccc",3),
      (4,"ddd",4),
      (5,"kkk",5)
    )

  // Reads the "h2db" entry from application.conf
  val db = Database.forConfig("h2db")

  def tableExists(tables: Vector[MTable], tblname: String) =
    tables.exists {t =>t.name.toString.contains(tblname)}

  def createSchemaIfNotExists(): Future[Unit] = {
    db.run(MTable.getTables).flatMap {
      case tables if !tableExists(tables,".TA") && !tableExists(tables,".TB") =>
        println("Creating schemas for TA and TB...")
        db.run((tableA.schema ++ tableB.schema).create)
      case tables if !tableExists(tables,".TA") =>
        println("Creating schema for TA ...")
        db.run(tableA.schema.create)
      case tables if !tableExists(tables,".TB") =>
        println("Creating schema for TB ...")
        db.run(tableB.schema.create)
      case _ =>
        println("Schema for TA, TB already created.")
        Future.successful(())
    }
  }

  def insertInitialData(): Future[Unit] = {
    val cleanInsert = DBIO.seq(
      tableA.delete, tableB.delete,
      insertAAction,
      insertBAction)
    db.run(cleanInsert).andThen {
      case Success(_) => println("Data insert completed.")
      case Failure(e) => println(s"Data insert failed [${e.getMessage}]")
    }
  }

  // Start from a clean slate. Await.ready does not rethrow, so if the tables
  // do not exist yet (e.g. on the first run) the failed DROP is simply ignored.
  Await.ready(db.run(sql"DROP TABLE TA; DROP TABLE TB".as[String]),Duration.Inf)

  val initResult = createSchemaIfNotExists().flatMap {_ => insertInitialData()}
  Await.ready(initResult,Duration.Inf)

  val selectAB = for {
    a <- tableA
    b <- tableB
    if (a.id === b.id)
  } yield (a.id,b.id,a.status,b.status)

  // Strongly typed row: id plus the status from TA (asts) and from TB (bsts)
  case class ABRow (id: Int, asts: Int, bsts: Int)
  def toABRow(raw: (Int,Int,Int,Int)) = ABRow(raw._1,raw._3,raw._4)

  import com.bayakala.funda.rowtypes.DataRowType.FDADataRow

  val loader = FDADataRow(slick.driver.H2Driver, toABRow _)
  loader.getTypedRows(selectAB.result)(db).foreach {dataRow =>
    println(s"ID:${dataRow.id} Status A = ${dataRow.asts}, B = ${dataRow.bsts}")
  }

  import com.bayakala.funda.rowtypes.ActionType.FDAAction
  // Build (but do not run) an UPDATE that sets TA.status to the row's TB status
  def updateAStatus(row: ABRow): FDAAction[Int] = {
    tableA.filter{r => r.id === row.id}
      .map(_.status)
      .update(row.bsts)
  }

  loader.getTypedRows(selectAB.result)(db).map(updateAStatus(_)).foreach {
    actionRow =>
      println(s"${actionRow.toString}")
  }

  def execAction(act: FDAAction[Int]) = db.run(act)

  val updates = loader.getTypedRows(selectAB.result)(db)
    .map(updateAStatus(_))
    .map(execAction(_))
  // db.run is asynchronous: wait for every update before reading TA again
  Await.ready(Future.sequence(updates), Duration.Inf)

  loader.getTypedRows(selectAB.result)(db).foreach {dataRow =>
    println(s"ID:${dataRow.id} Status A = ${dataRow.asts}, B = ${dataRow.bsts}")
  }

}