FunDA(4)- 數據流內容控制:Stream data element control

来源:http://www.cnblogs.com/tiger-xc/archive/2017/01/19/6305731.html
-Advertisement-
Play Games

上節我們探討了通過scalaz-stream-fs2來驅動一套數據處理流程,用fs2的Pipe類型來實現對數據流的逐行操作。本篇討論準備在上節討論的基礎上對數據流的流動和元素操作進行優化完善。如數據流動中增加諸如next、skip、eof功能、內容控制中增加對行元素的append、insert、up ...


    上節我們探討了通過scalaz-stream-fs2來驅動一套數據處理流程,用fs2的Pipe類型來實現對數據流的逐行操作。本篇討論準備在上節討論的基礎上對數據流的流動和元素操作進行優化完善。如數據流動中增加諸如next、skip、eof功能、內容控制中增加對行元素的append、insert、update、remove等操作方法。但是經過一番對fs2的再次解讀,發現這些操作模式並不像我所想象那樣的方式,實際上用fs2來實現數據行控制可能會更加簡單和直接。這是因為與傳統資料庫行瀏覽方式不同的是fs2是一種拖式流(pull-model stream),它的數據行集合是一種泛函不可變集合。每一行一旦讀取就等於直接消耗了斷(consumed),所以只支持一種向前逐行讀取模式。如果形象地描述的話,我們習慣的所謂數據集瀏覽可能是下麵這樣的場景:

讀取一行數據 >>> (使用或更新行欄位值)>>> 向下游發送新的一行數據。只有停止發送動作才代表終止運算。完成對上游的所有行數據讀取並不代表終止操作,因為我們還可以不斷向下游發送自定義產生的數據行。

我們用fs2模擬一套數據流管道FDAPipeLine,管道中間有不定數量的作業節點FDAWorkNode。作業方式包括從管道上游截取一個數據元素、對其進行處理、然後選擇是否向下游的管道介面(FDAPipeJoint)發送。下麵是這套模擬的類型:fdapipes/package.scala

 1 package com.bayakala.funda {
 2 
 3   import fs2._
 4 
 5   package object fdapipes {
 6     //數據行類型
 7     trait FDAROW
 8 
 9     //數據處理管道
10     type FDAPipeLine[ROW] = Stream[Task, ROW]
11     //數據作業節點
12     type FDAWorkNode[ROW] = Pipe[Task, ROW, ROW]
13     //數據管道開關閥門,從此處獲得管道內數據
14     type FDAValve[ROW] = Handle[Task, ROW]
15     //管道連接器
16     type FDAPipeJoint[ROW] = Pull[Task, ROW, Unit]
17 
18     //作業類型
19     type FDATask[ROW] = ROW => Option[List[ROW]]
20 
21   }
22 
23 }

註意這個FDAROW類型:這是一種泛類型,因為在管道中流動的數據可能有多重類型,如數據行和QueryAction行。

流動控制方法:FDAValves.scala

 1 package com.bayakala.funda.fdapipes
 2 import fs2._
 3 object FDAValves {  //流動控制方法
 4 //跳過本行(不向下游發送)
 5   def fda_skip[ROW] = Some(List[ROW]())
 6 //將本行發送至下游連接管道
 7   def fda_next[ROW](r: ROW) = Some(List[ROW](r))
 8 //終止流動
 9   def fda_break = None
10 
11 }

數據發送方法:FDAPipes.scala

1 package com.bayakala.funda.fdapipes
2 import fs2._
3 object FDAJoints {  //數據發送方法
4 //write rows down the pipeline
5   def fda_pushRow[ROW](row: ROW) = Pull.output1(row)
6   def fda_pushRows[ROW](rows: List[ROW]) = Pull.output(Chunk.seq(rows))
7 }

