FunDA(2)- Streaming Data Operation:流式數據操作

来源:http://www.cnblogs.com/tiger-xc/archive/2016/12/31/6239002.html
-Advertisement-
Play Games

在上一集的討論里我們介紹並實現了強類型返回結果行。使用強類型主要的目的是當我們把後端資料庫SQL批次操作搬到記憶體里轉變成數據流式按行操作時能更方便、準確、高效地選定數據欄位。在上集討論示範里我們用集合的foreach方式模擬了一個最簡單的數據流,並把從資料庫里批次讀取的數據集轉換成一串連續的數據行來 ...


   在上一集的討論里我們介紹並實現了強類型返回結果行。使用強類型主要的目的是當我們把後端資料庫SQL批次操作搬到記憶體里轉變成數據流式按行操作時能更方便、準確、高效地選定數據欄位。在上集討論示範里我們用集合的foreach方式模擬了一個最簡單的數據流,並把從資料庫里批次讀取的數據集轉換成一串連續的數據行來逐行使用。一般來說完整的流式數據處理流程包括了從資料庫中讀取數據、根據讀取的每行數據狀態再對後臺資料庫進行更新,包括:插入新數據、更新、刪除等。那麼在上篇中實現的流式操作基礎上再添加一種指令行類型就可以完善整個數據處理流程了,就像下麵這個圖示:

Database => Query -> Collection => Streaming -> DataRow => QueryAction(DataRow) -> ActionRow => execAction(ActionRow) -> Database 

如果我們還是以Slick為目標FRM,那麼這個ActionRow的類型就是Slick的DBIO[T]了:

1 package com.bayakala.funda.rowtypes
2 import slick.dbio._
3 object ActionType {
4   type FDAAction[T] = DBIO[T]
5 }

記得有一次在一個Scala討論區里遇到這樣一個問題:如何把a表裡的status欄位更新成b表的status欄位值,轉化成SQL語句如下:

 update a,b set a.status=b.status where a.id=b.id

那位哥們的問題是如何用Slick來實現對a表的更新,不能用sql"???" interpolation 直接調用SQL語句,可能因為要求compile time語法check保障吧。這個問題用Slick Query還真的不太容易解決(能不能解決就不想費功夫去想了),這是因為FRM的SQL批次處理弱點。如果用FunDA的流式操作思路就會很容易解決了,只要用join Query把b.status讀出來再用b.id=a.id逐個更新a.status。剛好,下麵我們就示範通過ActionRow來解決這個問題。先用下麵這段代碼來設置測試數據:

 1 import slick.dbio.DBIO
 2 import slick.driver.H2Driver.api._
 3 
 4 import scala.concurrent.duration._
 5 import scala.concurrent.{Await, Future}
 6 import scala.util.{Failure, Success}
 7 import scala.concurrent.ExecutionContext.Implicits.global
 8 import slick.jdbc.meta.MTable
 9 object ActionRowTest extends App {
10 
11   class ATable(tag: Tag) extends Table[(Int,String,Int)](tag,"TA")  {
12     def id = column[Int]("id",O.PrimaryKey)
13     def flds = column[String]("aflds")
14     def status = column[Int]("status")
15     def * = (id,flds,status)
16   }
17   val tableA = TableQuery[ATable]
18 
19   class BTable(tag: Tag) extends Table[(Int,String,Int)](tag,"TB")  {
20     def id = column[Int]("id",O.PrimaryKey)
21     def flds = column[String]("bflds")
22     def status = column[Int]("status")
23     def * = (id,flds,status)
24   }
25   val tableB = TableQuery[BTable]
26 
27   val insertAAction =
28     tableA ++= Seq (
29         (1,"aaa",0),
30         (2,"bbb",3),
31         (3,"ccc",1),
32         (4,"ddd",0),
33         (16,"kkk",16)
34     )
35    val insertBAction =
36      tableB ++= Seq (
37        (1,"aaa",1),
38        (2,"bbb",2),
39        (3,"ccc",3),
40        (4,"ddd",4),
41        (5,"kkk",5)
42      )
43 
44    val db = Database.forConfig("h2db")
45 
46 
47    def tableExists(tables: Vector[MTable], tblname: String) =
48     tables.exists {t =>t.name.toString.contains(tblname)}
49 
50    def createSchemaIfNotExists(): Future[Unit] = {
51     db.run(MTable.getTables).flatMap {
52       case tables if !tableExists(tables,".TA") && !tableExists(tables,".TB") =>
53         println("Creating schemas for TA and TB...")
54         db.run((tableA.schema ++ tableB.schema).create)
55       case tables if !tableExists(tables,".TA") =>
56         println("Creating schema for TA ...")
57         db.run(tableA.schema.create)
58       case tables if !tableExists(tables,".TB") =>
59         println("Creating schema for TB ...")
60         db.run(tableB.schema.create)
61       case _ =>
62         println("Schema for TA, TB already created.")
63         Future.successful()
64     }
65    }
66 
67    def insertInitialData(): Future[Unit] = {
68     val cleanInsert = DBIO.seq(
69       tableA.delete, tableB.delete,
70       insertAAction,
71       insertBAction)
72     db.run(cleanInsert).andThen {
73       case Success(_) => println("Data insert completed.")
74       case Failure(e) => println(s"Data insert failed [${e.getMessage}]")
75     }
76    }
77 
78    Await.ready(db.run(sql"DROP TABLE TA; DROP TABLE TB".as[String]),Duration.Inf)
79 
80    val initResult = createSchemaIfNotExists().flatMap {_ => insertInitialData()}
81    Await.ready(initResult,Duration.Inf)
82 
83 
84 
85 
86 }

