Akka(27): Stream:Use case-Connecting Slick-dbStream & Scalaz-stream-fs2

来源:http://www.cnblogs.com/tiger-xc/archive/2017/09/27/7599908.html
-Advertisement-
Play Games

在以前的博文中我們介紹了Slick,它是一種FRM(Functional Relation Mapper)。有別於ORM,FRM的特點是函數式的語法可以支持靈活的對象組合(Query Composition)實現大規模的代碼重覆利用,但同時這些特點又影響了編程人員群體對FRM的接受程度,阻礙了FRM ...


   在以前的博文中我們介紹了Slick,它是一種FRM(Functional Relation Mapper)。有別於ORM,FRM的特點是函數式的語法可以支持靈活的對象組合(Query Composition)實現大規模的代碼重覆利用,但同時這些特點又影響了編程人員群體對FRM的接受程度,阻礙了FRM成為廣為流行的一種資料庫編程方式。所以我們只能從小眾心態來探討如何改善Slick現狀,希望通過與某些Stream庫集成,在Slick FRM的基礎上恢復一些人們熟悉的Recordset資料庫游標(cursor)操作方式,希望如此可以降低FRM資料庫編程對函數式編程水平要求,能夠吸引更多的編程人員接受FRM。剛好,在這篇討論里我們希望能介紹一些Akka-Stream和外部系統集成對接的實際用例,把Slick資料庫數據載入連接到Akka-Stream形成streaming-dataset應該是一個挺好的想法。Slick和Akka-Stream可以說是自然匹配的一對,它們都是同一個公司產品,都支持Reactive-Specification。Reactive系統的集成對象之間是通過公共界面Publisher來實現對接的。Slick提供了個Dababase.stream函數可以構建這個Publisher:

 /** Create a `Publisher` for Reactive Streams which, when subscribed to, will run the specified
      * `DBIOAction` and return the result directly as a stream without buffering everything first.
      * This method is only supported for streaming actions.
      *
      * The Publisher itself is just a stub that holds a reference to the action and this Database.
      * The action does not actually start to run until the call to `onSubscribe` returns, after
      * which the Subscriber is responsible for reading the full response or cancelling the
      * Subscription. The created Publisher can be reused to serve a multiple Subscribers,
      * each time triggering a new execution of the action.
      *
      * For the purpose of combinators such as `cleanup` which can run after a stream has been
      * produced, cancellation of a stream by the Subscriber is not considered an error. For
      * example, there is no way for the Subscriber to cause a rollback when streaming the
      * results of `someQuery.result.transactionally`.
      *
      * When using a JDBC back-end, all `onNext` calls are done synchronously and the ResultSet row
      * is not advanced before `onNext` returns. This allows the Subscriber to access LOB pointers
      * from within `onNext`. If streaming is interrupted due to back-pressure signaling, the next
      * row will be prefetched (in order to buffer the next result page from the server when a page
      * boundary has been reached). */
    final def stream[T](a: DBIOAction[_, Streaming[T], Nothing]): DatabasePublisher[T] = streamInternal(a, false)

這個DatabasePublisher[T]就是一個Publisher[T]:

/** A Reactive Streams `Publisher` for database Actions. */
abstract class DatabasePublisher[T] extends Publisher[T] { self =>
...
}

然後Akka-Stream可以通過Source.fromPublisher(publisher)構建Akka Source構件:

  /**
   * Helper to create [[Source]] from `Publisher`.
   *
   * Construct a transformation starting with given publisher. The transformation steps
   * are executed by a series of [[org.reactivestreams.Processor]] instances
   * that mediate the flow of elements downstream and the propagation of
   * back-pressure upstream.
   */
  def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed] =
    fromGraph(new PublisherSource(publisher, DefaultAttributes.publisherSource, shape("PublisherSource")))