作業節點工作方法:

 1 package com.bayakala.funda.fdapipes
 2 import FDAJoints._
 3 object FDANodes { //作業節點工作方法
 4  def fda_execUserTask[ROW](task: FDATask[ROW]): FDAWorkNode[ROW] = {
 5    def go: FDAValve[ROW] => FDAPipeJoint[ROW] = h => {
 6      h.receive1Option {
 7        case Some((r, h)) => task(r) match {
 8          case Some(xr) => xr match {
 9            case Nil => go(h)
10            case _ => fda_pushRows(xr) >> go(h)
11          }
12          case None => fda_halt
13        }
14        case None => fda_halt
15      }
16    }
17    in => in.pull(go)
18  }
19 
20 }

下麵我們就示範這個工具庫的具體使用方法:examples/Example1.scala
設置示範環境:

 1 package com.bayakala.funda.fdapipes.examples
 2 import fs2._
 3 import com.bayakala.funda.fdapipes._
 4 import FDANodes._
 5 import FDAValves._
 6 import Helpers._
 7 object Example1 extends App {
 8 
 9 
10   case class Employee(id: Int, name: String, age: Int, salary: BigDecimal) extends FDAROW
11 // test data set
12   val r1 = Employee(1, "John", 23, 100.00)
13   val r2 = Employee(2, "Peter", 25,100.00)
14   val r3 = Employee(3, "Kay", 35,100.00)
15   val r4 = Employee(4, "Cain", 45,100.00)
16   val r5 = Employee(5, "Catty", 35,100.00)
17   val r6 = Employee(6, "Little", 19,80.00)

註意Employee是一種行類型,因為它extends FDAROW。

我們再寫一個跟蹤顯示當前流動數據行的函數:examples/Helpers.scala

1 package com.bayakala.funda.fdapipes.examples
2 import com.bayakala.funda.fdapipes._
3 import fs2.Task
4 object Helpers {
5   def log[ROW](prompt: String): FDAWorkNode[ROW] =
6     _.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}
7 }

下麵我們就用幾個有不同要求的例子來示範流動控制和數據處理功能,這些例子就是給最終用戶的標準編程示範版本,然後由用戶照版編寫:

1、根據每條數據狀態逐行進行處理:

 1 // 20 - 30歲加10%, 30歲> 加20%,其它加 5%
 2   def raisePay: FDATask[FDAROW] = row => {
 3     row match {
 4       case emp: Employee => {
 5         val cur = emp.age match {
 6           case a if ((a >= 20) && (a < 30)) => emp.copy(salary = emp.salary * 1.10)
 7           case a if ((a >= 30)) => emp.copy(salary = emp.salary * 1.20)
 8           case _ => emp.copy(salary = emp.salary * 1.05)
 9         }
10         fda_next(cur)
11       }
12       case _ => fda_skip
13     }
14   }

用戶提供的功能函數類型必須是FDATask[FDAROW]。類型參數FDAROW代表數據行通用類型。如果用戶指定了FDATask[Employee]函數類型,那麼必須保證管道中流動的數據行只有Employee一種類型。完成對當前行數據的處理後用fda_next(emp)把它發送到下一節連接管道。我們用下麵的組合函數來進行運算:

  Stream(r1,r2,r3,r4,r5,r6)
    .through(log("加薪前>"))
      .through(fda_execUserTask[FDAROW](raisePay))
      .through(log("加薪後>"))
    .run.unsafeRun
-----
運算結果:
加薪前>> Employee(1,John,23,100.0)
加薪後>> Employee(1,John,23,110.00)
加薪前>> Employee(2,Peter,25,100.0)
加薪後>> Employee(2,Peter,25,110.00)
加薪前>> Employee(3,Kay,35,100.0)
加薪後>> Employee(3,Kay,35,120.00)
加薪前>> Employee(4,Cain,45,100.0)
加薪後>> Employee(4,Cain,45,120.00)
加薪前>> Employee(5,Catty,35,100.0)
加薪後>> Employee(5,Catty,35,120.00)
加薪前>> Employee(6,Little,19,80.0)
加薪後>> Employee(6,Little,19,84.000)