用join query先把這兩個表相關的欄位值搬到記憶體轉成強類型行FDADataRow: 

 1 val selectAB = for {
 2      a <- tableA
 3      b <- tableB
 4      if (a.id === b.id)
 5    } yield (a.id,b.id,a.status,b.status)
 6 
 7    case class ABRow (id: Int, asts: Int, bsts: Int)
 8    def toABRow(raw: (Int,Int,Int,Int)) = ABRow(raw._1,raw._3,raw._4)
 9 
10    import com.bayakala.funda.rowtypes.DataRowType
11   
12    val loader = FDADataRow(slick.driver.H2Driver, toABRow _)
13    loader.getTypedRows(selectAB.result)(db).foreach {dataRow =>
14      println(s"ID:${dataRow.id} Status A = ${dataRow.asts}, B = ${dataRow.bsts}")
15    }

初始結果如下:

ID:1 Status A = 0, B = 1
ID:2 Status A = 3, B = 2
ID:3 Status A = 1, B = 3
ID:4 Status A = 0, B = 4

現在我們把每條數據行DataRow轉成動作行ActionRow。然後把每條DataRow的asts欄位值替換成bsts的欄位值:

 1 import com.bayakala.funda.rowtypes.ActionType.FDAAction
 2    def updateAStatus(row: ABRow): FDAAction[Int] = {
 3      tableA.filter{r => r.id === row.id}
 4           .map(_.status)
 5           .update(row.asts)
 6    }
 7 
 8 
 9    loader.getTypedRows(selectAB.result)(db).map(updateAStatus(_)).foreach {
10      actionRow =>
11        println(s"${actionRow.toString}")
12    }

顯示結果如下:

slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@492691d7
slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@27216cd
slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@558bdf1f
slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@8576fa0

現在每條DataRow已經被轉化成jdbc action類型了。

下一步我們只需要運行這些ActionRow就可以完成任務了:

1   def execAction(act: FDAAction[Int]) = db.run(act)
2   
3    loader.getTypedRows(selectAB.result)(db)
4        .map(updateAStatus(_))
5        .map(execAction(_))

現在再看看資料庫中的TA表狀態:

  loader.getTypedRows(selectAB.result)(db).foreach {dataRow =>
    println(s"ID:${dataRow.id} Status A = ${dataRow.asts}, B = ${dataRow.bsts}")
  }

結果:
ID:1 Status A = 1, B = 1
ID:2 Status A = 2, B = 2
ID:3 Status A = 3, B = 3
ID:4 Status A = 4, B = 4

我們看到已經正確更新了TA的status欄位值。

在這個示範中明顯有很多不足之處:如果a.status=b.status應該省略更新步驟。這是因為foreach只能模擬最基本的數據流動。如果我們使用了具備強大功能的Stream工具庫如scalaz-stream-fs2,就可以更好控制數據元素的流動。更重要的是scalaz-stream-fs2支持並行運算,那麼上面所描述的流程:

Database => Query -> Collection => Streaming -> DataRow => QueryAction(DataRow) -> ActionRow => execAction(ActionRow) -> Database

幾個 => 環節:Query、Streaming、QueryAction、execAction將可以並行運算,從而實現充分利用多核CPU硬體資源,提高運算效率的目的。

