在上一節我們介紹了Iteratee。它的功能是消耗從一些數據源推送過來的數據元素,不同的數據消耗方式代表了不同功能的Iteratee。所謂的數據源就是我們這節要討論的Enumerator。Enumerator是一種數據源:它會根據下游數據消耗方(Iteratee)的具體狀態主動向下推送數據元素。我們 ...


trait Step[E,+A]
case class Done[+A,E](a: A, remain: Input[E]) extends Step[E,A]
case class Cont[E,+A](k: Input[E] => InputStreamHandler[E,A]) extends Step[E,A]
case class Error[E](msg: String, loc:Input[E]) extends Step[E,Nothing]


trait Input[+E]
case class EL[E](e: E) extends Input[E]
case object EOF extends Input[Nothing]
case object Empty extends Input[Nothing]


trait Enumerator[E] {

   * Apply this Enumerator to an Iteratee
  def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]]



   * Creates an enumerator which produces the one supplied
   * input and nothing else. This enumerator will NOT
   * automatically produce Input.EOF after the given input.
  def enumInput[E](e: Input[E]) = new Enumerator[E] {
    def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] =
      i.fold {
        case Step.Cont(k) => eagerFuture(k(e))
        case _ => Future.successful(i)

又或者通過構建器(constructor, apply)來構建Eumerator:

   * Create an Enumerator from a set of values
   * Example:
   * {{{
   *   val enumerator: Enumerator[String] = Enumerator("kiki", "foo", "bar")
   * }}}
  def apply[E](in: E*): Enumerator[E] = in.length match {
    case 0 => Enumerator.empty
    case 1 => new Enumerator[E] {
      def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = i.pureFoldNoEC {
        case Step.Cont(k) => k(Input.El(in.head))
        case _ => i
    case _ => new Enumerator[E] {
      def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = enumerateSeq(in, i)

   * Create an Enumerator from any TraversableOnce like collection of elements.
   * Example of an iterator of lines of a file :
   * {{{
   *  val enumerator: Enumerator[String] = Enumerator("myfile.txt").getLines )
   * }}}
  def enumerate[E](traversable: TraversableOnce[E])(implicit ctx: scala.concurrent.ExecutionContext): Enumerator[E] = {
    val it = traversable.toIterator
    Enumerator.unfoldM[scala.collection.Iterator[E], E](it: scala.collection.Iterator[E])({ currentIt =>
      if (currentIt.hasNext)
        Future[Option[(scala.collection.Iterator[E], E)]]({
          val next =
          Some((currentIt -> next))
        Future.successful[Option[(scala.collection.Iterator[E], E)]]({

   * An empty enumerator
  def empty[E]: Enumerator[E] = new Enumerator[E] {
    def apply[A](i: Iteratee[E, A]) = Future.successful(i)

  private def enumerateSeq[E, A]: (Seq[E], Iteratee[E, A]) => Future[Iteratee[E, A]] = { (l, i) =>
    l.foldLeft(Future.successful(i))((i, e) =>
      i.flatMap(it => it.pureFold {
        case Step.Cont(k) => k(Input.El(e))
        case _ => it


 val enumUsers: Enumerator[String] = {
       //> enumUsers  : play.api.libs.iteratee.Enumerator[String] = play.api.libs.iteratee.Enumerator$$anon$19@2ef9b8bc

在這個例子里的Enumerator就是用上面那個apply構建的。我們把enumUsers連接到costume Iteratee:


 val consume = Iteratee.consume[String]()        //> consume  : play.api.libs.iteratee.Iteratee[String,String] = Cont(<function1>)
 val consumeUsers = enumUsers.apply(consume)      //> consumeUsers  : scala.concurrent.Future[play.api.libs.iteratee.Iteratee[String,String]] = Success(play.api.libs.iteratee.FutureIteratee@1dfe2924)


   * Attaches this Enumerator to an [[play.api.libs.iteratee.Iteratee]], driving the
   * Iteratee to (asynchronously) consume the input. The Iteratee may enter its
   * [[play.api.libs.iteratee.Done]] or [[play.api.libs.iteratee.Error]]
   * state, or it may be left in a [[play.api.libs.iteratee.Cont]] state (allowing it
   * to consume more input after that sent by the enumerator).
   * If the Iteratee reaches a [[play.api.libs.iteratee.Done]] state, it will
   * contain a computed result and the remaining (unconsumed) input.
  def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]]


   * Creates an enumerator which produces the one supplied
   * input and nothing else. This enumerator will NOT
   * automatically produce Input.EOF after the given input.
  def enumInput[E](e: Input[E]) = new Enumerator[E] {
    def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] =
      i.fold {
        case Step.Cont(k) => eagerFuture(k(e))
        case _ => Future.successful(i)

consumeUsers: Future[Iteratee[String,String]],我們用Future的函數來顯示發送數據內容:

 val futPrint = consumeUsers.flatMap { i => }.map(println)
    //> futPrint  : scala.concurrent.Future[Unit] = List()
 Await.ready(futPrint,Duration.Inf)     //> TigerHoverGrandJohn res0: demo.worksheet.enumerator.futPrint.type = Success(()) 


val futUsers = Iteratee.flatten(consumeUsers)
      //> futUsers  : scala.concurrent.Future[Unit] = List()
      //> TigerHoverGrandJohnres1: demo.worksheet.enumerator.futPrint.type = Success(())

我們也可以使用函數符號 |>> :

 val futPrintUsers = {
  Iteratee.flatten(enumUsers |>> consume)
     //> futPrintUsers  : scala.concurrent.Future[Unit] = List()
     //> TigerHoverGrandJohn res2: demo.worksheet.enumerator.futPrintUsers.type = Success(())


 val futEnums = {
   Iteratee.flatten {
     (enumUsers >>> enumColors) |>> consume
   }                       //> futEnums  : scala.concurrent.Future[Unit] = List()
      //> TigerHoverGrandJohnRedWhiteBlueYellow res3: demo.worksheet.enumerator.futEnums.type = Success(())


   * Create an enumerator from the given input stream.
   * Note that this enumerator will block when it reads from the file.
   * @param file The file to create the enumerator from.
   * @param chunkSize The size of chunks to read from the file.
  def fromFile(file:, chunkSize: Int = 1024 * 8)(implicit ec: ExecutionContext): Enumerator[Array[Byte]] = {
    fromStream(new, chunkSize)(ec)

   * Create an enumerator from the given input stream.
   * This enumerator will block on reading the input stream, in the supplied ExecutionContext.  Care must therefore
   * be taken to ensure that this isn't a slow stream.  If using this with slow input streams, make sure the
   * ExecutionContext is appropriately configured to handle the blocking.
   * @param input The input stream
   * @param chunkSize The size of chunks to read from the stream.
   * @param ec The ExecutionContext to execute blocking code.
  def fromStream(input:, chunkSize: Int = 1024 * 8)(implicit ec: ExecutionContext): Enumerator[Array[Byte]] = {
    implicit val pec = ec.prepare()
      val buffer = new Array[Byte](chunkSize)
      val bytesRead = blocking { }
      val chunk = bytesRead match {
        case -1 => None
        case `chunkSize` => Some(buffer)
        case read =>
          val input = new Array[Byte](read)
          System.arraycopy(buffer, 0, input, 0, read)


   * Like [[play.api.libs.iteratee.Enumerator.repeatM]], but the callback returns an Option, which allows the stream
   * to be eventually terminated by returning None.
   * @param e The input function.  Returns a future eventually redeemed with Some value if there is input to pass, or a
   *          future eventually redeemed with None if the end of the stream has been reached.
  def generateM[E](e: => Future[Option[E]])(implicit ec: ExecutionContext): Enumerator[E] = checkContinue0(new TreatCont0[E] {
    private val pec = ec.prepare()

    def apply[A](loop: Iteratee[E, A] => Future[Iteratee[E, A]], k: Input[E] => Iteratee[E, A]) = executeFuture(e)(pec).flatMap {
      case Some(e) => loop(k(Input.El(e)))
      case None => Future.successful(Cont(k))


trait TreatCont0[E] {

    def apply[A](loop: Iteratee[E, A] => Future[Iteratee[E, A]], k: Input[E] => Iteratee[E, A]): Future[Iteratee[E, A]]


  def checkContinue0[E](inner: TreatCont0[E]) = new Enumerator[E] {

    def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] = {

      def step(it: Iteratee[E, A]): Future[Iteratee[E, A]] = it.fold {
        case Step.Done(a, e) => Future.successful(Done(a, e))
        case Step.Cont(k) => inner[A](step, k)
        case Step.Error(msg, e) => Future.successful(Error(msg, e))


從這段代碼 case Step.Cont(k)=>inner[A](step, k)可以推斷操作模式應該是當下游Iteratee在Cont狀態下不斷遞歸式調用Cont函數k向下推送數據e。我們再仔細看看generateM的函數款式;

 def generateM[E](e: => Future[Option[E]])(implicit ec: ExecutionContext): Enumerator[E] 


  def fromStream(input:, chunkSize: Int = 1024 * 8)(implicit ec: ExecutionContext): Enumerator[Array[Byte]] = {
    implicit val pec = ec.prepare()
      val buffer = new Array[Byte](chunkSize)
      val bytesRead = blocking { }
      val chunk = bytesRead match {
        case -1 => None
        case `chunkSize` => Some(buffer)
        case read =>
          val input = new Array[Byte](read)
          System.arraycopy(buffer, 0, input, 0, read)


 val fileEnum: Enumerator[Array[Byte]] = {
  Enumerator.fromFile(new File("/users/tiger/lines.txt"))
 val futFile = Iteratee.flatten { fileEnum |>> consume }

註意:fileEnum |>> consume並不能通過編譯,這是因為fileEnum是個Enumerator[Array[Byte]],而consume是個Iteratee[String,String],Array[Byte]與String類型不符。我們可以用個Enumeratee來進行相關的轉換。下麵就介紹一下Enumeratee的功能。


val strNums = Enumerator("1","2","3")            //> strNums  : play.api.libs.iteratee.Enumerator[String] = play.api.libs.iteratee.Enumerator$$anon$19@36b4cef0
 val sumIteratee: Iteratee[Int,Int] = Iteratee.fold(0)((s,i) => s+i)
                                                 //> sumIteratee  : play.api.libs.iteratee.Iteratee[Int,Int] = Cont(<function1>)
 val strToInt: Enumeratee[String,Int] = {s => s.toInt}
                                                 //> strToInt  : play.api.libs.iteratee.Enumeratee[String,Int] = play.api.libs.iteratee.Enumeratee$$anon$38$$anon$1@371a67ec
 strNums |>> strToInt.transform(sumIteratee)     //> res4: scala.concurrent.Future[play.api.libs.iteratee.Iteratee[String,Int]] = List()
 strNums |>> strToInt &>> sumIteratee            //> res5: scala.concurrent.Future[play.api.libs.iteratee.Iteratee[String,Int]] = List()
 strNums.through(strToInt) |>> sumIteratee       //> res6: scala.concurrent.Future[play.api.libs.iteratee.Iteratee[Int,Int]] = List()
 val futsum = Iteratee.flatten(strNums &> strToInt |>> sumIteratee)
                                                //> futsum  : scala.concurrent.Future[Unit] = List()
 Await.ready(futsum,Duration.Inf)               //> 6
                                                //| res7: demo.worksheet.enumerator.futsum.type = Success(())

在上面這個例子里Enumerator數據元素是String, Iteratee操作數據類型是Int, strToInt是個把String轉換成Int的Enumeratee,我們用了幾種轉換方式的表達形式,結果都是一樣的,等於6。我們可以用Enumerator.through或者Enumeratee.transform來連接Enumerator與Iteratee。當然,我們也可以篩選輸入Iteratee的數據:

val sum2 = strNums &> Enumeratee.take(2) &> strToInt |>> sumIteratee
                 //> sum2  : scala.concurrent.Future[play.api.libs.iteratee.Iteratee[Int,Int]] =List()
 val futsum2 = Iteratee.flatten(sum2)
                                                  //> futsum2  : scala.concurrent.Future[Unit] = List()
 Await.ready(futsum2,Duration.Inf)                //> 3
                                                  //| res8: demo.worksheet.enumerator.futsum2.type = Success(())




