2、在一組數據行內根據每條數據狀態進行篩選:

  // 篩選40歲以上員工
  def filter40: FDATask[FDAROW] = row => {
    row match {
      case emp: Employee => {
        if (emp.age > 40)
          Some(List(emp))
        else fda_skip[Employee]
      }
      case _ => fda_break
    }
  }
  println("---------")
  Stream(r1,r2,r3,r4,r5,r6)
    .through(log("年齡>"))
    .through(fda_execUserTask[FDAROW](filter40))
    .through(log("合格>"))
    .run.unsafeRun
---
運算結果:
年齡>> Employee(1,John,23,100.0)
年齡>> Employee(2,Peter,25,100.0)
年齡>> Employee(3,Kay,35,100.0)
年齡>> Employee(4,Cain,45,100.0)
合格>> Employee(4,Cain,45,100.0)
年齡>> Employee(5,Catty,35,100.0)
年齡>> Employee(6,Little,19,80.0)
-

3、根據當前數據行狀態終止作業:

 1   // 瀏覽至第一個30歲以上員工,跳出
 2   def stopOn30: FDATask[Employee] = emp => {
 3         if (emp.age > 30)
 4           fda_break
 5         else
 6           Some(List(emp))
 7   }
 8   println("---------")
 9   Stream(r1,r2,r3,r4,r5,r6)
10     .through(log("當前員工>"))
11     .through(fda_execUserTask[Employee](stopOn30))
12     .through(log("選入名單>"))
13     .run.unsafeRun
14 ---
15 運算結果:
16 當前員工>> Employee(1,John,23,100.0)
17 選入名單>> Employee(1,John,23,100.0)
18 當前員工>> Employee(2,Peter,25,100.0)
19 選入名單>> Employee(2,Peter,25,100.0)
20 當前員工>> Employee(3,Kay,35,100.0)

在這個例子里用戶指定了行類型統一為Employee。

我們還可以把多個功能串接起來。像下麵這樣把1和2兩個功能連起來:

  Stream(r1,r2,r3,r4,r5,r6)
    .through(log("加薪前>"))
    .through(fda_execUserTask[FDAROW](raisePay))
    .through(log("加薪後>"))
    .through(log("年齡>"))
    .through(fda_execUserTask[FDAROW](filter40))
    .through(log("合格>"))
    .run.unsafeRun
---
運算結果:
加薪前>> Employee(1,John,23,100.0)
加薪後>> Employee(1,John,23,110.00)
年齡>> Employee(1,John,23,110.00)
加薪前>> Employee(2,Peter,25,100.0)
加薪後>> Employee(2,Peter,25,110.00)
年齡>> Employee(2,Peter,25,110.00)
加薪前>> Employee(3,Kay,35,100.0)
加薪後>> Employee(3,Kay,35,120.00)
年齡>> Employee(3,Kay,35,120.00)
加薪前>> Employee(4,Cain,45,100.0)
加薪後>> Employee(4,Cain,45,120.00)
年齡>> Employee(4,Cain,45,120.00)
合格>> Employee(4,Cain,45,120.00)
加薪前>> Employee(5,Catty,35,100.0)
加薪後>> Employee(5,Catty,35,120.00)
年齡>> Employee(5,Catty,35,120.00)
加薪前>> Employee(6,Little,19,80.0)
加薪後>> Employee(6,Little,19,84.000)
年齡>> Employee(6,Little,19,84.000)

下麵我把完整的示範代碼提供給大家:

package com.bayakala.funda.fdapipes.examples
import fs2._
import com.bayakala.funda.fdapipes._
import FDANodes._
import FDAValves._
import Helpers._
object Example1 extends App {


  case class Employee(id: Int, name: String, age: Int, salary: BigDecimal) extends FDAROW
// test data set
  val r1 = Employee(1, "John", 23, 100.00)
  val r2 = Employee(2, "Peter", 25,100.00)
  val r3 = Employee(3, "Kay", 35,100.00)
  val r4 = Employee(4, "Cain", 45,100.00)
  val r5 = Employee(5, "Catty", 35,100.00)
  val r6 = Employee(6, "Little", 19,80.00)



// 20 - 30歲加10%, 30歲> 加20%,其它加 5%
  def raisePay: FDATask[FDAROW] = row => {
    row match {
      case emp: Employee => {
        val cur = emp.age match {
          case a if ((a >= 20) && (a < 30)) => emp.copy(salary = emp.salary * 1.10)
          case a if ((a >= 30)) => emp.copy(salary = emp.salary * 1.20)
          case _ => emp.copy(salary = emp.salary * 1.05)
        }
        fda_next(cur)
      }
      case _ => fda_skip
    }
  }

