Akka(19): Stream:組合數據流,組合共用-Graph modular composition

来源:http://www.cnblogs.com/tiger-xc/archive/2017/08/24/7421514.html
-Advertisement-
Play Games

akka-stream的Graph是一種運算方案,它可能代表某種簡單的線性數據流圖如:Source/Flow/Sink,也可能是由更基礎的流圖組合而成相對複雜點的某種複合流圖,而這個複合流圖本身又可以被當作組件來組合更大的Graph。因為Graph只是對數據流運算的描述,所以它是可以被重覆利用的。所 ...


   akka-stream的Graph是一種運算方案,它可能代表某種簡單的線性數據流圖如:Source/Flow/Sink,也可能是由更基礎的流圖組合而成相對複雜點的某種複合流圖,而這個複合流圖本身又可以被當作組件來組合更大的Graph。因為Graph只是對數據流運算的描述,所以它是可以被重覆利用的。所以我們應該儘量地按照業務流程需要來設計構建Graph。在更高的功能層面上實現Graph的模塊化(modular)。按上回討論,Graph又可以被描述成一種黑盒子,它的入口和出口就是Shape,而內部的作用即處理步驟Stage則是用GraphStage來形容的。下麵是akka-stream預設的一些基礎數據流圖:

compose_shapes.png

上面Source,Sink,Flow代表具備線性步驟linear-stage的流圖,屬於最基礎的組件,可以用來構建數據處理鏈條。而Fan-In合併型,Fan-Out擴散型則具備多個輸入或輸出埠,可以用來構建更複雜的數據流圖。我們可以用以上這些基礎Graph來構建更複雜的複合流圖,而這些複合流圖又可以被重覆利用去構建更複雜的複合流圖。下麵就是一些常見的複合流圖:

compose_composites.png

註意上面的Composite Flow(from Sink and Source)可以用Flow.fromSinkAndSource函數構建:

def fromSinkAndSource[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed] =
    fromSinkAndSourceMat(sink, source)(Keep.none)

這個Flow從流向來說先Sink再Source是反的,形成的Flow上下游間無法協調,即Source端終結信號無法到達Sink端,因為這兩端是相互獨立的。我們必須用CoupledTermination對象中的fromSinkAndSource函數構建的Flow來解決這個問題:

/**
 * Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow them them.
 * Similar to `Flow.fromSinkAndSource` however that API does not connect the completion signals of the wrapped stages.
 */
object CoupledTerminationFlow {
  @deprecated("Use `Flow.fromSinkAndSourceCoupledMat(..., ...)(Keep.both)` instead", "2.5.2")
  def fromSinkAndSource[I, O, M1, M2](in: Sink[I, M1], out: Source[O, M2]): Flow[I, O, (M1, M2)] =
    Flow.fromSinkAndSourceCoupledMat(in, out)(Keep.both)
 

從上面圖列里的Composite BidiFlow可以看出:一個複合Graph的內部可以是很複雜的,但從外面看到的只是簡單的幾個輸入輸出埠。不過Graph內部構件之間的埠必須按照功能邏輯進行正確的連接,剩下的就變成直接向外公開的界面埠了。這種機制支持了層級式的模塊化組合方式,如下麵的圖示:

compose_nested_flow.png

最後變成:

compose_nested_flow_opaque.png

在DSL里我們可以用name("???")來分割模塊:

val nestedFlow =
  Flow[Int].filter(_ != 0) // an atomic processing stage
    .map(_ - 2) // another atomic processing stage
    .named("nestedFlow") // wraps up the Flow, and gives it a name

val nestedSink =
  nestedFlow.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
    .named("nestedSink") // wrap it up

// Create a RunnableGraph
val runnableGraph = nestedSource.to(nestedSink)

在下麵這個示範里我們自定義一個某種功能的流圖模塊:它有2個輸入和3個輸出。然後我們再使用這個自定義流圖模塊組建一個完整的閉合流圖:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._

import scala.collection.immutable

object GraphModules {
  def someProcess[I, O]: I => O = i => i.asInstanceOf[O]