理論上Source.fromPublisher(db.stream(query))就可以構建一個Reactive-Stream-Source了。下麵我們就建了例子來做示範:首先是Slick的鋪墊代碼boiler-code:

  val aqmraw = Models.AQMRawQuery
  val db = Database.forConfig("h2db")
  // aqmQuery.result returns Seq[(String,String,String,String)]
  val aqmQuery = aqmraw.map {r => (r.year,r.state,r.county,r.value)}
  // type alias
  type RowType = (String,String,String,String)
  // user designed strong typed resultset type. must extend FDAROW
  case class TypedRow(year: String, state: String, county: String, value: String) extends FDAROW
  // strong typed resultset conversion function. declared implicit to remind during compilation
  implicit def toTypedRow(row: RowType): TypedRow =
    TypedRow(row._1,row._2,row._3,row._4)

我們需要的其實就是aqmQuery,用它來構建DatabasePublisher:

  // construct DatabasePublisher from db.stream
  val dbPublisher: DatabasePublisher[RowType] = db.stream[RowType](aqmQuery.result)
  // construct akka source
  val source: Source[RowType,NotUsed] = Source.fromPublisher[RowType](dbPublisher)

有了dbPublisher就可以用Source.fromPublisher函數構建source了。現在我們試著運算這個Akka-Stream:

  implicit val actorSys = ActorSystem("actor-system")
  implicit val ec = actorSys.dispatcher
  implicit val mat = ActorMaterializer()

  source.take(6).map{row => toTypedRow(row)}.runWith(
    Sink.foreach(qmr => {
      println(s"州名: ${qmr.state}")
      println(s"縣名:${qmr.county}")
      println(s"年份:${qmr.year}")
      println(s"取值:${qmr.value}")
      println("-------------")
    }))

  scala.io.StdIn.readLine()
  actorSys.terminate()

下麵是運算結果:

州名: Alabama
縣名:Elmore
年份:1999
取值:5
-------------
州名: Alabama
縣名:Jefferson
年份:1999
取值:39
-------------
州名: Alabama
縣名:Lawrence
年份:1999
取值:28
-------------
州名: Alabama
縣名:Madison
年份:1999
取值:31
-------------
州名: Alabama
縣名:Mobile
年份:1999
取值:32
-------------
州名: Alabama
縣名:Montgomery
年份:1999
取值:15
-------------

顯示我們已經成功的連接了Slick和Akka-Stream。

現在我們有了Reactive stream source,它是個akka-stream,該如何對接處於下游的scalaz-stream-fs2呢?我們知道:akka-stream是Reactive stream,而scalaz-stream-fs2是純“拖式”pull-model stream,也就是說上面這個Reactive stream source必須被動等待下游的scalaz-stream-fs2來讀取數據。按照Reactive-Stream規範,下游必須通過backpressure信號來知會上游是否可以發送數據狀態,也就是說我們需要scalaz-stream-fs2來產生backpressure。scalaz-stream-fs2 async包里有個Queue結構:

/**
 * Asynchronous queue interface. Operations are all nonblocking in their
 * implementations, but may be 'semantically' blocking. For instance,
 * a queue may have a bound on its size, in which case enqueuing may
 * block until there is an offsetting dequeue.
 */
trait Queue[F[_], A] { self =>
  /**
   * Enqueues one element in this `Queue`.
   * If the queue is `full` this waits until queue is empty.
   *
   * This completes after `a`  has been successfully enqueued to this `Queue`
   */
  def enqueue1(a: A): F[Unit]

  /**
   * Enqueues each element of the input stream to this `Queue` by
   * calling `enqueue1` on each element.
   */
  def enqueue: Sink[F, A] = _.evalMap(enqueue1)
  /** Dequeues one `A` from this queue. Completes once one is ready. */
  def dequeue1: F[A]
  /** Repeatedly calls `dequeue1` forever. */
  def dequeue: Stream[F, A] = Stream.bracket(cancellableDequeue1)(d => Stream.eval(d._1), d => d._2).repeat
...
}

