In a previous post we introduced Slick, an FRM (Functional Relational Mapper). Unlike an ORM, FRM's functional syntax supports flexible query composition and therefore large-scale code reuse, but those same traits also limit how readily the broader developer community accepts FRM, and have kept it from becoming a widely popular way of programming databases. So, accepting that minority position, we explore how to improve the current state of Slick: by integrating it with certain stream libraries we hope to restore, on top of Slick's FRM, something like the familiar Recordset cursor style of database access, lowering the functional-programming bar that FRM database programming demands and attracting more programmers to FRM. Conveniently, in this post we also want to show some practical examples of integrating Akka-Stream with external systems, and feeding Slick database rows into an Akka-Stream to form a streaming dataset seems like a good fit. Slick and Akka-Stream are a natural match: they are products of the same company and both support the Reactive Streams specification. Reactive systems integrate with one another through the common Publisher interface, and Slick provides a Database.stream method to build such a Publisher:
/** Create a `Publisher` for Reactive Streams which, when subscribed to, will run the specified
* `DBIOAction` and return the result directly as a stream without buffering everything first.
* This method is only supported for streaming actions.
*
* The Publisher itself is just a stub that holds a reference to the action and this Database.
* The action does not actually start to run until the call to `onSubscribe` returns, after
* which the Subscriber is responsible for reading the full response or cancelling the
* Subscription. The created Publisher can be reused to serve a multiple Subscribers,
* each time triggering a new execution of the action.
*
* For the purpose of combinators such as `cleanup` which can run after a stream has been
* produced, cancellation of a stream by the Subscriber is not considered an error. For
* example, there is no way for the Subscriber to cause a rollback when streaming the
* results of `someQuery.result.transactionally`.
*
* When using a JDBC back-end, all `onNext` calls are done synchronously and the ResultSet row
* is not advanced before `onNext` returns. This allows the Subscriber to access LOB pointers
* from within `onNext`. If streaming is interrupted due to back-pressure signaling, the next
* row will be prefetched (in order to buffer the next result page from the server when a page
* boundary has been reached). */
final def stream[T](a: DBIOAction[_, Streaming[T], Nothing]): DatabasePublisher[T] = streamInternal(a, false)
This DatabasePublisher[T] is itself a Publisher[T]:
/** A Reactive Streams `Publisher` for database Actions. */
abstract class DatabasePublisher[T] extends Publisher[T] { self =>
...
}
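The remark in the quoted comment about prefetching and result pages is a reminder that, on a real back-end such as PostgreSQL or MySQL, the streaming action usually needs its JDBC statement parameters tuned so the driver streams rows instead of caching the whole ResultSet. Here is a hedged sketch of that pattern as described in the Slick documentation (someQuery and SomeRow are hypothetical placeholders for any Slick query and its row type, and fetchSize = 256 is just an illustrative value); for the in-memory H2 database used in this post the plain db.stream(query.result) built below is sufficient:

import slick.jdbc.{ResultSetConcurrency, ResultSetType}
// sketch: configure the result action for true streaming before handing it to db.stream
val publisher: DatabasePublisher[SomeRow] =
  db.stream(
    someQuery.result
      .withStatementParameters(
        rsType = ResultSetType.ForwardOnly,
        rsConcurrency = ResultSetConcurrency.ReadOnly,
        fetchSize = 256)
      .transactionally)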
Akka-Stream can then build a Source from that Publisher with Source.fromPublisher(publisher):
/**
* Helper to create [[Source]] from `Publisher`.
*
* Construct a transformation starting with given publisher. The transformation steps
* are executed by a series of [[org.reactivestreams.Processor]] instances
* that mediate the flow of elements downstream and the propagation of
* back-pressure upstream.
*/
def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed] =
fromGraph(new PublisherSource(publisher, DefaultAttributes.publisherSource, shape("PublisherSource")))
In theory, Source.fromPublisher(db.stream(query)) is enough to build a Reactive-Stream Source. Let's put together an example to demonstrate; first the Slick boilerplate code:
val aqmraw = Models.AQMRawQuery
val db = Database.forConfig("h2db")
// aqmQuery.result returns Seq[(String,String,String,String)]
val aqmQuery = aqmraw.map {r => (r.year,r.state,r.county,r.value)}
// type alias
type RowType = (String,String,String,String)
// user-defined strongly typed resultset row type; must extend FDAROW
case class TypedRow(year: String, state: String, county: String, value: String) extends FDAROW
// strongly typed resultset conversion function; declared implicit as a compile-time reminder
implicit def toTypedRow(row: RowType): TypedRow =
TypedRow(row._1,row._2,row._3,row._4)
All we really need is aqmQuery; we use it to construct the DatabasePublisher:
// construct DatabasePublisher from db.stream
val dbPublisher: DatabasePublisher[RowType] = db.stream[RowType](aqmQuery.result)
// construct akka source
val source: Source[RowType,NotUsed] = Source.fromPublisher[RowType](dbPublisher)
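Incidentally, the conversion to TypedRow could also live on the publisher itself: DatabasePublisher offers a mapResult combinator, so a sketched alternative to mapping rows inside the Akka flow (the rest of this post keeps the conversion on the Akka side) would be:

// sketch: let the publisher emit strongly typed rows directly
val typedPublisher: DatabasePublisher[TypedRow] = dbPublisher.mapResult(toTypedRow)
val typedSource: Source[TypedRow, NotUsed] = Source.fromPublisher(typedPublisher)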
With dbPublisher we can build source via Source.fromPublisher. Now let's try running this Akka-Stream:
implicit val actorSys = ActorSystem("actor-system")
implicit val ec = actorSys.dispatcher
implicit val mat = ActorMaterializer()
source.take(6).map{row => toTypedRow(row)}.runWith(
Sink.foreach(qmr => {
println(s"州名: ${qmr.state}")
println(s"縣名:${qmr.county}")
println(s"年份:${qmr.year}")
println(s"取值:${qmr.value}")
println("-------------")
}))
scala.io.StdIn.readLine()
actorSys.terminate()
Here is the output:
State:  Alabama
County: Elmore
Year:   1999
Value:  5
-------------
State:  Alabama
County: Jefferson
Year:   1999
Value:  39
-------------
State:  Alabama
County: Lawrence
Year:   1999
Value:  28
-------------
State:  Alabama
County: Madison
Year:   1999
Value:  31
-------------
State:  Alabama
County: Mobile
Year:   1999
Value:  32
-------------
State:  Alabama
County: Montgomery
Year:   1999
Value:  15
-------------
This shows that we have successfully connected Slick and Akka-Stream.
Now that we have a Reactive stream source, which is an akka-stream, how do we connect it to a scalaz-stream-fs2 sitting downstream? We know that akka-stream is a Reactive (push-based) stream, while scalaz-stream-fs2 is a purely pull-based stream, which means the Reactive stream source above must wait passively for the downstream scalaz-stream-fs2 to read its data. Under the Reactive Streams specification the downstream must use back-pressure signals to tell the upstream whether it may send data, so we need scalaz-stream-fs2 to produce that back-pressure. The scalaz-stream-fs2 async package provides a Queue structure for this:
/**
* Asynchronous queue interface. Operations are all nonblocking in their
* implementations, but may be 'semantically' blocking. For instance,
* a queue may have a bound on its size, in which case enqueuing may
* block until there is an offsetting dequeue.
*/
trait Queue[F[_], A] { self =>
/**
* Enqueues one element in this `Queue`.
* If the queue is `full` this waits until queue is empty.
*
* This completes after `a` has been successfully enqueued to this `Queue`
*/
def enqueue1(a: A): F[Unit]
/**
* Enqueues each element of the input stream to this `Queue` by
* calling `enqueue1` on each element.
*/
def enqueue: Sink[F, A] = _.evalMap(enqueue1)
/** Dequeues one `A` from this queue. Completes once one is ready. */
def dequeue1: F[A]
/** Repeatedly calls `dequeue1` forever. */
def dequeue: Stream[F, A] = Stream.bracket(cancellableDequeue1)(d => Stream.eval(d._1), d => d._2).repeat
...
}
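To see the handshake in isolation before wiring it up to akka-stream, here is a minimal sketch built only from the calls quoted above plus pipe.unNoneTerminate (which we also use later in this post); it assumes an implicit fs2 Strategy for Task is in scope, as in the full example at the end:

// sketch: a producer thread enqueues Some(value) elements and a final None,
// while the consumer drains the queue as an fs2 Stream on the current thread;
// enqueue1 blocks semantically whenever the bounded queue is full
val handshake: Stream[Task, Int] =
  Stream.eval(async.boundedQueue[Task, Option[Int]](4)).flatMap { q =>
    Task {
      (1 to 10).foreach(i => q.enqueue1(Some(i)).unsafeRun())
      q.enqueue1(None).unsafeRun()   // end-of-stream marker
    }.unsafeRunAsyncFuture           // producer runs on another thread
    pipe.unNoneTerminate(q.dequeue)  // consume until the None is seen
  }
// running it, e.g. handshake.runLog.unsafeRun, would yield Vector(1, 2, ..., 10)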
This structure supports multi-threaded operation, i.e. enqueue and dequeue can be called from different threads. Note in particular that when the bounded queue is full, enqueue blocks and can only proceed after a dequeue has completed, so dequeue becomes an effective way of relieving the back-pressure. Concretely: the upstream enqueues one element on one thread, then waits for the downstream to finish a dequeue on another thread, and only after that cycle does it enqueue the next element. The enqueue side represents akka-stream sending data to scalaz-stream-fs2, and can be implemented with an akka-stream Sink component:
class FS2Gate[T](q: fs2.async.mutable.Queue[Task,Option[T]]) extends GraphStage[SinkShape[T]] {
val in = Inlet[T]("inport")
val shape = SinkShape.of(in)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler {
override def preStart(): Unit = {
pull(in) //initiate stream elements movement
super.preStart()
}
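// each pushed element is handed to fs2 synchronously: enqueue1(...).unsafeRun()
// blocks this stage until the queue accepts the element, and only then do we
// pull the next one -- this is how fs2's demand becomes akka-stream back-pressure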
override def onPush(): Unit = {
q.enqueue1(Some(grab(in))).unsafeRun()
pull(in)
}
override def onUpstreamFinish(): Unit = {
q.enqueue1(None).unsafeRun()
println("the end of stream !")
completeStage()
}
override def onUpstreamFailure(ex: Throwable): Unit = {
q.enqueue1(None).unsafeRun()
completeStage()
}
setHandler(in,this)
}
}
The akka-stream GraphStage above describes the enqueue action performed for each upstream element. We can use scalaz-stream-fs2's flatMap to sequence the enqueue and the dequeue running on their two threads:
val fs2Stream: Stream[Task,RowType] = Stream.eval(async.boundedQueue[Task,Option[RowType]](16))
.flatMap { q =>
Task(source.to(new FS2Gate[RowType](q)).run).unsafeRunAsyncFuture //enqueue Task(new thread)
pipe.unNoneTerminate(q.dequeue) //dequeue in current thread
}
This function returns an fs2.Stream[Task,RowType], which is just a description of a computation; we must run it to actually execute it:
fs2Stream.map{row => toTypedRow(row)}
.map(qmr => {
println(s"州名: ${qmr.state}")
println(s"縣名:${qmr.county}")
println(s"年份:${qmr.year}")
println(s"取值:${qmr.value}")
println("-------------")
}).run.unsafeRun
A test run confirms that we have successfully implemented data streaming for scalaz-stream-fs2.
Here is the full source code for this demonstration:
import slick.jdbc.H2Profile.api._
import com.bayakala.funda._
import api._
import scala.language.implicitConversions
import scala.concurrent.duration._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import slick.basic.DatabasePublisher
import akka._
import fs2._
import akka.stream.stage.{GraphStage, GraphStageLogic}
class FS2Gate[T](q: fs2.async.mutable.Queue[Task,Option[T]]) extends GraphStage[SinkShape[T]] {
val in = Inlet[T]("inport")
val shape = SinkShape.of(in)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler {
override def preStart(): Unit = {
pull(in) //initiate stream elements movement
super.preStart()
}
override def onPush(): Unit = {
q.enqueue1(Some(grab(in))).unsafeRun()
pull(in)
}
override def onUpstreamFinish(): Unit = {
q.enqueue1(None).unsafeRun()
println("end of stream !!!!!!!")
completeStage()
}
override def onUpstreamFailure(ex: Throwable): Unit = {
q.enqueue1(None).unsafeRun()
completeStage()
}
setHandler(in,this)
}
}
object AkkaStreamSource extends App {
val aqmraw = Models.AQMRawQuery
val db = Database.forConfig("h2db")
// aqmQuery.result returns Seq[(String,String,String,String)]
val aqmQuery = aqmraw.map {r => (r.year,r.state,r.county,r.value)}
// type alias
type RowType = (String,String,String,String)
// user-defined strongly typed resultset row type; must extend FDAROW
case class TypedRow(year: String, state: String, county: String, value: String) extends FDAROW
// strongly typed resultset conversion function; declared implicit as a compile-time reminder
implicit def toTypedRow(row: RowType): TypedRow =
TypedRow(row._1,row._2,row._3,row._4)
// construct DatabasePublisher from db.stream
val dbPublisher: DatabasePublisher[RowType] = db.stream[RowType](aqmQuery.result)
// construct akka source
val source: Source[RowType,NotUsed] = Source.fromPublisher[RowType](dbPublisher)
implicit val actorSys = ActorSystem("actor-system")
implicit val ec = actorSys.dispatcher
implicit val mat = ActorMaterializer()
/*
source.take(10).map{row => toTypedRow(row)}.runWith(
Sink.foreach(qmr => {
println(s"州名: ${qmr.state}")
println(s"縣名:${qmr.county}")
println(s"年份:${qmr.year}")
println(s"取值:${qmr.value}")
println("-------------")
})) */
val fs2Stream: Stream[Task,RowType] = Stream.eval(async.boundedQueue[Task,Option[RowType]](16))
.flatMap { q =>
Task(source.to(new FS2Gate[RowType](q)).run).unsafeRunAsyncFuture //enqueue Task(new thread)
pipe.unNoneTerminate(q.dequeue) //dequeue in current thread
}
fs2Stream.map{row => toTypedRow(row)}
.map(qmr => {
println(s"州名: ${qmr.state}")
println(s"縣名:${qmr.county}")
println(s"年份:${qmr.year}")
println(s"取值:${qmr.value}")
println("-------------")
}).run.unsafeRun
scala.io.StdIn.readLine()
actorSys.terminate()
}