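FunDA's design goal is to move data from a back-end database into memory, process it there (including with parallel computation), and then possibly write updates back to the database. Moving data into memory means we have to ask whether memory can hold all of it at once. If it cannot, the data has to be read in batches, in step with the processing. That is one of the main purposes of the Reactive Streams specification, so before designing FunDA's data source (Source) we need to think about how to implement a reactive data stream.
One of the functional breakthroughs of Slick 3.x is its support for the Reactive Streams API. Unfortunately, the new Slick does not provide concrete functions for operating on a data stream. The official documentation mentions that akka-stream or Play-Iteratee-Reactive-Stream can be used to process the stream. Slick implements the Reactive Streams interface by having db.stream build a DatabasePublisher. Play provides the conversion function stream.IterateeStreams.publisherToEnumerator(SlickDatabasePublisher), which turns a DatabasePublisher into an Enumerator, the data source (Source) on the Iteratee side. Play processes Reactive Streams through Iteratee, so in this post we discuss some of the principles behind Iteratee. Before running the examples we need to add this dependency to build.sbt: "com.typesafe.play" % "play-iteratees-reactive-streams_2.11" % "2.6.0".
Taken literally, "reactive" means interactive. A Reactive Stream is an interaction between a data producer and a data consumer. Roughly, the producer notifies the consumer that data is ready to be read, and the consumer reports its reading state back to the producer, signalling whether more data can be sent. Let us walk through the basic principles. Normally, when we need to get data from a stream, we can use the read method of an interface like this: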
trait InputStream {
def read(): Byte
}
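This is a typical synchronous operation: read occupies the calling thread until the Byte is available. We can solve this with a callback: pass a reading function to the target stream and receive the Byte passively: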
trait InputStreamHandler {
def onByte(byte: Byte): Unit
}
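We arrange to pass onByte to the stream as a callback. When the stream has a Byte it calls onByte, and the body of onByte is the computation to perform on receiving it. Receiving a Byte, however, represents a change in our program's state, so we can rewrite the interface in a functional style: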
trait InputStreamHandler {
def onByte(byte: Byte): InputStreamHandler
}
Because the state may change, we return the updated object. Here is an example implementation of the interface:
class consume(data: Seq[Byte]) extends InputStreamHandler {
def onByte(byte: Byte) = new consume(data :+ byte)
}
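To see how a stream might drive such a handler, here is a minimal sketch in plain Scala. The push function is a hypothetical stand-in for the stream and is not part of any library. Collect mirrors the consume class above but exposes its data so the result can be inspected.

```scala
object PushDemo {
  trait InputStreamHandler {
    def onByte(byte: Byte): InputStreamHandler
  }

  // Mirrors `consume`: every callback returns a new handler holding the bytes seen so far.
  final class Collect(val data: Seq[Byte]) extends InputStreamHandler {
    def onByte(byte: Byte): InputStreamHandler = new Collect(data :+ byte)
  }

  // Hypothetical stream: calls onByte once per byte and keeps the handler returned each time.
  def push(bytes: Seq[Byte], handler: InputStreamHandler): InputStreamHandler =
    bytes.foldLeft(handler)((h, b) => h.onByte(b))

  // Pushes three bytes and reads back what the final handler collected.
  def run(): Seq[Byte] =
    push(Seq[Byte](1, 2, 3), new Collect(Seq.empty)) match {
      case c: Collect => c.data
      case _          => Seq.empty
    }
}
```

Because each call returns the next handler, the stream never mutates anything: it only has to remember the most recent handler it was given.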
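The consume example collects the bytes it reads into a Seq. But suppose the stream has data ready and calls our callback onByte, and we cannot finish the computation inside it immediately: the caller's thread is blocked and the whole stream stalls. We can use Future to solve this:
trait InputStreamHandler {
def onByte(byte: Byte): Future[InputStreamHandler]
}
Now the caller can return immediately. But how does the caller tell the reader about the state of the transmission, for example that all data has been sent? We need to refine the data the caller hands over: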
trait Input[+E]
case class EL[E](e: E) extends Input[E]
case object EOF extends Input[Nothing]
case object Empty extends Input[Nothing]
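What is handed over is now an Input[E], which carries state. The meaning of the concrete types EL, EOF and Empty is clear from their names: an element, the end of the stream, and no data for the moment. Our interface becomes: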
trait InputStreamHandler[E] {
def onInput(input: Input[E]): Future[InputStreamHandler[E]]
}
The example implementation becomes:
class consume(data: Seq[Byte]) extends InputStreamHandler[Byte] {
def onInput(input: Input[Byte]) = input match {
case EL(byte) => Future.successful(new consume(data :+ byte))
case _ => Future.successful(this)
}
}
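The following sketch shows one way a sender could feed this Future-based handler, assuming it waits for each returned Future before sending the next input. The push function and the Collect class are illustrative names rather than library code, and the global ExecutionContext is used only to keep the example short.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FuturePushDemo {
  sealed trait Input[+E]
  final case class EL[E](e: E) extends Input[E]
  case object EOF extends Input[Nothing]
  case object Empty extends Input[Nothing]

  trait InputStreamHandler[E] {
    def onInput(input: Input[E]): Future[InputStreamHandler[E]]
  }

  // Same logic as `consume`, with the collected bytes exposed for inspection.
  final class Collect(val data: Seq[Byte]) extends InputStreamHandler[Byte] {
    def onInput(input: Input[Byte]): Future[InputStreamHandler[Byte]] = input match {
      case EL(byte) => Future.successful(new Collect(data :+ byte))
      case _        => Future.successful(this)
    }
  }

  // Hypothetical sender: the next input is sent only after the previous Future completes.
  def push[E](inputs: Seq[Input[E]], start: InputStreamHandler[E]): Future[InputStreamHandler[E]] =
    inputs.foldLeft(Future.successful(start)) { (acc, in) => acc.flatMap(_.onInput(in)) }

  def run(): Seq[Byte] = {
    val inputs: Seq[Input[Byte]] = Seq(EL(1.toByte), Empty, EL(2.toByte), EOF)
    Await.result(push(inputs, new Collect(Seq.empty)), 5.seconds) match {
      case c: Collect => c.data
      case _          => Seq.empty
    }
  }
}
```

Empty and EOF leave the handler unchanged here, so only the two EL values end up in the collected data.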
Returning a Future in the example above is rather awkward. We can improve the definition of InputStreamHandler like this:
trait InputStreamHandler[E] {
def onByte[B](cont: (Input[E] => InputStreamHandler[E]) => Future[B]): Future[B]
}
Now the example can be implemented like this:
class consume(data: Seq[Byte]) extends InputStreamHandler[Byte] {
def onByte[B](cont: (Input[Byte] => InputStreamHandler[Byte]) => Future[B]) = cont {
case EL(byte) => new consume(data :+ byte)
case _ => this
}
}
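With this signature the handler no longer decides when an input arrives: it hands its step function to whatever the caller passes in. A minimal sketch of a caller, under the assumption that it simply applies the step function to the next input, could look like this (push and Collect are hypothetical names used for illustration).

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ContinuationPushDemo {
  sealed trait Input[+E]
  final case class EL[E](e: E) extends Input[E]
  case object EOF extends Input[Nothing]
  case object Empty extends Input[Nothing]

  trait InputStreamHandler[E] {
    def onByte[B](cont: (Input[E] => InputStreamHandler[E]) => Future[B]): Future[B]
  }

  // The handler only describes its step function; the caller decides how to use it.
  final class Collect(val data: Seq[Byte]) extends InputStreamHandler[Byte] {
    def onByte[B](cont: (Input[Byte] => InputStreamHandler[Byte]) => Future[B]): Future[B] = cont {
      case EL(byte) => new Collect(data :+ byte)
      case _        => this
    }
  }

  // Hypothetical sender: asks the handler for its step function k and applies it to one input.
  def push[E](inputs: Seq[Input[E]], start: InputStreamHandler[E]): Future[InputStreamHandler[E]] =
    inputs.foldLeft(Future.successful(start)) { (acc, in) =>
      acc.flatMap(handler => handler.onByte[InputStreamHandler[E]](k => Future.successful(k(in))))
    }

  def run(): Seq[Byte] = {
    val inputs: Seq[Input[Byte]] = Seq(EL(4.toByte), Empty, EL(5.toByte))
    Await.result(push(inputs, new Collect(Seq.empty)), 5.seconds) match {
      case c: Collect => c.data
      case _          => Seq.empty
    }
  }
}
```

The Future now appears only on the caller's side, which is why the handler implementation itself reads like ordinary pattern matching.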
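The callback-passing version is much more comfortable to use. From these examples we arrive at a push-model stream: the target stream pushes data to the reader. But a Reactive Stream also needs a notification mechanism in the reverse direction: for example, how the reader tells the stream that it has finished reading, that it temporarily cannot accept more data, or that it is ready to accept data again.
We now have a rough picture of Reactive Streams. The pattern has two parties, the data source (the sender in the push model) and the data consumer, which correspond to Enumerator and Iteratee in the Iteratee pattern. In other words, the Enumerator sends and the Iteratee receives. Implementing Reactive Streams with Iteratee requires a two-way notification mechanism between the two. An Iteratee in fact describes how to consume the data an Enumerator sends, for example by concatenating it (concat) or summing it. While consuming, the Iteratee is also responsible for communicating with the Enumerator so that the transfer proceeds smoothly. So how should an Iteratee communicate with an Enumerator? To support this, we design one more trait: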
trait Step[E,+A]
case class Done[+A,E](a: A, remain: Input[E]) extends Step[E,A]
case class Cont[E,+A](k: Input[E] => InputStreamHandler[E,A]) extends Step[E,A]
case class Error[E](msg: String, loc:Input[E]) extends Step[E,Nothing]
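Step represents the Iteratee's processing state: Done means finished, returning the result a together with remain, the leftover input; Cont means the function k can accept more data; Error carries an error message msg and the input loc at which the error occurred. Now we can redefine InputStreamHandler:
trait InputStreamHandler[E,+A] {
  def onInput[B](step: Step[E,A] => Future[B]): Future[B]
}
An example implementation, Consume, looks like this:
class Consume(data: Seq[Byte]) extends InputStreamHandler[Byte,Seq[Byte]] {
  def onInput[B](step: Step[Byte,Seq[Byte]] => Future[B]): Future[B] = step(Cont {
    case EL(byte) => new Consume(data :+ byte)
    // end of stream: switch to a handler that is permanently in the Done state
    case EOF => new InputStreamHandler[Byte,Seq[Byte]] {
      def onInput[C](step: Step[Byte,Seq[Byte]] => Future[C]): Future[C] = step(Done(data,Empty))
    }
    case Empty => this
  })
}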
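The biggest difference in this version is that when it receives the EOF signal from the stream it returns Done to signal completion, and the result data can then be used. This InputStreamHandler is an Iteratee: it describes how to use (consume) the data it receives. We can rename the interface definition as follows: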
trait Iteratee[E,+A] {
def onInput[B](folder: Step[E,A] => Future[B]): Future[B]
}
The Iteratee pattern is in fact very similar to the following function:
def foldLeft[F[_],A,B](ax: F[A])(z: B)(f: (B,A) => B): B
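F[A] is the data source. We do not need to care how it produces and sends data; we only care about how to process what we receive. In this function, (B,A)=>B is the concrete way the data is consumed. foldLeft represents a push-model stream. How the data source is produced is the Enumerator's concern.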
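The separation between consumer and source can be illustrated with a small sketch. It assumes Iterator as the concrete source type instead of a generic F[_], and the names totalLength, fromList and fromGenerated are made up for the example.

```scala
object FoldLeftAnalogy {
  // The consumer: how each incoming element updates the accumulated result.
  val totalLength: (Int, String) => Int = (acc, s) => acc + s.length

  // A driver over any Iterator; the consumer never sees how elements are produced.
  def foldLeft[A, B](source: Iterator[A])(z: B)(f: (B, A) => B): B = {
    var acc = z
    while (source.hasNext) acc = f(acc, source.next())
    acc
  }

  // The same consumer applied to two differently produced sources.
  def fromList(): Int = foldLeft(List("Tiger", "John").iterator)(0)(totalLength)
  def fromGenerated(): Int = foldLeft(Iterator.tabulate(3)(i => "x" * (i + 1)))(0)(totalLength)
}
```

The consumer function is written once and reused unchanged, which is the role an Iteratee plays relative to different Enumerators.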
Now let us look at the formal type of Iteratee: Iteratee[E,A], where E is the type of the data elements and A is the type of the result. trait Iteratee has one abstract function:
/**
* Computes a promised value B from the state of the Iteratee.
*
* The folder function will be run in the supplied ExecutionContext.
* Exceptions thrown by the folder function will be stored in the
* returned Promise.
*
* If the folder function itself is synchronous, it's better to
* use `pureFold()` instead of `fold()`.
*
* @param folder a function that will be called on the current state of the iteratee
* @param ec the ExecutionContext to run folder within
* @return the result returned when folder is called
*/
def fold[B](folder: Step[E, A] => Future[B])(implicit ec: ExecutionContext): Future[B]
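Iteratees with different behaviour are built by defining different fold functions. fold is the hook an Iteratee offers to the Enumerator: the Enumerator passes in a folder callback, and folder's input parameter Step[E,A] represents the three possible states of the current Iteratee: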
object Step {
case class Done[+A, E](a: A, remaining: Input[E]) extends Step[E, A]
case class Cont[E, +A](k: Input[E] => Iteratee[E, A]) extends Step[E, A]
case class Error[E](msg: String, input: Input[E]) extends Step[E, Nothing]
}
When the state is Cont[E,A], the Enumerator uses the function k: Input[E] => Iteratee[E,A] to push an Input[E] to the Iteratee. We can see this in a simple Enumerator:
/**
* Creates an enumerator which produces the one supplied
* input and nothing else. This enumerator will NOT
* automatically produce Input.EOF after the given input.
*/
def enumInput[E](e: Input[E]) = new Enumerator[E] {
def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] =
i.fold {
case Step.Cont(k) => eagerFuture(k(e))
case _ => Future.successful(i)
}(dec)
}
Or:
/**
* Create an Enumerator from a set of values
*
* Example:
* {{{
* val enumerator: Enumerator[String] = Enumerator("kiki", "foo", "bar")
* }}}
*/
def apply[E](in: E*): Enumerator[E] = in.length match {
case 0 => Enumerator.empty
case 1 => new Enumerator[E] {
def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = i.pureFoldNoEC {
case Step.Cont(k) => k(Input.El(in.head))
case _ => i
}
}
case _ => new Enumerator[E] {
def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = enumerateSeq(in, i)
}
}
where enumerateSeq is defined as:
private def enumerateSeq[E, A]: (Seq[E], Iteratee[E, A]) => Future[Iteratee[E, A]] = { (l, i) =>
l.foldLeft(Future.successful(i))((i, e) =>
i.flatMap(it => it.pureFold {
case Step.Cont(k) => k(Input.El(e))
case _ => it
}(dec))(dec))
}
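The interaction in these excerpts can be modelled without Play. The sketch below is a simplified, synchronous stand-in: its fold returns B directly rather than Future[B], and take, enumerate and run are hypothetical helpers, not Play functions. It shows the reverse signal at work: once the Iteratee reports Done, the enumerator stops pushing.

```scala
object MiniIteratee {
  sealed trait Input[+E]
  object Input {
    final case class El[+E](e: E) extends Input[E]
    case object Empty extends Input[Nothing]
    case object EOF extends Input[Nothing]
  }

  sealed trait Step[E, +A]
  object Step {
    final case class Done[+A, E](a: A, remaining: Input[E]) extends Step[E, A]
    final case class Cont[E, +A](k: Input[E] => Iteratee[E, A]) extends Step[E, A]
    final case class Error[E](msg: String, input: Input[E]) extends Step[E, Nothing]
  }

  // Simplification: the folder returns B directly instead of Future[B].
  trait Iteratee[E, +A] {
    def fold[B](folder: Step[E, A] => B): B
  }

  def done[E, A](a: A, remaining: Input[E]): Iteratee[E, A] = new Iteratee[E, A] {
    def fold[B](folder: Step[E, A] => B): B = folder(Step.Done(a, remaining))
  }
  def cont[E, A](k: Input[E] => Iteratee[E, A]): Iteratee[E, A] = new Iteratee[E, A] {
    def fold[B](folder: Step[E, A] => B): B = folder(Step.Cont(k))
  }

  // Consumes at most n elements, then reports Done.
  def take[E](n: Int, acc: Vector[E] = Vector.empty): Iteratee[E, Vector[E]] =
    if (n <= 0) done[E, Vector[E]](acc, Input.Empty)
    else cont[E, Vector[E]] {
      case Input.El(e) => take(n - 1, acc :+ e)
      case Input.Empty => take(n, acc)
      case Input.EOF   => done[E, Vector[E]](acc, Input.EOF)
    }

  // Pushes elements only while the iteratee is in Cont; also counts how many were pushed.
  def enumerate[E, A](elems: List[E], it: Iteratee[E, A], pushed: Int = 0): (Iteratee[E, A], Int) =
    elems match {
      case Nil => (it, pushed)
      case head :: tail =>
        it.fold[(Iteratee[E, A], Int)] {
          case Step.Cont(k) => enumerate(tail, k(Input.El(head)), pushed + 1)
          case _            => (it, pushed) // Done or Error: stop pushing
        }
    }

  // Extracts the result, sending EOF first if the iteratee still expects input.
  def run[E, A](it: Iteratee[E, A]): Option[A] =
    it.fold[Option[A]] {
      case Step.Done(a, _) => Some(a)
      case Step.Cont(k) =>
        k(Input.EOF).fold[Option[A]] {
          case Step.Done(a, _) => Some(a)
          case _               => None
        }
      case Step.Error(_, _) => None
    }

  def demo(): (Option[Vector[Int]], Int) = {
    val (finalIt, pushed) = enumerate(List(1, 2, 3, 4, 5), take[Int](2))
    (run(finalIt), pushed)
  }
}
```

Here the source holds five elements but only two are pushed, because the third call to fold reports Done instead of Cont.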
We can obtain Iteratees with different behaviour by defining fold. The following defines an Iteratee that directly returns a constant value:
val doneIteratee = new Iteratee[String,Int] {
def fold[B](folder: Step[String,Int] => Future[B])(implicit ec: ExecutionContext): Future[B] = {
folder(Step.Done(21,Input.EOF))
}
}
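This Iteratee consumes no input and returns 21 straight away. In fact, we can build much the same Iteratee directly with Done.apply, whose type parameters are the result type followed by the element type (here it leaves Input.Empty rather than Input.EOF as the remaining input):
val doneIteratee2 = Done[Int,String](21,Input.Empty)
We can also define an Iteratee that consumes just one input element: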
val consumeOne = new Iteratee[String,String] {
def fold[B](folder: Step[String,String] => Future[B])(implicit ec: ExecutionContext): Future[B] = {
folder(Step.Cont {
case Input.EOF => Done("OK",Input.EOF)
case Input.Empty => this
case Input.El(e) => Done(e,Input.EOF)
})
}
}
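Similarly, we can build an Iteratee with the Cont constructor. This simplified variant returns "OK" for whatever input it receives first: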
val consumeOne1 = Cont[String,String](in => Done("OK",Input.EOF))
From these examples we can infer that the folder function is defined inside the Enumerator. Look at the following Enumerator example:
val enumerator = new Enumerator[String] {
  // some messages
  val items = 1 to 10 map (i => i.toString)
  var index = 0
  override def apply[A](i: Iteratee[String, A]): Future[Iteratee[String, A]] = {
    i.fold(
      // the folder
      { step =>
        step match {
          // iteratee is done, so no more messages to send
          case Step.Done(result, remaining) => {
            println("Step.Done")
            Future(i)
          }
          // iteratee can consume more
          case Step.Cont(k: (Input[String] => Iteratee[String, A])) => {
            println("Step.Cont")
            // does enumerator have more messages ?
            if (index < items.size) {
              val item = items(index)
              println(s"El($item)")
              index += 1
              // get new state of iteratee
              val newIteratee = k(Input.El(item))
              // recursive apply
              apply(newIteratee)
            } else {
              println("EOF")
              Future(k(Input.EOF))
            }
          }
          // iteratee is in error state
          case Step.Error(message, input: Input[String]) => {
            println("Step.Error")
            Future(i)
          }
        }
      })
  }
}
Here is a complete example:
val userIteratee = new Iteratee[String, Unit] {
  override def fold[B](folder: (Step[String, Unit]) => Future[B])
                      (implicit ec: ExecutionContext): Future[B] = {
    // accumulator
    val buffer: ListBuffer[String] = ListBuffer()
    // the step function
    def stepFn(in: Input[String]): Iteratee[String, Unit] = {
      in match {
        // stay in Cont with the same buffer; returning `this` would start over with a fresh buffer
        case Input.Empty => Cont(stepFn)
        case Input.EOF => Done({
          println(s"Result ${buffer.mkString("--")}")
        }, Input.Empty)
        case Input.El(el) => {
          buffer += el
          Cont(stepFn)
        }
      }
    }
    // initial state -> iteratee ready to accept input
    folder(Step.Cont(stepFn))
  }
}
//> userIteratee : play.api.libs.iteratee.Iteratee[String,Unit] = demo.worksheet.iteratee2$$anonfun$main$1$$anon$3@4f063c0a
val usersEnum = Enumerator("Tiger","John","Jimmy","Kate","Chris")
//> usersEnum : play.api.libs.iteratee.Enumerator[String] = play.api.libs.iteratee.Enumerator$$anon$19@51cdd8a
(usersEnum |>>> userIteratee) //> Result Tiger--John--Jimmy--Kate--Chris res0: scala.concurrent.Future[Unit] = Success(())
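The Enumerator usersEnum pushes its input to userIteratee, and userIteratee prints everything once it is done. The Iteratee object in the play-iteratees library has a fold function (Iteratee.fold). It is a general-purpose function that makes it easy to implement userIteratee above and other aggregating Iteratees. Its signature is: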
def fold[E, A](state: A)(f: (A, E) => A): Iteratee[E, A]
We can use this fold function to build a similar Iteratee:
val userIteratee2 = Iteratee.fold(List[String]())((st, el:String) => st :+ el)
//> userIteratee2 : play.api.libs.iteratee.Iteratee[String,List[String]] = Cont(<function1>)
(usersEnum |>>> userIteratee2).foreach {x => println(x)}
//| List(Tiger, John, Jimmy, Kate, Chris)
Here are two more examples that use fold:
val inputLength: Iteratee[String,Int] = {
Iteratee.fold[String,Int](0) { (length, chars) => length + chars.length }
//> inputLength : play.api.libs.iteratee.Iteratee[String,Int] = Cont(<function1>)
}
Await.result((usersEnum |>>> inputLength),Duration.Inf)
//> res1: Int = 23
val consume: Iteratee[String,String] = {
Iteratee.fold[String,String]("") { (result, chunk) => result ++ chunk }
//> consume : play.api.libs.iteratee.Iteratee[String,String] = Cont(<function1 >)
}
Await.result((usersEnum |>>> consume),Duration.Inf)
//> res2: String = TigerJohnJimmyKateChris
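To make the connection between a fold-style constructor and Cont concrete, here is a rough sketch of how such a function could be built. It uses a simplified synchronous model (fold returns B rather than Future[B], and the Error state is left out), and foldIteratee, done, cont and run are hypothetical names. It is not Play's actual implementation.

```scala
object FoldIterateeSketch {
  sealed trait Input[+E]
  object Input {
    final case class El[+E](e: E) extends Input[E]
    case object Empty extends Input[Nothing]
    case object EOF extends Input[Nothing]
  }

  sealed trait Step[E, +A]
  object Step {
    final case class Done[+A, E](a: A, remaining: Input[E]) extends Step[E, A]
    final case class Cont[E, +A](k: Input[E] => Iteratee[E, A]) extends Step[E, A]
  }

  trait Iteratee[E, +A] {
    def fold[B](folder: Step[E, A] => B): B
  }

  def done[E, A](a: A, remaining: Input[E]): Iteratee[E, A] = new Iteratee[E, A] {
    def fold[B](folder: Step[E, A] => B): B = folder(Step.Done(a, remaining))
  }
  def cont[E, A](k: Input[E] => Iteratee[E, A]): Iteratee[E, A] = new Iteratee[E, A] {
    def fold[B](folder: Step[E, A] => B): B = folder(Step.Cont(k))
  }

  // Each element yields a new Cont carrying the updated state; EOF turns the state into the result.
  def foldIteratee[E, A](state: A)(f: (A, E) => A): Iteratee[E, A] =
    cont[E, A] {
      case Input.El(e) => foldIteratee(f(state, e))(f)
      case Input.Empty => foldIteratee(state)(f)
      case Input.EOF   => done[E, A](state, Input.EOF)
    }

  // Hypothetical driver: pushes every element followed by EOF, then reads the result.
  def run[E, A](elems: List[E], it: Iteratee[E, A]): Option[A] = {
    val inputs: List[Input[E]] = elems.map(e => Input.El(e)) :+ Input.EOF
    val last = inputs.foldLeft(it) { (cur, in) =>
      cur.fold[Iteratee[E, A]] {
        case Step.Cont(k) => k(in)
        case _            => cur
      }
    }
    last.fold[Option[A]] {
      case Step.Done(a, _) => Some(a)
      case _               => None
    }
  }

  val users = List("Tiger", "John", "Jimmy", "Kate", "Chris")
  def totalLength(): Option[Int] =
    run(users, foldIteratee[String, Int](0)((n, s) => n + s.length))
  def concatenated(): Option[String] =
    run(users, foldIteratee[String, String]("")(_ + _))
}
```

The accumulated state never lives in a mutable variable: it is captured by each successive Cont, which is the same idea as the functional handlers at the start of this post.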
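From the exercises above we have worked out the two main ways of defining an Iteratee:
1. Build a new Iteratee and define its fold function, as in userIteratee above and in the following example, which uploads a large CSV file (it is written against an older Play API in which fold takes separate done, cont and error callbacks and returns Promise):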
object ReactiveFileUpload extends Controller {
  def upload = Action(BodyParser(rh => new CsvIteratee(isFirst = true))) {
    request =>
      Ok("File Processed")
  }
}
case class CsvIteratee(state: Symbol = 'Cont, input: Input[Array[Byte]] = Empty, lastChunk: String = "", isFirst: Boolean = false) extends Iteratee[Array[Byte], Either[Result, String]] {
  def fold[B](
    done: (Either[Result, String], Input[Array[Byte]]) => Promise[B],
    cont: (Input[Array[Byte]] => Iteratee[Array[Byte], Either[Result, String]]) => Promise[B],
    error: (String, Input[Array[Byte]]) => Promise[B]
  ): Promise[B] = state match {
    case 'Done =>
      done(Right(lastChunk), Input.Empty)
    case 'Cont => cont(in => in match {
      case in: El[Array[Byte]] => {
        // Retrieve the part that has not been processed in the previous chunk and copy it in front of the current chunk
        val content = lastChunk + new String(in.e)
        val csvBody =
          if (isFirst)
            // Skip http header if it is the first chunk
            content.drop(content.indexOf("\r\n\r\n") + 4)
          else content
        val csv = new CSVReader(new StringReader(csvBody), ';')
        val lines = csv.readAll
        // Process all lines except the last one, since it is cut by the chunk
        for (line <- lines.init)
          processLine(line)
        // Put forward the part that has not been processed
        val last = lines.last.toList.mkString(";")
        copy(input = in, lastChunk = last, isFirst = false)
      }
      case Empty => copy(input = in, isFirst = false)
      case EOF => copy(state = 'Done, input = in, isFirst = false)
      case _ => copy(state = 'Error, input = in, isFirst = false)
    })
    case _ =>
      error("Unexpected state", input)
  }
  def processLine(line: Array[String]) = WS.url("http://localhost:9200/affa/na/").post(
    toJson(
      Map(
        "date" -> toJson(line(0)),
        "trig" -> toJson(line(1)),
        "code" -> toJson(line(2)),
        "nbjours" -> toJson(line(3).toDouble)
      )
    )
  )
}
2. Define Cont directly:
/**
* Create an iteratee that takes the first element of the stream, if one occurs before EOF
*/
def head[E]: Iteratee[E, Option[E]] = {
def step: K[E, Option[E]] = {
case Input.Empty => Cont(step)
case Input.EOF => Done(None, Input.EOF)
case Input.El(e) => Done(Some(e), Input.Empty)
}
Cont(step)
}
and:
def fileIteratee(file: File): Iteratee[String, Long] = {
  val helper = new FileNIOHelper(file)
  def step(totalLines: Long)(in: Input[String]): Iteratee[String, Long] = in match {
    case Input.EOF | Input.Empty =>
      if(debug) println("CLOSING CHANNEL")
      helper.close()
      Done(totalLines, Input.EOF)
    case Input.El(line) =>
      if(debug) println(line)
      helper.write(line)
      Cont[String, Long](i => step(totalLines+1)(i))
  }
  // initiates iteration by initialize context and first state (Cont) and launching iteration
  Cont[String, Long](i => step(0L)(i))
}