FunDA(8)- Static Source:保證資源使用安全 - Resource Safety

来源:http://www.cnblogs.com/tiger-xc/archive/2017/02/10/6384886.html
-Advertisement-
Play Games

我們在前面用了許多章節來討論如何把數據從後臺資料庫中搬到記憶體,然後進行逐行操作運算。我們選定的解決方案是把後臺數據轉換成記憶體中的數據流。無論在打開資料庫表或從資料庫讀取數據等環節都涉及到對資料庫表這項資源的安全使用:我們最起碼要保證在完成使用或者使用中途出現錯誤異常退出時能釋放占用的資源。談到資源使 ...


   我們在前面用了許多章節來討論如何把數據從後臺資料庫中搬到記憶體,然後進行逐行操作運算。我們選定的解決方案是把後臺數據轉換成記憶體中的數據流。無論在打開資料庫表或從資料庫讀取數據等環節都涉及到對資料庫表這項資源的安全使用:我們最起碼要保證在完成使用或者使用中途出現錯誤異常退出時能釋放占用的資源。談到資源使用安全,不得不想到函數式編程通用的那個bracket函數,fs2同樣提供了這個函數:

def bracket[F[_],R,A](r: F[R])(use: R => Stream[F,A], release: R => F[Unit]): Stream[F,A] = Stream.mk {
    StreamCore.acquire(r, release andThen (Free.eval)) flatMap { case (_, r) => use(r).get }
  }

這個函數的入參數r,use,release都涉及到了資源占用處理:r一般是打開文件或者庫表操作,use是資源使用如讀取數據過程,release 顧名思義就是正常完成資源使用後的資源釋放清理過程。函數bracket能保證這些過程的正確引用。

我們用幾個例子來分析一下這個函數的功能:

val s = Stream.bracket(Task.delay(throw new Exception("Oh no!")))(
  _ => Stream(1,2,3) ++ Stream.fail(new Exception("boom!")) ++ Stream(3,4),
  _ => Task.delay(println("normal end")))         
s.runLog.unsafeRun                                //> java.lang.Exception: Oh no!
                                                  //|     at demo.ws.streams$$anonfun$main$1$$anonfun$1.apply(demo.ws.streams.scal
                                                  //| a:4)
                                                  //|     at demo.ws.streams$$anonfun$main$1$$anonfun$1.apply(demo.ws.streams.scal
                                                  //| a:4)

在上面這個例子里我們人為在兩個地方製造了異常。我們可以用onError來截獲這些異常: 

val s1 = s.map(_.toString).onError {e => Stream.emit(e.getMessage)}
                                                  
s1.runLog.unsafeRun                               //> res0: Vector[String] = Vector(Oh no!)

必須用toString轉換了Stream元素類型後才能把截獲的異常信息放進Stream。註意release未調用,因為資源還沒有被占用。但是如果除了釋放資源外還有其它清理工作的話,我們可以用onFinalize來確保一定可以調用清理程式:

val s5 = s1.onFinalize(Task.delay{println("finally end!")})
 
s5.runLog.unsafeRun                               //> finally end!
                                                  //| res1: Vector[String] = Vector(Oh no!)

如果在使用資源中間出現異常會怎樣?

val s3 = Stream.bracket(Task.delay())(
  _ => Stream(1,2,3) ++ Stream.fail(new Exception("boom!")) ++ Stream(3,4),
  _ => Task.delay(println("normal end"))) 
val s4 = s3.map(_.toString).onError {e => Stream.emit(e.getMessage)}
         .onFinalize(Task.delay{println("finally end!")})

s4.runLog.unsafeRun                               //> normal end
                                                  //| finally end!
                                                  //| res2: Vector[String] = Vector(1, 2, 3, boom!)

返回結果res2正確記錄了出錯地點,而且所有清理過程都得到運行。當然,我們可以不用動Stream元素類型,用attempt:

