FunDA(6)- Reactive Streams:Play with Iteratees、Enumerator and Enumeratees

来源:http://www.cnblogs.com/tiger-xc/archive/2017/02/02/6361037.html
-Advertisement-
Play Games

在上一節我們介紹了Iteratee。它的功能是消耗從一些數據源推送過來的數據元素,不同的數據消耗方式代表了不同功能的Iteratee。所謂的數據源就是我們這節要討論的Enumerator。Enumerator是一種數據源:它會根據下游數據消耗方(Iteratee)的具體狀態主動向下推送數據元素。我們 ...


    在上一節我們介紹了Iteratee。它的功能是消耗從一些數據源推送過來的數據元素,不同的數據消耗方式代表了不同功能的Iteratee。所謂的數據源就是我們這節要討論的Enumerator。Enumerator是一種數據源:它會根據下游數據消耗方(Iteratee)的具體狀態主動向下推送數據元素。我們已經討論過Iteratee的狀態Step類型:

trait Step[E,+A]
case class Done[+A,E](a: A, remain: Input[E]) extends Step[E,A]
case class Cont[E,+A](k: Input[E] => InputStreamHandler[E,A]) extends Step[E,A]
case class Error[E](msg: String, loc:Input[E]) extends Step[E,Nothing]

這其中Iteratee通過Cont狀態通知Enumerator可以發送數據元素,並提供了k函數作為Enumerator的數據推送函數。Enumerator推送的數據元素,也就是Iteratee的輸入Input[E],除單純數據元素之外還代表著數據源狀態: 

trait Input[+E]
case class EL[E](e: E) extends Input[E]
case object EOF extends Input[Nothing]
case object Empty extends Input[Nothing]

Enumerator通過Input[E]來通知Iteratee當前數據源狀態,如:是否已經完成所有數據推送(EOF),或者當前推送了什麼數據元素(El[E](e:E))。Enumerator主動向Iteratee輸出數據然後返回新狀態的Iteratee。我們可以從Enumerator的類型款式看得出:

trait Enumerator[E] {

  /**
   * Apply this Enumerator to an Iteratee
   */
  def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]]

}

這個Future的目的主要是為了避免占用線程。實際上我們可以最終通過調用Iteratee的fold函數來實現Enumerator功能,如:

 /**
   * Creates an enumerator which produces the one supplied
   * input and nothing else. This enumerator will NOT
   * automatically produce Input.EOF after the given input.
   */
  def enumInput[E](e: Input[E]) = new Enumerator[E] {
    def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] =
      i.fold {
        case Step.Cont(k) => eagerFuture(k(e))
        case _ => Future.successful(i)
      }(dec)
  }

又或者通過構建器(constructor, apply)來構建Eumerator:

/**
   * Create an Enumerator from a set of values
   *
   * Example:
   * {{{
   *   val enumerator: Enumerator[String] = Enumerator("kiki", "foo", "bar")
   * }}}
   */
  def apply[E](in: E*): Enumerator[E] = in.length match {
    case 0 => Enumerator.empty
    case 1 => new Enumerator[E] {
      def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = i.pureFoldNoEC {
        case Step.Cont(k) => k(Input.El(in.head))
        case _ => i
      }
    }
    case _ => new Enumerator[E] {
      def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = enumerateSeq(in, i)
    }
  }

  /**
   * Create an Enumerator from any TraversableOnce like collection of elements.
   *
   * Example of an iterator of lines of a file :
   * {{{
   *  val enumerator: Enumerator[String] = Enumerator( scala.io.Source.fromFile("myfile.txt").getLines )
   * }}}
   */
  def enumerate[E](traversable: TraversableOnce[E])(implicit ctx: scala.concurrent.ExecutionContext): Enumerator[E] = {
    val it = traversable.toIterator
    Enumerator.unfoldM[scala.collection.Iterator[E], E](it: scala.collection.Iterator[E])({ currentIt =>
      if (currentIt.hasNext)
        Future[Option[(scala.collection.Iterator[E], E)]]({
          val next = currentIt.next
          Some((currentIt -> next))
        })(ctx)
      else
        Future.successful[Option[(scala.collection.Iterator[E], E)]]({
          None
        })
    })(dec)
  }

  /**
   * An empty enumerator
   */
  def empty[E]: Enumerator[E] = new Enumerator[E] {
    def apply[A](i: Iteratee[E, A]) = Future.successful(i)
  }

  private def enumerateSeq[E, A]: (Seq[E], Iteratee[E, A]) => Future[Iteratee[E, A]] = { (l, i) =>
    l.foldLeft(Future.successful(i))((i, e) =>
      i.flatMap(it => it.pureFold {
        case Step.Cont(k) => k(Input.El(e))
        case _ => it
      }(dec))(dec))
  }