這個結構支持多線程操作,也就是說enqueue和dequeue可以在不同的線程里操作。值得關註的是:enqueue會block,只有在完成了dequeue後才能繼續。這個dequeue就變成了抵消backpressure的有效方法了。具體操作方法是:上游在一個線程里用enqueue發送一個數據元素,然後等待下游完成在另一個線程里的dequeue操作,完成這個迴圈後再進行下一個元素的enqueue。enqueue代表akka-stream向scalaz-stream-fs2發送數據,可以用akka-stream的Sink構件來實現:

 class FS2Gate[T](q: fs2.async.mutable.Queue[Task,Option[T]]) extends GraphStage[SinkShape[T]] {
  val in = Inlet[T]("inport")
  val shape = SinkShape.of(in)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler {
      override def preStart(): Unit = {
        pull(in)          //initiate stream elements movement
        super.preStart()
      }

      override def onPush(): Unit = {
        q.enqueue1(Some(grab(in))).unsafeRun()
        pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        q.enqueue1(None).unsafeRun()
        println("the end of stream !")
        completeStage()
      }

      override def onUpstreamFailure(ex: Throwable): Unit = {
        q.enqueue1(None).unsafeRun()
        completeStage()
      }

      setHandler(in,this)

    }
}

以上這個akka-stream GraphStage描述了對上游每一個元素的enqueue動作。我們可以用scalaz-stream-fs2的flatMap來序列化運算兩個線程里的enqueue和dequeue: 

   val fs2Stream: Stream[Task,RowType] = Stream.eval(async.boundedQueue[Task,Option[RowType]](16))
     .flatMap { q =>
       Task(source.to(new FS2Gate[RowType](q)).run).unsafeRunAsyncFuture  //enqueue Task(new thread)
       pipe.unNoneTerminate(q.dequeue)      //dequeue in current thread
     }

這個函數返回fs2.Stream[Task,RowType],是一種運算方案,我們必須run來實際運算:

  fs2Stream.map{row => toTypedRow(row)}
      .map(qmr => {
      println(s"州名: ${qmr.state}")
      println(s"縣名:${qmr.county}")
      println(s"年份:${qmr.year}")
      println(s"取值:${qmr.value}")
      println("-------------")
    }).run.unsafeRun

通過測試運行,我們成功的為scalaz-stream-fs2實現了data streaming。

下麵是本次示範的源代碼:

 

import slick.jdbc.H2Profile.api._
import com.bayakala.funda._
import api._

import scala.language.implicitConversions
import scala.concurrent.duration._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import slick.basic.DatabasePublisher
import akka._
import fs2._
import akka.stream.stage.{GraphStage, GraphStageLogic}


 class FS2Gate[T](q: fs2.async.mutable.Queue[Task,Option[T]]) extends GraphStage[SinkShape[T]] {
  val in = Inlet[T]("inport")
  val shape = SinkShape.of(in)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler {
      override def preStart(): Unit = {
        pull(in)          //initiate stream elements movement
        super.preStart()
      }

      override def onPush(): Unit = {
        q.enqueue1(Some(grab(in))).unsafeRun()
        pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        q.enqueue1(None).unsafeRun()
        println("end of stream !!!!!!!")
        completeStage()
      }

      override def onUpstreamFailure(ex: Throwable): Unit = {
        q.enqueue1(None).unsafeRun()
        completeStage()
      }

      setHandler(in,this)

    }
}

object AkkaStreamSource extends App {

  val aqmraw = Models.AQMRawQuery
  val db = Database.forConfig("h2db")
  // aqmQuery.result returns Seq[(String,String,String,String)]
  val aqmQuery = aqmraw.map {r => (r.year,r.state,r.county,r.value)}
  // type alias
  type RowType = (String,String,String,String)
  // user designed strong typed resultset type. must extend FDAROW
  case class TypedRow(year: String, state: String, county: String, value: String) extends FDAROW
  // strong typed resultset conversion function. declared implicit to remind during compilation
  implicit def toTypedRow(row: RowType): TypedRow =
    TypedRow(row._1,row._2,row._3,row._4)
  // construct DatabasePublisher from db.stream
  val dbPublisher: DatabasePublisher[RowType] = db.stream[RowType](aqmQuery.result)
  // construct akka source
  val source: Source[RowType,NotUsed] = Source.fromPublisher[RowType](dbPublisher)