val s6 = s3.attempt.onError {e => Stream.emit(e.getMessage)}
         .onFinalize(Task.delay{println("finally end!")})
 
s6.runLog.unsafeRun                               //> normal end
                                                  //| finally end!
 //| res3: Vector[Object] = Vector(Right(1), Right(2), Right(3), Left(java.lang.Exception: boom!))

我們在前面FunDA(1)里討論過運算slick Query Action run返回結果類型是Future[Iterable[ROW]]。Slick獲取數據的方式是一次性讀入記憶體,所以本期標題提到的Static-Source就是指這樣的一個記憶體中的集合。那麼我們就可以不必考慮開啟並占用資料庫表這項操作了。我們只需要用FunDA DataRowType.getTypedRow函數獲取了Iterable[ROW]結果後直接傳給bracket就行了。現在最重要的是如何把Seq[ROW]轉換成Stream[F[_],ROW]。我們可以用Seq的fold函數來構建Stream: 

val data = Seq(1,2,3,4)                           //> data  : Seq[Int] = List(1, 2, 3, 4)
val s8 = data.foldLeft(Stream[Task,Int]())((s,a) => s ++ Stream.emit(a))
def log[A](prompt: String): Pipe[Task,A,A] =
    _.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}
                                                  //> log: [A](prompt: String)fs2.Pipe[fs2.Task,A,A]

s8.through(log("")).run.unsafeRun                 //> > 1
                                                  //| > 2
                                                  //| > 3
                                                  //| > 4

錶面上看好像沒什麼問題,但仔細分析:Seq[ROW]可以是個超大的集合,而foldLeft是個遞歸函數,無論是否尾遞歸都有可能造成堆棧溢出錯誤(StackOverflowError)。看來還是用freemonad,它可以把每步運算都存放在記憶體結構里,可以在固定的堆棧空間運算。下麵的函數用fs2.Pull類型結構可以把Seq[ROW]轉換成Stream[F[_],ROW]:

 def pullSeq[ROW](h: Seq[ROW]): Pull[Task, ROW, Unit] = {
    val it = h.iterator
    def go(it: Iterator[ROW]): Pull[Task, ROW, Unit] = for {
      res <- Pull.eval(Task.delay({ if (it.hasNext) Some(it.next()) else None }))
      next <- res.fold[Pull[Task, ROW, Unit]](Pull.done)(o => Pull.output1(o) >> go(it))
    } yield next
    go(it)
  }                                                  
 def streamSeq[ROW](h: Seq[ROW]): Stream[Task, ROW] =
    pullSeq(h).close

雖然go是個遞歸函數,但因為Pull是個freemonad,每個flapMap迴圈(>>)會把新的Iterable it狀態存放在heap記憶體里。由於每個步驟都是存放在記憶體結構里的,而運算這些步驟的模式是靠下游拖動逐步運算的,也就是說按下游拖動每次產生一個元素。pullSeq返回Pull,Pull.close >>> Stream,這就是streamSeq函數的作用了。現在我們可以直接用bracket來安全構建Stream:

 val s9 = Stream.bracket(Task.delay(data))(streamSeq, _ => Task.delay())                                                
 s9.through(log("")).run.unsafeRun               //> > 1
                                                  //| > 2
                                                  //| > 3
                                                  //| > 4

現在可以放心了。但我們的目的是為大眾編程人員提供一個最低門檻的工具庫,他們不需要瞭解Task, onError,onFinalize。。。我們必須把bracket函數使用方式搞得更直白點,讓用戶可以更容易調用:

  type FDAStream[A] = Stream[Task,A]
  implicit val strategy = Strategy.fromFixedDaemonPool(4)
                                                  //> strategy  : fs2.Strategy = Strategy

  def fda_staticSource[ROW](acquirer: => Seq[ROW],
                            releaser: => Unit = (),
                            errhandler: Throwable => FDAStream[ROW] = null,
                            finalizer: => Unit = ()): FDAStream[ROW] = {
     val s = Stream.bracket(Task(acquirer))(r => streamSeq(r), r => Task(releaser))
     if (errhandler != null)
       s.onError(errhandler).onFinalize(Task.delay(finalizer))
     else
       s.onFinalize(Task.delay(finalizer))
  }                                               //> fda_staticSource: [ROW](acquirer: => Seq[ROW], releaser: => Unit, errhandle
                                                  //| r: Throwable => demo.ws.streams.FDAStream[ROW], finalizer: => Unit)demo.ws.
                                                  //| streams.FDAStream[ROW]

