作為一個能安全運行的工具庫,為了保證占用資源的安全性,對異常處理(exception handling)和事後處理(final clean-up)的支持是不可或缺的。FunDA的數據流FDAPipeLine一般是通過讀取資料庫數據形成數據源開始的。為了保證每個數據源都能被安全的使用,FunDA提供了 ...
作為一個能安全運行的工具庫,為了保證占用資源的安全性,對異常處理(exception handling)和事後處理(final clean-up)的支持是不可或缺的。FunDA的數據流FDAPipeLine一般是通過讀取資料庫數據形成數據源開始的。為了保證每個數據源都能被安全的使用,FunDA提供了事後處理finalizing程式介面來實現數據流使用完畢後的清理及異常處理(error-handling)程式介面來捕獲和處理使用過程中出現的異常情況。首先,事後處理程式(finalizer)保證了在任何情況下的FunDA數據流終止運算:包括元素耗盡,強制中斷以及異常中斷,finalizer都會被調用。在這篇討論里我們將會測試和示範FunDA Exception-handling和Final-cleanup。下麵的樣板代碼設定了一個靜態集合數據源viewState和一個動態數據流streamState:
val db = Database.forConfig("h2db")
implicit def toState(row: StateTable#TableElementType) =
StateModel(row.id,row.name)
val viewLoader = FDAViewLoader(slick.jdbc.H2Profile)(toState _)
val streamLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toState _)
val stateSeq = viewLoader.fda_typedRows(StateQuery.result)(db).toSeq
val viewState = fda_staticSource(stateSeq)(println("***Finally*** the end of viewState!!!"))
val streamState = streamLoader.fda_typedStream(StateQuery.result)(db)(64,64)(println("***Finally*** the end of streamState!!!"))
在上面的代碼例子里我們可以看到fda_staticSource和fad_typedStream都掛接了事後處理程式,我們簡單的用println代表一段完整的程式來證實對事後處理程式的調用。所以說事後處理程式的掛接是在構建view或者stream時進行的。我們先看看它們在正常終止或者強行中斷是是否發生調用:
viewState.startRun
viewState.take(2).startRun
streamState.startRun
streamState.take(3).startRun
// ***Finally*** the end of viewState!!!
// ***Finally*** the end of viewState!!!
// ***Finally*** the end of streamState!!!
// ***Finally*** the end of streamState!!!
那麼如果在出現了異常中斷是是否同樣會被調用呢?我們先設計下麵兩個用戶自定義函數:
def trackRows: FDAUserTask[FDAROW] = row => {
row match {
case m@StateModel(id,name) =>
println(s"State: $id $name")
println( "----------------")
fda_next(m)
case m@_ => fda_next(m)
}
}
def errorRow: FDAUserTask[FDAROW] = row => {
row match {
case StateModel(id,name) =>
val idx = id / (id - 3)
fda_next(StateModel(idx,name))
case m@_ => fda_next(m)
}
}
trackRows跟蹤顯示當前數據行,errorRow人為的會在第三行出現異常。我們用streamState來測試一下:
streamState.appendTask(errorRow).appendTask(trackRows).startRun
// State: 0 Alabama
// ----------------
// State: -2 Alaska
// ----------------
// Exception in thread "main" java.lang.ArithmeticException: / by zero
// at examples.ExceptionsAndFinalizers$$anonfun$errorRow$1.apply(ExceptionsAndFinalizers.scala:46)
// ...
// at java.lang.Thread.run(Thread.java:745)
// ***Finally*** the end of streamState!!!
的確在正常顯示了兩行數據後,第三行出錯中斷,直接調用了finalizer。這就保證了無論發生任何情況,當完成使用數據源後都給予編程人員一個空間去進行事後處理如釋放資源、中斷連接、關閉文件等。
我們可以用onError來掛接異常處理程式,如下:
val s = streamState.appendTask(errorRow).appendTask(trackRows)
val s1 = s.onError {case e: Exception => println(s"Caught Error in streamState!!![${e.getMessage}]"); fda_appendRow(FDANullRow)}
註意:onError必須掛接在stream的最尾端以確保所有環節的異常情況都可以正確地得到處理。看看運行結果:
State: 0 Alabama
----------------
State: -2 Alaska
----------------
***Finally*** the end of streamState!!!
Caught Error in streamState!!![/ by zero]
以上例子捕獲了異常情況,同時在異常中斷情況後還是調用了finalizer。
有時我們需要自定義一些特殊情況,我們希望能捕獲這些情況的發生。但我們同時希望這些情況發生時不會中斷運算。首先我們可以先自定義一個異常行類型:
case class DivideZeroError(msg: String, e: Exception) extends FDAROW
註意:切不可忘記extends FDAROW。我們把上面的errorRow函數修改成一個自捕獲異常的函數:
def catchError: FDAUserTask[FDAROW] = row => {
row match {
case StateModel(id,name) =>
try {
val idx = id / (id - 3)
fda_next(StateModel(idx, name))
} catch {
case e: Exception => //pass an error row
fda_next(DivideZeroError(s"Divide by zero excption at ${id}",e))
}
case m@_ => fda_next(m)
}
}
必須修改trackRows能分辨DivideZeroError行:
def trackRows: FDAUserTask[FDAROW] = row => {
row match {
case m@StateModel(id,name) =>
println(s"State: $id $name")
println( "----------------")
fda_next(m)
case DivideZeroError(msg, e) => //error row
println(s"***Error:$msg***")
fda_skip
case m@_ => fda_next(m)
}
}
運算下麵的程式:
val s = streamState.take(5).appendTask(catchError).appendTask(trackRows)
val s1 = s.onError {case e: Exception => println(s"Caught Error in streamState!!![${e.getMessage}]"); fda_appendRow(FDANullRow)}
s1.startRun
產生下麵的結果:
State: 0 Alabama
----------------
State: -2 Alaska
----------------
***Error:Divide by zero excption at 3***
State: 4 Arkansas
----------------
State: 2 California
----------------
***Finally*** the end of streamState!!!
Process finished with exit code 0
沒有出現異常中斷,捕獲並處理了自定義異常,並且調用了事後處理程式finalizer。
下麵就是這次示範的源代碼:
import slick.jdbc.H2Profile.api._
import com.bayakala.funda.samples.SlickModels._
import com.bayakala.funda._
import api._
import scala.language.implicitConversions
object ExceptionsAndFinalizers extends App {
val db = Database.forConfig("h2db")
implicit def toState(row: StateTable#TableElementType) =
StateModel(row.id,row.name)
val viewLoader = FDAViewLoader(slick.jdbc.H2Profile)(toState _)
val streamLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toState _)
val stateSeq = viewLoader.fda_typedRows(StateQuery.result)(db).toSeq
val viewState = fda_staticSource(stateSeq)(println("***Finally*** the end of viewState!!!"))
val streamState = streamLoader.fda_typedStream(StateQuery.result)(db)(64,64)(println("***Finally*** the end of streamState!!!"))
/*
viewState.startRun
viewState.take(2).startRun
streamState.startRun
streamState.take(3).startRun
// ***Finally*** the end of viewState!!!
// ***Finally*** the end of viewState!!!
// ***Finally*** the end of streamState!!!
// ***Finally*** the end of streamState!!!
*/
def trackRows: FDAUserTask[FDAROW] = row => {
row match {
case m@StateModel(id,name) =>
println(s"State: $id $name")
println( "----------------")
fda_next(m)
case DivideZeroError(msg, e) => //error row
println(s"***Error:$msg***")
fda_skip
case m@_ => fda_next(m)
}
}
def errorRow: FDAUserTask[FDAROW] = row => {
row match {
case StateModel(id,name) =>
val idx = id / (id - 3)
fda_next(StateModel(idx,name))
case m@_ => fda_next(m)
}
}
case class DivideZeroError(msg: String, e: Exception) extends FDAROW
def catchError: FDAUserTask[FDAROW] = row => {
row match {
case StateModel(id,name) =>
try {
val idx = id / (id - 3)
fda_next(StateModel(idx, name))
} catch {
case e: Exception => //pass an error row
fda_next(DivideZeroError(s"Divide by zero excption at ${id}",e))
}
case m@_ => fda_next(m)
}
}
/*
streamState.appendTask(errorRow).appendTask(trackRows).startRun
// State: 0 Alabama
// ----------------
// State: -2 Alaska
// ----------------
// Exception in thread "main" java.lang.ArithmeticException: / by zero
// at examples.ExceptionsAndFinalizers$$anonfun$errorRow$1.apply(ExceptionsAndFinalizers.scala:46)
// ...
// at java.lang.Thread.run(Thread.java:745)
// ***Finally*** the end of streamState!!!
*/
/*
val v = viewState.appendTask(errorRow).appendTask(trackRows)
val v1 = v.onError {case e: Exception => println(s"Caught Error in viewState!!![${e.getMessage}]"); fda_appendRow(FDANullRow)}
v1.startRun
val s = streamState.appendTask(errorRow).appendTask(trackRows)
val s1 = s.onError {case e: Exception => println(s"Caught Error in streamState!!![${e.getMessage}]"); fda_appendRow(FDANullRow)}
s1.startRun
*/
val s = streamState.take(5).appendTask(catchError).appendTask(trackRows)
val s1 = s.onError {case e: Exception => println(s"Caught Error in streamState!!![${e.getMessage}]"); fda_appendRow(FDANullRow)}
s1.startRun
}