下麵是個直接構建Enumerator的例子: 

 val enumUsers: Enumerator[String] = {
   Enumerator("Tiger","Hover","Grand","John")    
       //> enumUsers  : play.api.libs.iteratee.Enumerator[String] = play.api.libs.iteratee.Enumerator$$anon$19@2ef9b8bc

在這個例子里的Enumerator就是用上面那個apply構建的。我們把enumUsers連接到costume Iteratee:

 

 val consume = Iteratee.consume[String]()        //> consume  : play.api.libs.iteratee.Iteratee[String,String] = Cont(<function1>)
 val consumeUsers = enumUsers.apply(consume)      //> consumeUsers  : scala.concurrent.Future[play.api.libs.iteratee.Iteratee[String,String]] = Success(play.api.libs.iteratee.FutureIteratee@1dfe2924)

我們是用apply(consume)來連接Enumerator和Iteratees的。apply函數的定義如下:

/**
   * Attaches this Enumerator to an [[play.api.libs.iteratee.Iteratee]], driving the
   * Iteratee to (asynchronously) consume the input. The Iteratee may enter its
   * [[play.api.libs.iteratee.Done]] or [[play.api.libs.iteratee.Error]]
   * state, or it may be left in a [[play.api.libs.iteratee.Cont]] state (allowing it
   * to consume more input after that sent by the enumerator).
   *
   * If the Iteratee reaches a [[play.api.libs.iteratee.Done]] state, it will
   * contain a computed result and the remaining (unconsumed) input.
   */
  def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]]

這是個抽象函數。舉個例實現這個apply函數的例子:

/**
   * Creates an enumerator which produces the one supplied
   * input and nothing else. This enumerator will NOT
   * automatically produce Input.EOF after the given input.
   */
  def enumInput[E](e: Input[E]) = new Enumerator[E] {
    def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] =
      i.fold {
        case Step.Cont(k) => eagerFuture(k(e))
        case _ => Future.successful(i)
      }(dec)
  }

consumeUsers: Future[Iteratee[String,String]],我們用Future的函數來顯示發送數據內容:

 val futPrint = consumeUsers.flatMap { i => i.run }.map(println)
    //> futPrint  : scala.concurrent.Future[Unit] = List()
 Await.ready(futPrint,Duration.Inf)     //> TigerHoverGrandJohn res0: demo.worksheet.enumerator.futPrint.type = Success(()) 

另一種更直接的方式:

val futUsers = Iteratee.flatten(consumeUsers).run.map(println)
      //> futUsers  : scala.concurrent.Future[Unit] = List()
 Await.ready(futPrint,Duration.Inf)               
      //> TigerHoverGrandJohnres1: demo.worksheet.enumerator.futPrint.type = Success(())

我們也可以使用函數符號 |>> :

 val futPrintUsers = {
  Iteratee.flatten(enumUsers |>> consume).run.map(println)
     //> futPrintUsers  : scala.concurrent.Future[Unit] = List()
 }
 Await.ready(futPrintUsers,Duration.Inf)          
     //> TigerHoverGrandJohn res2: demo.worksheet.enumerator.futPrintUsers.type = Success(())