如果完整調用fda_staticSource可以如下這樣:

  val s10 = fda_staticSource(data,
     println("endofuse"), e => { println(e.getMessage);Stream.emit(-99) },
     println("finallyend")) 
  s10.through(log("")).run.unsafeRun              //> > 1
                                                  //| > 2
                                                  //| > 3
                                                  //| > 4
                                                  //| endofuse
                                                  //| finallyend

最簡單直接的方式如下:

  val s11 = fda_staticSource(acquirer = data) 
  s11.through(log("")).run.unsafeRun              //> > 1
                                                  //| > 2
                                                  //| > 3
                                                  //| > 4

又或者帶異常處理過程的調用方法:

  val s12 = fda_staticSource(acquirer = data, errhandler = {e => println(e.getMessage);Stream()})
 
  s12.through(log("")).run.unsafeRun              //> > 1
                                                  //| > 2
                                                  //| > 3
                                                  //| > 4

下麵是這次討論示範的源代碼:

import fs2._
object streams {
val s = Stream.bracket(Task.delay(throw new Exception("Oh no!")))(
  _ => Stream(1,2,3) ++ Stream.fail(new Exception("boom!")) ++ Stream(3,4),
  _ => Task.delay(println("normal end")))
//s.runLog.unsafeRun
val s1 = s.map(_.toString).onError {e => Stream.emit(e.getMessage)}
s1.runLog.unsafeRun
val s5 = s1.onFinalize(Task.delay{println("finally end!")})
s5.runLog.unsafeRun

val s3 = Stream.bracket(Task.delay())(
  _ => Stream(1,2,3) ++ Stream.fail(new Exception("boom!")) ++ Stream(3,4),
  _ => Task.delay(println("normal end")))
val s4 = s3.map(_.toString).onError {e => Stream.emit(e.getMessage)}
         .onFinalize(Task.delay{println("finally end!")})
s4.runLog.unsafeRun
val s6 = s3.attempt.onError {e => Stream.emit(e.getMessage)}
         .onFinalize(Task.delay{println("finally end!")})

s6.runLog.unsafeRun
                                                  
val data = Seq(1,2,3,4)
val s8 = data.foldLeft(Stream[Task,Int]())((s,a) => s ++ Stream.emit(a))
def log[A](prompt: String): Pipe[Task,A,A] =
    _.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}

