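In earlier chapters we spent a good deal of time discussing how to move data from a back-end database into memory and then process it row by row. The solution we settled on is to turn back-end data into an in-memory data stream. Opening a database table and reading from it both involve safe use of the table as a resource: at a minimum, we must guarantee that the resource is released when we finish with it, and also when an error forces an early exit midway. Resource safety brings to mind the bracket function that is common in functional programming, and fs2 provides it as well: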
def bracket[F[_],R,A](r: F[R])(use: R => Stream[F,A], release: R => F[Unit]): Stream[F,A] = Stream.mk {
  StreamCore.acquire(r, release andThen (Free.eval)) flatMap { case (_, r) => use(r).get }
}
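The parameters r, use and release all deal with the resource: r typically opens a file or a database table, use consumes the resource (for example by reading data), and release is the cleanup that frees the resource once use has finished. bracket guarantees that these steps are invoked correctly.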
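To make the acquire/use/release contract concrete, here is a minimal sketch in plain Scala, without fs2. It is not the fs2 implementation: BracketSketch, its bracket helper and demo are hypothetical names invented for this illustration, and the sketch works on plain values with try/finally rather than on streams.

```scala
import scala.collection.mutable.ListBuffer

object BracketSketch {
  // Hypothetical helper, not the fs2 API: acquire a resource, use it,
  // and release it once it has actually been acquired.
  def bracket[R, A](acquire: => R)(use: R => A, release: R => Unit): A = {
    val r = acquire        // if this throws, release is never called
    try use(r)
    finally release(r)     // runs on normal completion and on failure in use
  }

  // Records what happened so the order of events can be inspected.
  def demo(failAcquire: Boolean, failUse: Boolean): List[String] = {
    val log = ListBuffer.empty[String]
    try {
      bracket {
        if (failAcquire) throw new Exception("Oh no!")
        log += "acquired"
        "table"
      }(
        _ => { if (failUse) throw new Exception("boom!"); log += "used"; () },
        _ => { log += "released"; () }
      )
    } catch { case e: Exception => log += e.getMessage }
    log.toList
  }
}
```

Calling BracketSketch.demo(failAcquire = true, failUse = false) records only the error message, because there is nothing to release, while a failure inside use still records the release step before the error surfaces.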
Let's work through a few examples to see what this function does:
val s = Stream.bracket(Task.delay(throw new Exception("Oh no!")))(
  _ => Stream(1,2,3) ++ Stream.fail(new Exception("boom!")) ++ Stream(3,4),
  _ => Task.delay(println("normal end")))
s.runLog.unsafeRun //> java.lang.Exception: Oh no!
//| at demo.ws.streams$$anonfun$main$1$$anonfun$1.apply(demo.ws.streams.scala:4)
//| at demo.ws.streams$$anonfun$main$1$$anonfun$1.apply(demo.ws.streams.scala:4)
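In this example we deliberately raise exceptions in two places: while acquiring the resource and while using it. We can intercept them with onError: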
val s1 = s.map(_.toString).onError {e => Stream.emit(e.getMessage)}
s1.runLog.unsafeRun //> res0: Vector[String] = Vector(Oh no!)
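The elements have to be converted with toString before the intercepted error message can be placed in the Stream. Note that release was not called, because the resource was never acquired. If there is other cleanup to do besides releasing the resource, onFinalize ensures that the cleanup code always runs: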
val s5 = s1.onFinalize(Task.delay{println("finally end!")})
s5.runLog.unsafeRun //> finally end!
//| res1: Vector[String] = Vector(Oh no!)
What happens if an exception occurs while the resource is in use?
val s3 = Stream.bracket(Task.delay(()))(
  _ => Stream(1,2,3) ++ Stream.fail(new Exception("boom!")) ++ Stream(3,4),
  _ => Task.delay(println("normal end")))
val s4 = s3.map(_.toString).onError {e => Stream.emit(e.getMessage)}
.onFinalize(Task.delay{println("finally end!")})
s4.runLog.unsafeRun //> normal end
//| finally end!
//| res2: Vector[String] = Vector(1, 2, 3, boom!)
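The result res2 records exactly where the failure happened, and every cleanup step ran, including release. We could also leave the elements unconverted and use attempt, which wraps each element in a Right and the failure in a Left. Because attempt has already captured the failure, the onError handler in the snippet below never fires; its String result only widens the inferred element type, which is presumably why the worksheet reports Vector[Object]: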
val s6 = s3.attempt.onError {e => Stream.emit(e.getMessage)}
.onFinalize(Task.delay{println("finally end!")})
s6.runLog.unsafeRun //> normal end
//| finally end!
//| res3: Vector[Object] = Vector(Right(1), Right(2), Right(3), Left(java.lang.Exception: boom!))
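The shape of res3 (a run of Right values ending in a single Left) can be reproduced in plain Scala. The following is only a sketch of the idea, not fs2's attempt: AttemptSketch and attemptAll are hypothetical names, and each stream element is modelled as a thunk that may throw.

```scala
import scala.util.control.NonFatal

object AttemptSketch {
  // Hypothetical helper: evaluate the steps in order, wrap each result in
  // Right, and stop at the first failure, which is recorded as a Left.
  def attemptAll[A](steps: Seq[() => A]): Vector[Either[Throwable, A]] = {
    val out = Vector.newBuilder[Either[Throwable, A]]
    val it = steps.iterator
    var failed = false
    while (it.hasNext && !failed) {
      try out += Right(it.next()())
      catch { case NonFatal(e) => out += Left(e); failed = true }
    }
    out.result()
  }
}
```

The element type stays Either[Throwable, A] throughout, so downstream code can pattern match on Left and Right instead of converting every element to a String.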
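As discussed in FunDA(1), running a Slick Query Action returns a Future[Iterable[ROW]]. Slick fetches the data into memory in one go, so the Static-Source in this post's title refers to exactly such an in-memory collection. That means we no longer need to worry about opening and holding a database table. Once the FunDA function DataRowType.getTypedRow has produced the Iterable[ROW] result, we simply pass it to bracket. What matters most now is how to convert a Seq[ROW] into a Stream[F,ROW]. We could build the Stream with Seq's fold function: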
val data = Seq(1,2,3,4) //> data : Seq[Int] = List(1, 2, 3, 4)
val s8 = data.foldLeft(Stream[Task,Int]())((s,a) => s ++ Stream.emit(a))
def log[A](prompt: String): Pipe[Task,A,A] =
_.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}
//> log: [A](prompt: String)fs2.Pipe[fs2.Task,A,A]
s8.through(log("")).run.unsafeRun //> > 1
//| > 2
//| > 3
//| > 4
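On the surface this looks fine, but look closer: Seq[ROW] can be a very large collection, and foldLeft walks all of it up front, nesting one ++ per row before anything is emitted, so we worry that a large input could end in a StackOverflowError. A free monad is the better tool here: it stores every step of the computation in a data structure on the heap and can therefore run in constant stack space. The following functions use the fs2.Pull type to convert a Seq[ROW] into a Stream[F,ROW]: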
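def pullSeq[ROW](h: Seq[ROW]): Pull[Task, ROW, Unit] = {
  val it = h.iterator
  // Each step reads at most one row, and only when downstream asks for it.
  def go(it: Iterator[ROW]): Pull[Task, ROW, Unit] = for {
    res <- Pull.eval(Task.delay({ if (it.hasNext) Some(it.next()) else None }))
    // Emit the row and continue, or finish once the iterator is exhausted.
    next <- res.fold[Pull[Task, ROW, Unit]](Pull.done)(o => Pull.output1(o) >> go(it))
  } yield next
  go(it)
}
// Closing the Pull yields the Stream of rows.
def streamSeq[ROW](h: Seq[ROW]): Stream[Task, ROW] =
  pullSeq(h).close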
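Although go is recursive, Pull is a free monad, so each flatMap step (>>) stores the next step, together with the current state of the iterator it, in a structure on the heap. Because every step lives in that heap structure and is evaluated only when downstream demands it, the stream produces one element per downstream pull. pullSeq returns a Pull, and Pull.close turns a Pull into a Stream, which is all streamSeq does. We can now use bracket to build the Stream safely: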
val s9 = Stream.bracket(Task.delay(data))(streamSeq, _ => Task.delay(()))
s9.through(log("")).run.unsafeRun //> > 1
//| > 2
//| > 3
//| > 4
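The point that a free monad keeps each step on the heap rather than on the call stack can be illustrated without fs2. The sketch below is only an analogy and not how Pull is implemented: it uses the standard library's scala.util.control.TailCalls trampoline, and TrampolineSketch, sumNaive and sumTrampolined are names made up for this illustration.

```scala
import scala.util.control.TailCalls.{TailRec, done, tailcall}

object TrampolineSketch {
  // Plain recursion: one stack frame per row, so a very large input
  // can exhaust the call stack.
  def sumNaive(it: Iterator[Int]): Long =
    if (it.hasNext) { val h = it.next(); h + sumNaive(it) } else 0L

  // Trampolined recursion: every step is described by a TailRec value
  // on the heap, and `result` interprets those values in a loop.
  def sumTrampolined(it: Iterator[Int]): TailRec[Long] =
    if (it.hasNext) {
      val h = it.next()
      tailcall(sumTrampolined(it)).map(_ + h)
    } else done(0L)
}
```

For example, TrampolineSketch.sumTrampolined(Iterator.range(0, 100000)).result walks all rows without deepening the call stack, whereas sumNaive is only safe on small inputs.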
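Now we can relax. But our goal is to give everyday programmers a tool library with the lowest possible barrier to entry; they should not need to know about Task, onError, onFinalize and so on. We have to make bracket more straightforward to use so that users can call it easily: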
type FDAStream[A] = Stream[Task,A]
implicit val strategy = Strategy.fromFixedDaemonPool(4)
//> strategy : fs2.Strategy = Strategy
def fda_staticSource[ROW](acquirer: => Seq[ROW],
                          releaser: => Unit = (),
                          errhandler: Throwable => FDAStream[ROW] = null,
                          finalizer: => Unit = ()): FDAStream[ROW] = {
  val s = Stream.bracket(Task(acquirer))(r => streamSeq(r), r => Task(releaser))
  // errhandler is optional: the default null means no onError handler is attached.
  if (errhandler != null)
    s.onError(errhandler).onFinalize(Task.delay(finalizer))
  else
    s.onFinalize(Task.delay(finalizer))
} //> fda_staticSource: [ROW](acquirer: => Seq[ROW], releaser: => Unit, errhandle
//| r: Throwable => demo.ws.streams.FDAStream[ROW], finalizer: => Unit)demo.ws.
//| streams.FDAStream[ROW]
A call to fda_staticSource that supplies every argument looks like this:
val s10 = fda_staticSource(data,
println("endofuse"), e => { println(e.getMessage);Stream.emit(-99) },
println("finallyend"))
s10.through(log("")).run.unsafeRun //> > 1
//| > 2
//| > 3
//| > 4
//| endofuse
//| finallyend
The simplest and most direct form is:
val s11 = fda_staticSource(acquirer = data)
s11.through(log("")).run.unsafeRun //> > 1
//| > 2
//| > 3
//| > 4
Or a call that also supplies an error handler:
val s12 = fda_staticSource(acquirer = data, errhandler = {e => println(e.getMessage);Stream()})
s12.through(log("")).run.unsafeRun //> > 1
//| > 2
//| > 3
//| > 4
Here is the complete source code for this discussion:
import fs2._
object streams {
  val s = Stream.bracket(Task.delay(throw new Exception("Oh no!")))(
    _ => Stream(1,2,3) ++ Stream.fail(new Exception("boom!")) ++ Stream(3,4),
    _ => Task.delay(println("normal end")))
  //s.runLog.unsafeRun
  val s1 = s.map(_.toString).onError {e => Stream.emit(e.getMessage)}
  s1.runLog.unsafeRun
  val s5 = s1.onFinalize(Task.delay{println("finally end!")})
  s5.runLog.unsafeRun
  val s3 = Stream.bracket(Task.delay(()))(
    _ => Stream(1,2,3) ++ Stream.fail(new Exception("boom!")) ++ Stream(3,4),
    _ => Task.delay(println("normal end")))
  val s4 = s3.map(_.toString).onError {e => Stream.emit(e.getMessage)}
    .onFinalize(Task.delay{println("finally end!")})
  s4.runLog.unsafeRun
  val s6 = s3.attempt.onError {e => Stream.emit(e.getMessage)}
    .onFinalize(Task.delay{println("finally end!")})
  s6.runLog.unsafeRun
  val data = Seq(1,2,3,4)
  val s8 = data.foldLeft(Stream[Task,Int]())((s,a) => s ++ Stream.emit(a))
  def log[A](prompt: String): Pipe[Task,A,A] =
    _.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}
  s8.through(log("")).run.unsafeRun
  def pullSeq[ROW](h: Seq[ROW]): Pull[Task, ROW, Unit] = {
    val it = h.iterator
    def go(it: Iterator[ROW]): Pull[Task, ROW, Unit] = for {
      res <- Pull.eval(Task.delay({ if (it.hasNext) Some(it.next()) else None }))
      next <- res.fold[Pull[Task, ROW, Unit]](Pull.done)(o => Pull.output1(o) >> go(it))
    } yield next
    go(it)
  }
  def streamSeq[ROW](h: Seq[ROW]): Stream[Task, ROW] =
    pullSeq(h).close
  val s9 = Stream.bracket(Task.delay(data))(streamSeq, _ => Task.delay(()))
  s9.through(log("")).run.unsafeRun
  type FDAStream[A] = Stream[Task,A]
  implicit val strategy = Strategy.fromFixedDaemonPool(4)
  def fda_staticSource[ROW](acquirer: => Seq[ROW],
                            releaser: => Unit = (),
                            errhandler: Throwable => FDAStream[ROW] = null,
                            finalizer: => Unit = ()): FDAStream[ROW] = {
    val s = Stream.bracket(Task(acquirer))(r => streamSeq(r), r => Task(releaser))
    if (errhandler != null)
      s.onError(errhandler).onFinalize(Task.delay(finalizer))
    else
      s.onFinalize(Task.delay(finalizer))
  }
  val s10 = fda_staticSource(data,
    println("endofuse"), e => { println(e.getMessage);Stream.emit(-99) },
    println("finallyend"))
  s10.through(log("")).run.unsafeRun
  val s11 = fda_staticSource(acquirer = data)
  s11.through(log("")).run.unsafeRun
  val s12 = fda_staticSource(acquirer = data, errhandler = {e => println(e.getMessage);Stream()})
  s12.through(log("")).run.unsafeRun
}