我們還可以把兩個Enumerator串聯起來向一個Iteratee發送數據:

 val futEnums = {
   Iteratee.flatten {
     (enumUsers >>> enumColors) |>> consume
   }.run.map(println)                       //> futEnums  : scala.concurrent.Future[Unit] = List()
 }
  Await.ready(futEnums,Duration.Inf)              
      //> TigerHoverGrandJohnRedWhiteBlueYellow res3: demo.worksheet.enumerator.futEnums.type = Success(())

當然,最實用的應該是把InputStream的數據推送給一個Iteratee,如把一個文件內容發送給Iteratee:

/**
   * Create an enumerator from the given input stream.
   *
   * Note that this enumerator will block when it reads from the file.
   *
   * @param file The file to create the enumerator from.
   * @param chunkSize The size of chunks to read from the file.
   */
  def fromFile(file: java.io.File, chunkSize: Int = 1024 * 8)(implicit ec: ExecutionContext): Enumerator[Array[Byte]] = {
    fromStream(new java.io.FileInputStream(file), chunkSize)(ec)
  }

/**
   * Create an enumerator from the given input stream.
   *
   * This enumerator will block on reading the input stream, in the supplied ExecutionContext.  Care must therefore
   * be taken to ensure that this isn't a slow stream.  If using this with slow input streams, make sure the
   * ExecutionContext is appropriately configured to handle the blocking.
   *
   * @param input The input stream
   * @param chunkSize The size of chunks to read from the stream.
   * @param ec The ExecutionContext to execute blocking code.
   */
  def fromStream(input: java.io.InputStream, chunkSize: Int = 1024 * 8)(implicit ec: ExecutionContext): Enumerator[Array[Byte]] = {
    implicit val pec = ec.prepare()
    generateM({
      val buffer = new Array[Byte](chunkSize)
      val bytesRead = blocking { input.read(buffer) }
      val chunk = bytesRead match {
        case -1 => None
        case `chunkSize` => Some(buffer)
        case read =>
          val input = new Array[Byte](read)
          System.arraycopy(buffer, 0, input, 0, read)
          Some(input)
      }
      Future.successful(chunk)
    })(pec).onDoneEnumerating(input.close)(pec)
  }

這項功能的核心函數是這個generateM,它的函數款式如下:

/**
   * Like [[play.api.libs.iteratee.Enumerator.repeatM]], but the callback returns an Option, which allows the stream
   * to be eventually terminated by returning None.
   *
   * @param e The input function.  Returns a future eventually redeemed with Some value if there is input to pass, or a
   *          future eventually redeemed with None if the end of the stream has been reached.
   */
  def generateM[E](e: => Future[Option[E]])(implicit ec: ExecutionContext): Enumerator[E] = checkContinue0(new TreatCont0[E] {
    private val pec = ec.prepare()

    def apply[A](loop: Iteratee[E, A] => Future[Iteratee[E, A]], k: Input[E] => Iteratee[E, A]) = executeFuture(e)(pec).flatMap {
      case Some(e) => loop(k(Input.El(e)))
      case None => Future.successful(Cont(k))
    }(dec)
  })

checkContinue0函數是這樣定義的:

trait TreatCont0[E] {

    def apply[A](loop: Iteratee[E, A] => Future[Iteratee[E, A]], k: Input[E] => Iteratee[E, A]): Future[Iteratee[E, A]]

  }

  def checkContinue0[E](inner: TreatCont0[E]) = new Enumerator[E] {

    def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] = {

      def step(it: Iteratee[E, A]): Future[Iteratee[E, A]] = it.fold {
        case Step.Done(a, e) => Future.successful(Done(a, e))
        case Step.Cont(k) => inner[A](step, k)
        case Step.Error(msg, e) => Future.successful(Error(msg, e))
      }(dec)

      step(it)
    }
  }