s8.through(log("")).run.unsafeRun
                                              
  def pullSeq[ROW](h: Seq[ROW]): Pull[Task, ROW, Unit] = {
    val it = h.iterator
    def go(it: Iterator[ROW]): Pull[Task, ROW, Unit] = for {
      res <- Pull.eval(Task.delay({ if (it.hasNext) Some(it.next()) else None }))
      next <- res.fold[Pull[Task, ROW, Unit]](Pull.done)(o => Pull.output1(o) >> go(it))
    } yield next
    go(it)
  }
  def streamSeq[ROW](h: Seq[ROW]): Stream[Task, ROW] =
    pullSeq(h).close
  val s9 = Stream.bracket(Task.delay(data))(streamSeq, _ => Task.delay())
  s9.through(log("")).run.unsafeRun
  
  type FDAStream[A] = Stream[Task,A]
  implicit val strategy = Strategy.fromFixedDaemonPool(4)

  def fda_staticSource[ROW](acquirer: => Seq[ROW],
                            releaser: => Unit = (),
                            errhandler: Throwable => FDAStream[ROW] = null,
                            finalizer: => Unit = ()): FDAStream[ROW] = {
     val s = Stream.bracket(Task(acquirer))(r => streamSeq(r), r => Task(releaser))
     if (errhandler != null)
       s.onError(errhandler).onFinalize(Task.delay(finalizer))
     else
       s.onFinalize(Task.delay(finalizer))
  }
  val s10 = fda_staticSource(data,
     println("endofuse"), e => { println(e.getMessage);Stream.emit(-99) },
     println("finallyend"))
  s10.through(log("")).run.unsafeRun
   val s11 = fda_staticSource(acquirer = data)
   s11.through(log("")).run.unsafeRun
   val s12 = fda_staticSource(acquirer = data, errhandler = {e => println(e.getMessage);Stream()})
   s12.through(log("")).run.unsafeRun
 
 }

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 先初始化主類中的靜態數據,如果要用其他類來定義對象,則初始化對應的其他類。 實例化對象時,先初始化定義為static的數據,再初始化定義為非static的數據,最後調用構造函數。 通過一個小程式,瞭解靜態數據是如何初始化的: 初始化順序:要執行main,必須先載入StaticInitializati ...
  • 一、新建Maven Module測試站點 \ 二、配置Application Server 1.File->Setting,打開設置面板; 2.選中Application Servers,點擊+,設置tomcat路徑,指向本機Tomcat所在目錄。如下圖所示: 三、設置Run Configurati ...
  • 之前寫過一篇用jsoup爬取csdn博客的文章JAVA爬蟲挖取CSDN博客文章 ,當時博主還在上一家公司實習,由於公司辦公網路需要代理才能訪問外網,那一篇的代碼邏輯與代理密切相關,可能有些不熟悉jsoup怎麼使用的朋友看了會感覺越看越糊塗,且當時以為爬取所有文章需要用到分頁,可能會誤導讀者。所以今天... ...
  • 簡介 該頭文件有關 位集 ,實際上是vector對應的固定大小版本(fix sized),位的大小在編譯期固定。 位 位本質上對應bool的概念,只有0或1,true或false兩種對立的值。 但很可惜,位元組才是機器上最小的存儲單元,所以bool基本上是由一個位元組大小。 bitset是出於高效的空間 ...
  • ActiveMQ 是 Apache 出品的、當前最流行的消息中間件之一。本文是《成小胖學習技術》系列的第二篇文章,通過對 JMS、ActiveMQ 的基本概念的講解,讓小白們快速認識 ActiveMQ,同時掌握其基本用法。不是所有的技術類文章都是枯燥無味的,不信你就點進來看看。 ...
  • 三種情況:final數據、方法和類 一、final數據 一個永不改變的編譯時常量。(編譯時常量必須是基本數據類型,必須在這個常量定義時賦值) 一個在運行時被初始化的值,而不希望它被改變。 對於基本類型,final使數值恆定不變;對於對象引用,final使引用恆定不變,然而對象其自身卻是可以被修改的。 ...
  • 隨著目前微信越來越火,所以研究微信的人也就越來越多,這不前一段時間,我們公司就讓我做一個微信公眾號中問卷調查發紅包功能,經過一段時間的研究,把功能完成,裡面主要的實現步驟都是按照微信公眾號開發文檔來的,很詳細,在整個過程唯有紅包演算法需要仔細編寫,因為畢竟涉及到錢,所以得小心,而且不僅微信中需要發紅包 ...
  • 20170209問題解析請點擊今日問題下方的“【Java每日一題】20170210”查看(問題解析在公眾號首發,公眾號ID:weknow619) 今日問題: 請問主程式輸出結果是什麼?(點擊以下“【Java每日一題】20170210”查看20170209問題解析) 題目原發佈於公眾號、簡書:【Jav ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...