  implicit val actorSys = ActorSystem("actor-system")
  implicit val ec = actorSys.dispatcher
  implicit val mat = ActorMaterializer()

  /*
  source.take(10).map{row => toTypedRow(row)}.runWith(
    Sink.foreach(qmr => {
      println(s"州名: ${qmr.state}")
      println(s"縣名:${qmr.county}")
      println(s"年份:${qmr.year}")
      println(s"取值:${qmr.value}")
      println("-------------")
    })) */

   val fs2Stream: Stream[Task,RowType] = Stream.eval(async.boundedQueue[Task,Option[RowType]](16))
     .flatMap { q =>
       Task(source.to(new FS2Gate[RowType](q)).run).unsafeRunAsyncFuture  //enqueue Task(new thread)
       pipe.unNoneTerminate(q.dequeue)      //dequeue in current thread
     }

  fs2Stream.map{row => toTypedRow(row)}
      .map(qmr => {
      println(s"州名: ${qmr.state}")
      println(s"縣名:${qmr.county}")
      println(s"年份:${qmr.year}")
      println(s"取值:${qmr.value}")
      println("-------------")
    }).run.unsafeRun

  scala.io.StdIn.readLine()
  actorSys.terminate()

}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • namespace ArrayListd的長度問題{ class Program { static void Main(string[] args) { //需要的參數是object類型 //alt+shift+F10添加引用using System.Collections; ArrayList l ...
  • 1.複習里氏轉換:1)、子類可以賦值給父類(如果有一個方法需要一個父類作為參數,我們可以傳第一個子類對象)2)、如果父類中裝的是子類對象,則可以將這個父類強轉為子類對象 is和as判斷轉換成功失敗 1 Person p = new Student(); 2 //if(p is Student) 3 ...
  • 基於上篇文章 "《HiBlogs》重寫筆記[1] 從DbContext到依賴註入再到自動註入" 園友 @Flaming丶淡藍@ 吳瑞祥 提出了討論和質疑,嚇得我連夜查詢資料(玩笑~)。 本來重點是想分析“自動註入”和對“註入”有更深的理解。不過既然有疑問和討論那也是很好的。總比時不時來篇“這個不行” ...
  • 要求:1.列印市、區、街道三級菜單 2.按b可隨時返回上一級 3.按q可隨時退出程式 1 dict={'北京':{'海澱區':['中關村','北太平莊','西三旗'], '昌平區':['回龍觀','霍營','沙河'],'朝陽區':['酒仙橋','望京','將台']}, 2 '上海':{'浦東新區': ...
  • test ...
  • 1.當我們使用IE內核的瀏覽器下在PHPExcel報表時(谷歌、火狐瀏覽器正常, IE瀏覽器,360瀏覽器的相容模式報錯),會出現如下錯誤: 2.解決辦法: 在下載文件時,對當前的瀏覽器進行判斷, 如果是IE內核的瀏覽器的話,進行文件名的轉碼, 若不是IE內核的瀏覽器,則不用。 關鍵代碼如下: EN ...
  • 我是一名c#老鳥,雖然編程多年,但只會使用c#通過Visual Studio工具開發Windows環境下的桌面應用和網站。這是我自學.net core的經歷,如果你也和我一樣,也是剛剛接觸.net core,並對此有新區,或許能對你有所幫助。眾所周知,.net也是跨平臺的,但是,都是Windows平 ...
  • 前一段時間做過一個 "郵件發送的服務" ,以前大體都測試過,文本、圖片、附件都是沒有問題的,可有同事反應發送的附件名稱有中文亂碼,類似如下截圖展示: 咋一看不像亂碼,抱著試試看的態度,為MimeMessageHelper硬性加了編碼: 並且對文件名稱加了轉碼: 但是,如果你跟進源碼會發現spring ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...