從這段代碼 case Step.Cont(k)=>inner[A](step, k)可以推斷操作模式應該是當下游Iteratee在Cont狀態下不斷遞歸式調用Cont函數k向下推送數據e。我們再仔細看看generateM的函數款式;

 def generateM[E](e: => Future[Option[E]])(implicit ec: ExecutionContext): Enumerator[E] 

實際上剛纔的操作就是重覆調用這個e:=>Future[Option[E]]函數。再分析fromStream代碼:

  def fromStream(input: java.io.InputStream, chunkSize: Int = 1024 * 8)(implicit ec: ExecutionContext): Enumerator[Array[Byte]] = {
    implicit val pec = ec.prepare()
    generateM({
      val buffer = new Array[Byte](chunkSize)
      val bytesRead = blocking { input.read(buffer) }
      val chunk = bytesRead match {
        case -1 => None
        case `chunkSize` => Some(buffer)
        case read =>
          val input = new Array[Byte](read)
          System.arraycopy(buffer, 0, input, 0, read)
          Some(input)
      }
      Future.successful(chunk)
    })(pec).onDoneEnumerating(input.close)(pec)
  }

我們看到傳入generateM的參數是一段代碼,在Iteratee狀態為Cont時會不斷重覆運行,也就是說這段代碼會逐次從輸入源中讀取chunkSize個Byte。這種做法是典型的Streaming方式,避免了一次性上傳所有數據。下麵是一個文件讀取Enumerator例子:

 import java.io._
 val fileEnum: Enumerator[Array[Byte]] = {
  Enumerator.fromFile(new File("/users/tiger/lines.txt"))
 }
 val futFile = Iteratee.flatten { fileEnum |>> consume }.run.map(println)

註意:fileEnum |>> consume並不能通過編譯,這是因為fileEnum是個Enumerator[Array[Byte]],而consume是個Iteratee[String,String],Array[Byte]與String類型不符。我們可以用個Enumeratee來進行相關的轉換。下麵就介紹一下Enumeratee的功能。

Enumeratee其實是一種轉換器。它把Enumerator產生的數據轉換成能適配Iteratee的數據類型,或者Iteratee所需要的數據。比如我們想把一串字元類的數字彙總相加時,首先必須把字元轉換成數字類型才能進行Iteratee的彙總操作:

val strNums = Enumerator("1","2","3")            //> strNums  : play.api.libs.iteratee.Enumerator[String] = play.api.libs.iteratee.Enumerator$$anon$19@36b4cef0
 val sumIteratee: Iteratee[Int,Int] = Iteratee.fold(0)((s,i) => s+i)
                                                 //> sumIteratee  : play.api.libs.iteratee.Iteratee[Int,Int] = Cont(<function1>)
 
 val strToInt: Enumeratee[String,Int] = Enumeratee.map {s => s.toInt}
                                                 //> strToInt  : play.api.libs.iteratee.Enumeratee[String,Int] = play.api.libs.iteratee.Enumeratee$$anon$38$$anon$1@371a67ec
 strNums |>> strToInt.transform(sumIteratee)     //> res4: scala.concurrent.Future[play.api.libs.iteratee.Iteratee[String,Int]] = List()
 strNums |>> strToInt &>> sumIteratee            //> res5: scala.concurrent.Future[play.api.libs.iteratee.Iteratee[String,Int]] = List()
 strNums.through(strToInt) |>> sumIteratee       //> res6: scala.concurrent.Future[play.api.libs.iteratee.Iteratee[Int,Int]] = List()
 val futsum = Iteratee.flatten(strNums &> strToInt |>> sumIteratee).run.map(println)
                                                //> futsum  : scala.concurrent.Future[Unit] = List()
 Await.ready(futsum,Duration.Inf)               //> 6
                                                //| res7: demo.worksheet.enumerator.futsum.type = Success(())