  case class TwoThreeShape[I, I2, O, O2, O3](
                                              in1: Inlet[I],
                                              in2: Inlet[I2],
                                              out1: Outlet[O],
                                              out2: Outlet[O2],
                                              out3: Outlet[O3]) extends Shape {

    override def inlets: immutable.Seq[Inlet[_]] = in1 :: in2 :: Nil

    override def outlets: immutable.Seq[Outlet[_]] = out1 :: out2 :: out3 :: Nil

    override def deepCopy(): Shape = TwoThreeShape(
      in1.carbonCopy(),
      in2.carbonCopy(),
      out1.carbonCopy(),
      out2.carbonCopy(),
      out3.carbonCopy()
    )
  }
//a functional module with 2 input 3 output
  def TwoThreeGraph[I, I2, O, O2, O3] = GraphDSL.create() { implicit builder =>
    val balancer = builder.add(Balance[I](2))
    val flow = builder.add(Flow[I2].map(someProcess[I2, O2]))

    TwoThreeShape(balancer.in, flow.in, balancer.out(0), balancer.out(1), flow.out)
  }

  val closedGraph = GraphDSL.create() {implicit builder =>
    import GraphDSL.Implicits._
    val inp1 = builder.add(Source(List(1,2,3))).out
    val inp2 = builder.add(Source(List(10,20,30))).out
    val merge = builder.add(Merge[Int](2))
    val mod23 = builder.add(TwoThreeGraph[Int,Int,Int,Int,Int])

     inp1 ~> mod23.in1
     inp2 ~> mod23.in2
     mod23.out1 ~> merge.in(0)
     mod23.out2 ~> merge.in(1)
     mod23.out3 ~> Sink.foreach(println)
     merge ~> Sink.foreach(println)
     ClosedShape

  }
}

object TailorGraph extends App {
  import GraphModules._

  implicit val sys = ActorSystem("streamSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer()

  RunnableGraph.fromGraph(closedGraph).run()

  scala.io.StdIn.readLine()
  sys.terminate()


}

這個自定義的TwoThreeGraph是一個複合的流圖模塊,是可以重覆使用的。註意這個~>符合的使用:akka-stream只提供了對預設定Shape作為連接對象的支持如:

      def ~>[Out](junction: UniformFanInShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...}
      def ~>[Out](junction: UniformFanOutShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...}
      def ~>[Out](flow: FlowShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...}
      def ~>(to: Graph[SinkShape[T], _])(implicit b: Builder[_]): Unit =
        b.addEdge(importAndGetPort(b), b.add(to).in)

      def ~>(to: SinkShape[T])(implicit b: Builder[_]): Unit =
        b.addEdge(importAndGetPort(b), to.in)
...

所以對於我們自定義的TwoThreeShape就只能使用直接的埠連接了:

   def ~>[U >: T](to: Inlet[U])(implicit b: Builder[_]): Unit =
        b.addEdge(importAndGetPort(b), to)

以上的過程顯示:通過akka的GraphDSL,對複合型Graph的構建可以實現形象化,大部分工作都在如何對組件之間的埠進行連接。我們再來看個較複雜複合流圖的構建過程,下麵是這個流圖的圖示:

compose_graph.png

可以說這是一個相對複雜的數據處理方案,裡面甚至包括了數據流迴路(feedback)。無法想象如果用純函數數據流如scalaz-stream應該怎樣去實現這麼複雜的流程,也可能根本是沒有解決方案的。但用akka GraphDSL可以很形象的組合這個數據流圖;

  import GraphDSL.Implicits._
  RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    val A: Outlet[Int]                  = builder.add(Source.single(0)).out
    val B: UniformFanOutShape[Int, Int] = builder.add(Broadcast[Int](2))
    val C: UniformFanInShape[Int, Int]  = builder.add(Merge[Int](2))
    val D: FlowShape[Int, Int]          = builder.add(Flow[Int].map(_ + 1))
    val E: UniformFanOutShape[Int, Int] = builder.add(Balance[Int](2))
    val F: UniformFanInShape[Int, Int]  = builder.add(Merge[Int](2))
    val G: Inlet[Any]                   = builder.add(Sink.foreach(println)).in

    C     <~      F
    A  ~>  B  ~>  C     ~>      F
    B  ~>  D  ~>  E  ~>  F
    E  ~>  G

    ClosedShape
  })

另一個埠連接方式的版本如下:

RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  val B = builder.add(Broadcast[Int](2))
  val C = builder.add(Merge[Int](2))
  val E = builder.add(Balance[Int](2))
  val F = builder.add(Merge[Int](2))

  Source.single(0) ~> B.in; B.out(0) ~> C.in(1); C.out ~> F.in(0)
  C.in(0) <~ F.out

  B.out(1).map(_ + 1) ~> E.in; E.out(0) ~> F.in(1)
  E.out(1) ~> Sink.foreach(println)
  ClosedShape
})

如果把上面這個複雜的Graph切分成模塊的話,其中一部分是這樣的:

compose_graph_partial.png

這個開放數據流複合圖可以用GraphDSL這樣構建:
val partial = GraphDSL.create() { implicit builder =>
    val B = builder.add(Broadcast[Int](2))
    val C = builder.add(Merge[Int](2))
    val E = builder.add(Balance[Int](2))
    val F = builder.add(Merge[Int](2))

    C  <~  F
    B  ~>                            C  ~>  F
    B  ~>  Flow[Int].map(_ + 1)  ~>  E  ~>  F
    FlowShape(B.in, E.out(1))
  }.named("partial")
模塊化的完整Graph圖示如下: compose_graph_flow.png 這部分可以用下麵的代碼來實現:
// Convert the partial graph of FlowShape to a Flow to get
// access to the fluid DSL (for example to be able to call .filter())
val flow = Flow.fromGraph(partial)

// Simple way to create a graph backed Source
val source = Source.fromGraph( GraphDSL.create() { implicit builder =>
  val merge = builder.add(Merge[Int](2))
  Source.single(0)      ~> merge
  Source(List(2, 3, 4)) ~> merge

  // Exposing exactly one output port
  SourceShape(merge.out)
})

// Building a Sink with a nested Flow, using the fluid DSL
val sink = {
  val nestedFlow = Flow[Int].map(_ * 2).drop(10).named("nestedFlow")
  nestedFlow.to(Sink.head)
}

// Putting all together
val closed = source.via(flow.filter(_ > 1)).to(sink)
和scalaz-stream不同的還有akka-stream的運算是在actor上進行的,除了大家都能對數據流元素進行處理之外,akka-stream還可以通過actor的內部狀態來維護和返回運算結果。這個運算結果在複合流圖中傳播的過程是可控的,如下圖示: compose_mat.png

返回運算結果是通過viaMat, toMat來實現的。簡寫的via,to預設選擇流圖左邊運算產生的結果。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一:原題 二:原題講解 ...
  • / :為定界符,要匹配的字元一般放在定界符裡面; 2、 常用元字元 1)+:出現一次或多次 2)*:出現零次或多次 3)?:出現零次或一次 3、限定符 1) 字元1字元2{n} 表示字元2連續出現n次的匹配結果 字元1字元2{n,} 表示字元2連續出現n次或更多次的匹配結果 (字元1字元2){n} ...
  • 題目描述 所謂蟲食算,就是原先的算式中有一部分被蟲子啃掉了,需要我們根據剩下的數字來判定被啃掉的字母。來看一個簡單的例子: 43#9865#045 +8468#6633 44445509678 其中#號代表被蟲子啃掉的數字。根據算式,我們很容易判斷:第一行的兩個數字分別是5和3,第二行的數字是5。 ...
  • 一、打包JavaWeb應用 在Java中,使用"jar"命令來對將JavaWeb應用打包成一個War包,jar命令的用法如下: 範例:將he這個JavaWeb應用打包成war包 執行完之後,就可以得到一個文件,平時開發完JavaWeb應用後,一般都會將JavaWeb應用打包成一個war包,然後將這個 ...
  • InterlliJ IDEA 、JUnit、插件載入、單元測試 ...
  • 1.根據已完成的Hibernate1基礎案例,我們接下來寫一下查詢,修改刪除,對於基礎生可以學習一下 只改寫一下測試類的代碼 1 private void findStudent() { 2 //02Hibernate 保存 3 //讀取大配置文件,獲取連接的資料庫信息 4 Configuratio ...
  • 我學習go的五個感悟(譯) 原文 "5 things about programming I learned with Go By MICHAŁ KONARSKI" Go在最近一段時間內開始變得十分流行。語言相關的論文和博客每天都在更新,新的golang相關的項目在github中也層出不窮。Go語言 ...
  • 題目鏈接 Problem Description Giving two strings and you should judge if they are matched.The first string contains lowercase letters and uppercase letters ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...