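An akka-stream Graph is a blueprint for a computation. It may describe a simple linear stream such as a Source, Flow or Sink. It may also describe a more complex composite graph assembled from more basic graphs, and such a composite graph can itself be used as a component of an even larger Graph. Because a Graph is only a description of a stream computation, it can be reused. We should therefore design and build Graphs around the needs of the business process, and modularize them at a higher functional level. As discussed last time, a Graph can also be viewed as a black box: its entry and exit points are its Shape, and its internal processing steps (stages) are described with GraphStage. Below are some of the basic stream graphs predefined by akka-stream: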
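Source, Sink and Flow above are graphs with a single linear stage. They are the most basic components and can be used to build data-processing chains. Fan-In (merging) and Fan-Out (spreading) graphs have multiple input or output ports, so they can be used to build more complex stream graphs. We can combine these basic Graphs into more complex composite graphs, and those composite graphs can in turn be reused to build still more complex ones. Below are some common composite graphs: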
Note that the Composite Flow (from Sink and Source) above can be built with the Flow.fromSinkAndSource function:
def fromSinkAndSource[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed] =
  fromSinkAndSourceMat(sink, source)(Keep.none)
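In terms of flow direction this Flow is reversed, because it is a Sink followed by a Source. Its upstream and downstream cannot coordinate with each other. For example, a termination signal from the Source side never reaches the Sink side, because the two ends are completely independent. To solve this we need a Flow built with the fromSinkAndSource function of the CoupledTerminationFlow object. As its annotation shows, that function has been deprecated since Akka 2.5.2 in favor of Flow.fromSinkAndSourceCoupledMat: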
/**
 * Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow from them.
 * Similar to `Flow.fromSinkAndSource` however that API does not connect the completion signals of the wrapped stages.
 */
object CoupledTerminationFlow {
  @deprecated("Use `Flow.fromSinkAndSourceCoupledMat(..., ...)(Keep.both)` instead", "2.5.2")
  def fromSinkAndSource[I, O, M1, M2](in: Sink[I, M1], out: Source[O, M2]): Flow[I, O, (M1, M2)] =
    Flow.fromSinkAndSourceCoupledMat(in, out)(Keep.both)
}
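The difference between the two kinds of Flow can be observed directly. The sketch below assumes Akka 2.5.2 or later, where `Flow.fromSinkAndSourceCoupled` exists, and it uses `ActorMaterializer` as in the article's own example. The names `CouplingDemo`, `uncoupled` and `coupled` are hypothetical.

The uncoupled Flow completes only when its inner Source completes. The coupled Flow uses a `Source.maybe` that never emits or completes on its own, yet it completes as soon as the upstream feeding the Sink side finishes.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

// Assumes Akka 2.5.2+ (Flow.fromSinkAndSourceCoupled, ActorMaterializer).
// CouplingDemo, uncoupled and coupled are hypothetical names.
object CouplingDemo {
  // Uncoupled: output comes only from the inner Source; input is drained by Sink.ignore.
  val uncoupled: Flow[Int, String, NotUsed] =
    Flow.fromSinkAndSource(Sink.ignore, Source(List("a", "b")))

  // Coupled: Source.maybe never emits or completes by itself, but when the
  // upstream (Sink side) completes, the Source side is completed as well.
  val coupled: Flow[Int, String, NotUsed] =
    Flow.fromSinkAndSourceCoupled(Sink.ignore, Source.maybe[String])

  def demo(): (immutable.Seq[String], immutable.Seq[String]) = {
    implicit val sys: ActorSystem = ActorSystem("coupling")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val fromUncoupled = Await.result(Source(1 to 100).via(uncoupled).runWith(Sink.seq), 3.seconds)
      val fromCoupled = Await.result(Source(1 to 3).via(coupled).runWith(Sink.seq), 3.seconds)
      (fromUncoupled, fromCoupled)
    } finally sys.terminate()
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

The first stream yields only "a" and "b", regardless of what flows in. The second yields nothing but still completes. If `coupled` were built with `Flow.fromSinkAndSource` instead, the second stream would never complete, and `Await.result` would throw a `TimeoutException` after 3 seconds.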
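The Composite BidiFlow in the figure above shows that the inside of a composite Graph can be very complex, while from the outside only a few simple input and output ports are visible. The ports between the Graph's internal components must still be connected correctly according to the functional logic. Whatever ports remain unconnected become the interface ports exposed to the outside. This mechanism supports hierarchical, modular composition, as the following figure shows: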
which finally becomes:
In the DSL we can use named("...") to mark module boundaries:
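import akka.stream.scaladsl._

val nestedSource = Source.single(0) // an atomic source
  .map(_ + 1)                       // an atomic processing stage
  .named("nestedSource")            // wraps up the current Source and gives it a name

val nestedFlow = Flow[Int].filter(_ != 0) // an atomic processing stage
  .map(_ - 2)                             // another atomic processing stage
  .named("nestedFlow")                    // wraps up the Flow, and gives it a name

val nestedSink = nestedFlow.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
  .named("nestedSink")                              // wrap it up

// Create a RunnableGraph
val runnableGraph = nestedSource.to(nestedSink)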
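`named` only attaches a name attribute to a module; it does not change what the module computes. A named module can therefore be reused as a building block inside another named module.

The sketch below assumes Akka 2.5.x with `ActorMaterializer`, as in the article. It uses `toMat(...)(Keep.right)` instead of `to` so that the fold result can be read back. `NamedModules` and `sumSink` are hypothetical names.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Assumes Akka 2.5.x (ActorMaterializer, as in the article). Names are hypothetical.
object NamedModules {
  // A named Flow module: drop zeros, then subtract 2.
  val nestedFlow: Flow[Int, Int, NotUsed] =
    Flow[Int].filter(_ != 0).map(_ - 2).named("nestedFlow")

  // A named Sink module built from the Flow module; Keep.right keeps the fold result
  // as the Sink's materialized value (a plain `to` would keep the Flow's NotUsed).
  val sumSink: Sink[Int, Future[Int]] =
    nestedFlow.toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right).named("sumSink")

  def demo(): Int = {
    implicit val sys: ActorSystem = ActorSystem("named")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(List(0, 3, 4, 5)).runWith(sumSink), 3.seconds)
    finally sys.terminate()
  }

  def main(args: Array[String]): Unit = println(demo()) // prints 6: (3-2)+(4-2)+(5-2)
}
```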
In the following example we define a custom functional graph module with 2 inputs and 3 outputs. We then use this module to build a complete closed graph:
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import scala.collection.immutable

object GraphModules {
  // placeholder processing step: only casts the element (stands in for real business logic)
  def someProcess[I, O]: I => O = i => i.asInstanceOf[O]

  // a custom Shape with 2 inlets and 3 outlets
  case class TwoThreeShape[I, I2, O, O2, O3](
    in1: Inlet[I],
    in2: Inlet[I2],
    out1: Outlet[O],
    out2: Outlet[O2],
    out3: Outlet[O3]) extends Shape {
    override def inlets: immutable.Seq[Inlet[_]] = in1 :: in2 :: Nil
    override def outlets: immutable.Seq[Outlet[_]] = out1 :: out2 :: out3 :: Nil
    override def deepCopy(): Shape = TwoThreeShape(
      in1.carbonCopy(),
      in2.carbonCopy(),
      out1.carbonCopy(),
      out2.carbonCopy(),
      out3.carbonCopy()
    )
  }

  // a functional module with 2 inputs and 3 outputs
  def TwoThreeGraph[I, I2, O, O2, O3] = GraphDSL.create() { implicit builder =>
    val balancer = builder.add(Balance[I](2))                  // in1 is balanced over out1/out2
    val flow = builder.add(Flow[I2].map(someProcess[I2, O2]))  // in2 is processed and sent to out3
    TwoThreeShape(balancer.in, flow.in, balancer.out(0), balancer.out(1), flow.out)
  }

  val closedGraph = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val inp1 = builder.add(Source(List(1, 2, 3))).out
    val inp2 = builder.add(Source(List(10, 20, 30))).out
    val merge = builder.add(Merge[Int](2))
    val mod23 = builder.add(TwoThreeGraph[Int, Int, Int, Int, Int])

    inp1 ~> mod23.in1
    inp2 ~> mod23.in2
    mod23.out1 ~> merge.in(0)
    mod23.out2 ~> merge.in(1)
    mod23.out3 ~> Sink.foreach(println)
    merge ~> Sink.foreach(println)
    ClosedShape
  }
}

object TailorGraph extends App {
  import GraphModules._
  implicit val sys = ActorSystem("streamSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer()

  RunnableGraph.fromGraph(closedGraph).run()

  scala.io.StdIn.readLine() // keep the app alive until Enter is pressed
  sys.terminate()
}
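For comparison, here is a smaller composite module built the same way from a fan-out junction (Broadcast) and a fan-in junction (Zip). Unlike TwoThreeGraph, it exposes the predefined FlowShape, so it can be wrapped with `Flow.fromGraph` and reused with `via` like any other Flow.

This is a sketch assuming Akka 2.5.x with `ActorMaterializer`, as in the article. `CompositeFlowDemo` and `pairWithSquare` are hypothetical names.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

// Assumes Akka 2.5.x (ActorMaterializer, as in the article). Names are hypothetical.
object CompositeFlowDemo {
  // Broadcast (fan-out) and Zip (fan-in) are wired internally; only one inlet
  // and one outlet remain, exposed as a predefined FlowShape.
  val pairWithSquare: Flow[Int, (Int, Int), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val bcast = b.add(Broadcast[Int](2))
      val zip = b.add(Zip[Int, Int]())
      bcast.out(0) ~> zip.in0                               // original value
      bcast.out(1) ~> Flow[Int].map(x => x * x) ~> zip.in1  // squared value
      FlowShape(bcast.in, zip.out)
    })

  def demo(): (immutable.Seq[(Int, Int)], immutable.Seq[(Int, Int)]) = {
    implicit val sys: ActorSystem = ActorSystem("composite")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val once = Source(1 to 3).via(pairWithSquare).runWith(Sink.seq)
      // the same module used twice in one pipeline
      val twice = Source(1 to 3).via(pairWithSquare).map(_._2).via(pairWithSquare).runWith(Sink.seq)
      (Await.result(once, 3.seconds), Await.result(twice, 3.seconds))
    } finally sys.terminate()
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

The first stream yields the pairs (1,1), (2,4), (3,9). The second feeds the squares 1, 4, 9 back through the same module and yields (1,1), (4,16), (9,81).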
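The custom TwoThreeGraph is a composite graph module and can be reused. Note how the ~> operator is used here. akka-stream only supports predefined Shapes as connection targets, for example: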
def ~>[Out](junction: UniformFanInShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...}
def ~>[Out](junction: UniformFanOutShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...}
def ~>[Out](flow: FlowShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...}
def ~>(to: Graph[SinkShape[T], _])(implicit b: Builder[_]): Unit =
  b.addEdge(importAndGetPort(b), b.add(to).in)
def ~>(to: SinkShape[T])(implicit b: Builder[_]): Unit =
  b.addEdge(importAndGetPort(b), to.in)
...
So for our custom TwoThreeShape we can only connect the ports directly:
def ~>[U >: T](to: Inlet[U])(implicit b: Builder[_]): Unit =
  b.addEdge(importAndGetPort(b), to)
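The same port-by-port wiring applies to any module whose shape is not one of the junction or flow shapes accepted by the `~>` overloads listed above. A predefined `FanOutShape2` is one such shape.

The sketch below assumes Akka 2.5.x with `ActorMaterializer`. It builds a small reusable module that sends even numbers to `out0` and odd numbers multiplied by 10 to `out1`, then connects each of its ports individually. `EvenOddModule`, `evenOdd` and the even/odd rule are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, FanOutShape2, Graph}
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, Partition, RunnableGraph, Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

// Assumes Akka 2.5.x (ActorMaterializer, as in the article). All names are hypothetical.
object EvenOddModule {
  // evens -> out0, odds * 10 -> out1
  val evenOdd: Graph[FanOutShape2[Int, Int, Int], NotUsed] =
    GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val part = b.add(Partition[Int](2, x => if (x % 2 == 0) 0 else 1))
      val times10 = b.add(Flow[Int].map(_ * 10))
      part.out(1) ~> times10.in // internal connection, hidden from the outside
      new FanOutShape2(part.in, part.out(0), times10.out)
    }

  def demo(): (immutable.Seq[Int], immutable.Seq[Int]) = {
    implicit val sys: ActorSystem = ActorSystem("evenOdd")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val evensSink = Sink.seq[Int]
      val oddsSink = Sink.seq[Int]
      val graph = RunnableGraph.fromGraph(
        GraphDSL.create(evensSink, oddsSink)(Keep.both) { implicit b => (evens, odds) =>
          import GraphDSL.Implicits._
          val split = b.add(evenOdd)
          // wire the module's inlet and outlets one by one
          Source(1 to 6) ~> split.in
          split.out0 ~> evens
          split.out1 ~> odds
          ClosedShape
        })
      val (evensF, oddsF) = graph.run()
      (Await.result(evensF, 3.seconds), Await.result(oddsF, 3.seconds))
    } finally sys.terminate()
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

Running it collects 2, 4, 6 on the first outlet and 10, 30, 50 on the second.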
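The process above shows that with akka's GraphDSL, building a composite Graph can be done visually, and most of the work lies in connecting the ports between components. Let's look at how a more complex composite graph is built. Its diagram is shown below: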
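This is a relatively complex data-processing scheme that even contains a feedback loop. It is hard to imagine how such a complex flow could be implemented with a purely functional streaming library such as scalaz-stream, and there may be no solution at all. With akka's GraphDSL, however, this graph can be composed very intuitively: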
import akka.stream._
import akka.stream.scaladsl._
import GraphDSL.Implicits._

RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  val A: Outlet[Int]                  = builder.add(Source.single(0)).out
  val B: UniformFanOutShape[Int, Int] = builder.add(Broadcast[Int](2))
  val C: UniformFanInShape[Int, Int]  = builder.add(Merge[Int](2))
  val D: FlowShape[Int, Int]          = builder.add(Flow[Int].map(_ + 1))
  val E: UniformFanOutShape[Int, Int] = builder.add(Balance[Int](2))
  val F: UniformFanInShape[Int, Int]  = builder.add(Merge[Int](2))
  val G: Inlet[Any]                   = builder.add(Sink.foreach(println)).in

  C <~ F                // feedback: F's output loops back into C
  A ~> B ~> C ~> F
  B ~> D ~> E ~> F
  E ~> G
  ClosedShape
})
Another version, which connects the ports explicitly:
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  val B = builder.add(Broadcast[Int](2))
  val C = builder.add(Merge[Int](2))
  val E = builder.add(Balance[Int](2))
  val F = builder.add(Merge[Int](2))

  Source.single(0) ~> B.in; B.out(0) ~> C.in(1); C.out ~> F.in(0)
  C.in(0) <~ F.out

  B.out(1).map(_ + 1) ~> E.in; E.out(0) ~> F.in(1)
  E.out(1) ~> Sink.foreach(println)
  ClosedShape
})
If we split the complex Graph above into modules, one of the parts looks like this:
This open (partial) composite graph can be built with GraphDSL as follows:
val partial = GraphDSL.create() { implicit builder =>
  val B = builder.add(Broadcast[Int](2))
  val C = builder.add(Merge[Int](2))
  val E = builder.add(Balance[Int](2))
  val F = builder.add(Merge[Int](2))

  C <~ F
  B ~> C ~> F
  B ~> Flow[Int].map(_ + 1) ~> E ~> F
  FlowShape(B.in, E.out(1))
}.named("partial")
The diagram of the complete, modularized Graph is as follows:
This can be implemented with the following code:
// Convert the partial graph of FlowShape to a Flow to get
// access to the fluid DSL (for example to be able to call .filter())
val flow = Flow.fromGraph(partial)

// Simple way to create a graph backed Source
val source = Source.fromGraph(GraphDSL.create() { implicit builder =>
  val merge = builder.add(Merge[Int](2))
  Source.single(0) ~> merge
  Source(List(2, 3, 4)) ~> merge

  // Exposing exactly one output port
  SourceShape(merge.out)
})

// Building a Sink with a nested Flow, using the fluid DSL
val sink = {
  val nestedFlow = Flow[Int].map(_ * 2).drop(10).named("nestedFlow")
  nestedFlow.to(Sink.head)
}

// Putting all together
val closed = source.via(flow.filter(_ > 1)).to(sink)
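Because a RunnableGraph such as `closed` is only a description, the same value can be materialized repeatedly. Each `run()` creates a fresh, independent set of stages.

The sketch below checks this with a deterministic graph, assuming Akka 2.5.x with `ActorMaterializer` as in the article. `ReuseBlueprint` and `sumGraph` are hypothetical names.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Assumes Akka 2.5.x (ActorMaterializer, as in the article). Names are hypothetical.
object ReuseBlueprint {
  // Only a description: defining it starts no actors and processes no elements.
  // Keep.right keeps the Sink's materialized value (the fold result).
  val sumGraph: RunnableGraph[Future[Int]] =
    Source(1 to 10).toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)

  def demo(): (Int, Int) = {
    implicit val sys: ActorSystem = ActorSystem("reuse")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // each run() materializes a new, independent copy of the same blueprint
      val first = Await.result(sumGraph.run(), 3.seconds)
      val second = Await.result(sumGraph.run(), 3.seconds)
      (first, second)
    } finally sys.terminate()
  }

  def main(args: Array[String]): Unit = println(demo()) // prints (55,55)
}
```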
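Another difference from scalaz-stream is that akka-stream computations run on actors. Besides processing the stream elements, akka-stream can also maintain and return a computation result (the materialized value) through the actors' internal state. How this result propagates through a composite graph is controllable, as the figure below shows: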
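Results are returned through viaMat and toMat. The shorthand forms via and to keep, by default, the result produced by the graph on the left (equivalent to Keep.left).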
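The sketch below shows how the choice of `Keep` decides which materialized value comes out of `run()`. It assumes Akka 2.5.x with `ActorMaterializer` as in the article, and uses `KillSwitches.single` only as an example of a middle stage that has its own materialized value. `MatValueDemo` and the `keep*` names are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Assumes Akka 2.5.x (ActorMaterializer, as in the article). Names are hypothetical.
object MatValueDemo {
  val source: Source[Int, NotUsed] = Source(1 to 4)
  val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  // `to` behaves like toMat(...)(Keep.left): the Source's NotUsed is kept, the sum is dropped
  val keepLeft: RunnableGraph[NotUsed] = source.to(sumSink)
  // Keep.right: keep the Sink's Future[Int]
  val keepRight: RunnableGraph[Future[Int]] = source.toMat(sumSink)(Keep.right)
  // viaMat keeps the middle stage's kill switch; toMat(Keep.both) pairs it with the sum
  val keepBoth: RunnableGraph[(UniqueKillSwitch, Future[Int])] =
    source.viaMat(KillSwitches.single[Int])(Keep.right).toMat(sumSink)(Keep.both)

  def demo(): (Int, Int) = {
    implicit val sys: ActorSystem = ActorSystem("matValues")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val notUsed: NotUsed = keepLeft.run() // nothing useful to read back
      val sumRight = Await.result(keepRight.run(), 3.seconds)
      val (killSwitch, sumF) = keepBoth.run() // killSwitch.shutdown() could stop it early; unused here
      (sumRight, Await.result(sumF, 3.seconds))
    } finally sys.terminate()
  }

  def main(args: Array[String]): Unit = println(demo()) // prints (10,10)
}
```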