在上面這個例子里Enumerator數據元素是String, Iteratee操作數據類型是Int, strToInt是個把String轉換成Int的Enumeratee,我們用了幾種轉換方式的表達形式,結果都是一樣的,等於6。我們可以用Enumerator.through或者Enumeratee.transform來連接Enumerator與Iteratee。當然,我們也可以篩選輸入Iteratee的數據:

val sum2 = strNums &> Enumeratee.take(2) &> strToInt |>> sumIteratee
                 //> sum2  : scala.concurrent.Future[play.api.libs.iteratee.Iteratee[Int,Int]] =List()
 val futsum2 = Iteratee.flatten(sum2).run.map(println)
                                                  //> futsum2  : scala.concurrent.Future[Unit] = List()
 Await.ready(futsum2,Duration.Inf)                //> 3
                                                  //| res8: demo.worksheet.enumerator.futsum2.type = Success(())

上面例子里的Enumeratee.take(2)就是一個數據處理的Enumeratee。

現在Enumerator+Enumeratee+Iteratee從功能上越來越像fs2了,當然了,Iteratee就是一個流工具庫。我們已經選擇了fs2,因為它可以支持靈活的並行運算,所以再深入討論Iteratee就沒什麼意義了。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 環境:Abp1.2 疑問:沒有調用工作單元的SaveChanges方法引起的事務提交時機的問題. 例如:有一個應用服務代碼如下: 根據用戶提交數據插入一條記錄,但在方法末未顯式調用SaveChanges方法 在Mvc的Controller里調用上述方法的代碼如下: 在_phraseAppServic ...
  • Forms認證即是表單認證,需提供身份id和密碼password的進行認證和授權管理。應該是大家比較熟悉的一種,剛接觸.net可能都會學學這個東西。 ...
  • 前言: 我們在很多項目場景中使用對象映射工具,那麼使用最多的OOM對象工具也就那幾個。今天所說的EmitMapper 和TinyMapper 兩者的性能都是很高的,相比autoMapper 速度不知道快了多少倍,因為我平時使用的最多EmitMapper,所在業餘時間做了一下測試兩者對比。 測試數據: ...
  • 在微信支付中,當用戶支付成功後,微信會把相關支付結果和用戶信息發送給商戶,商戶需要接收處理,並返回應答。 在經歷了千幸萬苦之,填完了所有的JSAPI支付的坑後(微信JSAPI支付 跟 所遇到的那些坑),好不容易調起了微信支付介面,看到了親愛的支付頁面,支付成功後發現自己還有個叫做微信回調的忘了處理, ...
  • 1、Main函數是什麼?在程式中使用Main函數有什麼需要註意的地方? Q:程式的入口函數,函數名稱不能改變;一個程式有且只有一個Main函數。 2、面向對象的三大特征是什麼? Q:封裝,繼承,多態。 3、值類型預設值是什麼? Q:隔天公佈。 ...
  • 本文彙總了C#啟動外部程式的幾種常用方法,非常具有實用價值,主要包括如下幾種方法: 1. 啟動外部程式,不等待其退出。 2. 啟動外部程式,等待其退出。 3. 啟動外部程式,無限等待其退出。 4. 啟動外部程式,通過事件監視其退出。 c#使用process.start啟動程式報錯解決方法 ...
  • 1 不跟你多廢話 上代碼! /// <summary> /// SQL關鍵字轉換器 /// </summary> public class SqlConverter : IKeywordsConvertible { public SqlConverter(string[] keywords) { K ...
  • G.系列導航 【G】開源的分散式部署解決方案 - 預告篇 【G】開源的分散式部署解決方案(一) - 開篇 【G】開源的分散式部署解決方案(二) - 好項目是從爛項目基礎上重構出來的 分析目前項目結構 眼前出現這麼一坨坨的文件夾,相信很多人已經看不下去了。是的,首先就是要把它給做掉。 按照這個項目文件 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...