下麵是這次討論涉及的源代碼:

 1 package com.bayakala.funda.rowtypes
 2 
 3 import scala.concurrent.duration._
 4 import scala.concurrent.Await
 5 import slick.driver.JdbcProfile
 6 
 7 object DataRowType {
 8   class FDADataRow[SOURCE, TARGET](slickProfile: JdbcProfile,convert: SOURCE  => TARGET){
 9     import slickProfile.api._
10 
11     def getTypedRows(slickAction: DBIO[Iterable[SOURCE]])(slickDB: Database): Iterable[TARGET] =
12       Await.result(slickDB.run(slickAction), Duration.Inf).map(raw => convert(raw))
13   }
14 
15   object FDADataRow {
16     def apply[SOURCE, TARGET](slickProfile: JdbcProfile, converter: SOURCE => TARGET): FDADataRow[SOURCE, TARGET] =
17       new FDADataRow[SOURCE, TARGET](slickProfile, converter)
18   }
19 
20 }
1 package com.bayakala.funda.rowtypes
2 import slick.dbio._
3 object ActionType {
4   type FDAAction[T] = DBIO[T]
5 }
  1 import slick.dbio.DBIO
  2 import slick.driver.H2Driver.api._
  3 
  4 import scala.concurrent.duration._
  5 import scala.concurrent.{Await, Future}
  6 import scala.util.{Failure, Success}
  7 import scala.concurrent.ExecutionContext.Implicits.global
  8 import slick.jdbc.meta.MTable
  9 object ActionRowTest extends App {
 10 
 11   class ATable(tag: Tag) extends Table[(Int,String,Int)](tag,"TA")  {
 12     def id = column[Int]("id",O.PrimaryKey)
 13     def flds = column[String]("aflds")
 14     def status = column[Int]("status")
 15     def * = (id,flds,status)
 16   }
 17   val tableA = TableQuery[ATable]
 18 
 19   class BTable(tag: Tag) extends Table[(Int,String,Int)](tag,"TB")  {
 20     def id = column[Int]("id",O.PrimaryKey)
 21     def flds = column[String]("bflds")
 22     def status = column[Int]("status")
 23     def * = (id,flds,status)
 24   }
 25   val tableB = TableQuery[BTable]
 26 
 27   val insertAAction =
 28     tableA ++= Seq (
 29         (1,"aaa",0),
 30         (2,"bbb",3),
 31         (3,"ccc",1),
 32         (4,"ddd",0),
 33         (16,"kkk",16)
 34     )
 35    val insertBAction =
 36      tableB ++= Seq (
 37        (1,"aaa",1),
 38        (2,"bbb",2),
 39        (3,"ccc",3),
 40        (4,"ddd",4),
 41        (5,"kkk",5)
 42      )
 43 
 44    val db = Database.forConfig("h2db")
 45 
 46 
 47    def tableExists(tables: Vector[MTable], tblname: String) =
 48     tables.exists {t =>t.name.toString.contains(tblname)}
 49 
 50    def createSchemaIfNotExists(): Future[Unit] = {
 51     db.run(MTable.getTables).flatMap {
 52       case tables if !tableExists(tables,".TA") && !tableExists(tables,".TB") =>
 53         println("Creating schemas for TA and TB...")
 54         db.run((tableA.schema ++ tableB.schema).create)
 55       case tables if !tableExists(tables,".TA") =>
 56         println("Creating schema for TA ...")
 57         db.run(tableA.schema.create)
 58       case tables if !tableExists(tables,".TB") =>
 59         println("Creating schema for TB ...")
 60         db.run(tableB.schema.create)
 61       case _ =>
 62         println("Schema for TA, TB already created.")
 63         Future.successful()
 64     }
 65    }
 66 
 67    def insertInitialData(): Future[Unit] = {
 68     val cleanInsert = DBIO.seq(
 69       tableA.delete, tableB.delete,
 70       insertAAction,
 71       insertBAction)
 72     db.run(cleanInsert).andThen {
 73       case Success(_) => println("Data insert completed.")
 74       case Failure(e) => println(s"Data insert failed [${e.getMessage}]")
 75     }
 76    }
 77 
 78    Await.ready(db.run(sql"DROP TABLE TA; DROP TABLE TB".as[String]),Duration.Inf)
 79 
 80    val initResult = createSchemaIfNotExists().flatMap {_ => insertInitialData()}
 81    Await.ready(initResult,Duration.Inf)
 82 
 83 
 84    val selectAB = for {
 85      a <- tableA
 86      b <- tableB
 87      if (a.id === b.id)
 88    } yield (a.id,b.id,a.status,b.status)
 89 
 90    case class ABRow (id: Int, asts: Int, bsts: Int)
 91    def toABRow(raw: (Int,Int,Int,Int)) = ABRow(raw._1,raw._3,raw._4)
 92 
 93    import com.bayakala.funda.rowtypes.DataRowType.FDADataRow
 94 
 95    val loader = FDADataRow(slick.driver.H2Driver, toABRow _)
 96    loader.getTypedRows(selectAB.result)(db).foreach {dataRow =>
 97      println(s"ID:${dataRow.id} Status A = ${dataRow.asts}, B = ${dataRow.bsts}")
 98    }
 99 
100    import com.bayakala.funda.rowtypes.ActionType.FDAAction
101    def updateAStatus(row: ABRow): FDAAction[Int] = {
102      tableA.filter{r => r.id === row.id}
103           .map(_.status)
104           .update(row.bsts)
105    }
106 
107 
108    loader.getTypedRows(selectAB.result)(db).map(updateAStatus(_)).foreach {
109      actionRow =>
110        println(s"${actionRow.toString}")
111    }
112 
113    def execAction(act: FDAAction[Int]) = db.run(act)
114 
115    loader.getTypedRows(selectAB.result)(db)
116        .map(updateAStatus(_))
117        .map(execAction(_))
118 
119   loader.getTypedRows(selectAB.result)(db).foreach {dataRow =>
120     println(s"ID:${dataRow.id} Status A = ${dataRow.asts}, B = ${dataRow.bsts}")
121   }
122 
123 }

 

 

 

 

 

 

 

 

 

 

 

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 作者:虛靜 鏈接:https://zhuanlan.zhihu.com/p/24656161 來源:知乎 著作權歸作者所有。商業轉載請聯繫作者獲得授權,非商業轉載請註明出處。 先說明幾件事: 題目的意思是,用於獲取“QQ空間動態”的爬蟲,而不是”針對QQ空間“的”動態爬蟲“ 這裡的QQ空間動態,特指 ...
  • 如果SSH框架下,前段頁面通過from表單提交數據之後,在後臺對象顯示空值,也就是接收不到值得情況下。首先保證前段輸入框有值,這個可以在提交的時候用jQuery的id或者name選擇器alert彈出測試下。如果前段彈出顯示有值的情況下。可以去後臺action中看看接受的對象有沒有給get跟set方法 ...
  • 轉載:http://blog.csdn.net/luoweifu/article/details/10721543 我進行了一些加工,不是本人原創但比原博主要更完善~ 淺談Java異常 以前雖然知道一些異常的處理,也用過一些,但是對throw和throws區別還是有不太清楚。今天用實例測試一下 異常 ...
  • 一、標準IO的效率 對比以下四個程式的用戶CPU、系統CPU與時鐘時間對比 程式1:系統IO 程式2:標準IO getc版本 程式3:標準IO fgets版本 結果: 【註:該表截取自APUE,上表中"表3-1中的最佳時間即《程式1》","表3-1中的單位元組時間指的是《程式1》中BUFSIZE為1時 ...
  • 題目大意: B進位數,每個數字i(i=0,1,...,B-1)有a[i]個。你要用這些數字組成一個最大的B進位數X(不能有前導零,不需要 用完所有數字),使得X是B-1的倍數。q次詢問,每次詢問X在B進位下的第k位數字是什麼(最低位是第0位)。 思路:由於如下定理: a*Bk≡a( mod (B-1 ...
  • 當子類繼承父類的時候,若父類沒有定義帶參的構造方法,則子類可以繼承父類的預設構造方法 當父類中定義了帶參的構造方法,子類必須顯式的調用父類的構造方法 若此時,子類還想調用父類的預設構造方法,必須在父類中明確聲明預設的構造方法 ...
  • set 的 remove() 和 discard() 方法介紹。 函數/方法名 for example: 以下運行代碼會報錯,原因在於 第9行 remove()中的‘l’在集合中不存在 而 discard()方法不會報錯。 正確的代碼如下: 運行結果如下: ...
  • Peter wants to generate some prime numbers for his cryptosystem. Help him! Your task is to generate all prime numbers between two given numbers! Input ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...