  Stream(r1,r2,r3,r4,r5,r6)
    .through(log("加薪前>"))
      .through(fda_execUserTask[FDAROW](raisePay))
      .through(log("加薪後>"))
    .run.unsafeRun


  // 篩選40歲以上員工
  def filter40: FDATask[FDAROW] = row => {
    row match {
      case emp: Employee => {
        if (emp.age > 40)
          Some(List(emp))
        else fda_skip[Employee]
      }
      case _ => fda_break
    }
  }
  println("---------")
  Stream(r1,r2,r3,r4,r5,r6)
    .through(log("年齡>"))
    .through(fda_execUserTask[FDAROW](filter40))
    .through(log("合格>"))
    .run.unsafeRun

  // 瀏覽至第一個30歲以上員工,跳出
  def stopOn30: FDATask[Employee] = emp => {
        if (emp.age > 30)
          fda_break
        else
          Some(List(emp))
  }
  println("---------")
  Stream(r1,r2,r3,r4,r5,r6)
    .through(log("當前員工>"))
    .through(fda_execUserTask[Employee](stopOn30))
    .through(log("選入名單>"))
    .run.unsafeRun


  println("---------")
  Stream(r1,r2,r3,r4,r5,r6)
    .through(log("加薪前>"))
    .through(fda_execUserTask[FDAROW](raisePay))
    .through(log("加薪後>"))
    .through(log("年齡>"))
    .through(fda_execUserTask[FDAROW](filter40))
    .through(log("合格>"))
    .run.unsafeRun

}

 

 

 

 

 

 

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 作為一名使用C#語言開發人員,就很難逃脫與SQLSERVER打交道,雖說我們是開發人員,但我想說的是,對資料庫的操作還是應該時不時的拿出來溫習一番。下麵那就是我見過的一道有趣的SQL題目,與你們一起分享! 題目:在我的TeamPK資料庫中有一個叫department的表,裡面只有一個欄位name,一 ...
  • 問題描述:為了在C#中執行js腳本,在一個目標平臺編譯為Any Cpu的.NET程式集中引用了MSScriptControl組件,在winform程式中,調用這個程式集中的執行js的方法,沒有任何問題。但是在windows服務中調用卻報錯,報錯信息如下: 在嘗試過很多方法之後仍然沒有解決,包括將MS ...
  • 當獲取一個類型(class)的所有屬性時,想排除指定屬性,該如何操作? 比如:EF中一個實體類型UserEntity,通過反射獲取這個類的屬性時,想排除這個為映射的欄位ID 使用以下方法即可! 參考:http://stackoverflow.com/questions/2051834/exclude ...
  • 根據此鏈接博文學習配置: http://www.cnblogs.com/zyw-205520/p/4767633.html 1.JDK的安裝 自行百度,(最好是jdk1.7版本的) 測試如下圖,即完成jdk的安裝 2.MyEclipse安裝 自行下載安裝即可,(我使用的是2013版的) 3.Tomc ...
  • 1、隱藏tomcat版本: ①執行命令:cd /usr/local/tomcat_web/lib/ && ll 我們需要對catalina.jar進行解壓(最好提前先備份一下) ②執行命令:unzip catalina.jar 這時候會多出META-INF和org兩個目錄,找到org/apache/ ...
  • 摘要 在寫這篇文章之前,自己思索了很多,也是因為一些事情觸發了自己,使得自己想寫這麼一篇文章。也算是對2016年自己的一個總結吧。 正文 先說說我自己吧!本人16年於一所二本學校畢業,考過研,夢想著上北航,結果卻因政治考的太差沒能上,說起來很慚愧。英雄都不提往事,何況我這一個失敗的狗熊呢!不,是失敗 ...
  • 第4章 類型和聲明 4.2 布爾量 按照定義,true具有值1,而false具有值0.整數可以隱式地轉換到bool值。指針也可以隱式地轉換到bool,非零指針轉為true,具有零值的指針轉為false。 4.3 字元類型 一個char類型幾乎都包含8個二進位位,這裡還提供了另一個類型wchar_t用 ...
  • 需要 prettytime-3.2.3.Final.jar 包 代碼例子 輸出結果 ...
一周排行
    -Advertisement-
    Play Games
  • 前言 本文介紹一款使用 C# 與 WPF 開發的音頻播放器,其界面簡潔大方,操作體驗流暢。該播放器支持多種音頻格式(如 MP4、WMA、OGG、FLAC 等),並具備標記、實時歌詞顯示等功能。 另外,還支持換膚及多語言(中英文)切換。核心音頻處理採用 FFmpeg 組件,獲得了廣泛認可,目前 Git ...
  • OAuth2.0授權驗證-gitee授權碼模式 本文主要介紹如何筆者自己是如何使用gitee提供的OAuth2.0協議完成授權驗證並登錄到自己的系統,完整模式如圖 1、創建應用 打開gitee個人中心->第三方應用->創建應用 創建應用後在我的應用界面,查看已創建應用的Client ID和Clien ...
  • 解決了這個問題:《winForm下,fastReport.net 從.net framework 升級到.net5遇到的錯誤“Operation is not supported on this platform.”》 本文內容轉載自:https://www.fcnsoft.com/Home/Sho ...
  • 國內文章 WPF 從裸 Win 32 的 WM_Pointer 消息獲取觸摸點繪製筆跡 https://www.cnblogs.com/lindexi/p/18390983 本文將告訴大家如何在 WPF 裡面,接收裸 Win 32 的 WM_Pointer 消息,從消息裡面獲取觸摸點信息,使用觸摸點 ...
  • 前言 給大家推薦一個專為新零售快消行業打造了一套高效的進銷存管理系統。 系統不僅具備強大的庫存管理功能,還集成了高性能的輕量級 POS 解決方案,確保頁面載入速度極快,提供良好的用戶體驗。 項目介紹 Dorisoy.POS 是一款基於 .NET 7 和 Angular 4 開發的新零售快消進銷存管理 ...
  • ABP CLI常用的代碼分享 一、確保環境配置正確 安裝.NET CLI: ABP CLI是基於.NET Core或.NET 5/6/7等更高版本構建的,因此首先需要在你的開發環境中安裝.NET CLI。這可以通過訪問Microsoft官網下載並安裝相應版本的.NET SDK來實現。 安裝ABP ...
  • 問題 問題是這樣的:第三方的webapi,需要先調用登陸介面獲取Cookie,訪問其它介面時攜帶Cookie信息。 但使用HttpClient類調用登陸介面,返回的Headers中沒有找到Cookie信息。 分析 首先,使用Postman測試該登陸介面,正常返回Cookie信息,說明是HttpCli ...
  • 國內文章 關於.NET在中國為什麼工資低的分析 https://www.cnblogs.com/thinkingmore/p/18406244 .NET在中國開發者的薪資偏低,主要因市場需求、技術棧選擇和企業文化等因素所致。歷史上,.NET曾因微軟的閉源策略發展受限,儘管後來推出了跨平臺的.NET ...
  • 在WPF開發應用中,動畫不僅可以引起用戶的註意與興趣,而且還使軟體更加便於使用。前面幾篇文章講解了畫筆(Brush),形狀(Shape),幾何圖形(Geometry),變換(Transform)等相關內容,今天繼續講解動畫相關內容和知識點,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 什麼是委托? 委托可以說是把一個方法代入另一個方法執行,相當於指向函數的指針;事件就相當於保存委托的數組; 1.實例化委托的方式: 方式1:通過new創建實例: public delegate void ShowDelegate(); 或者 public delegate string ShowDe ...