官網地址 本文內容 簡介 Futures 阻塞 異常 Promises 工具 最近看了《七周七語言:理解多種編程泛型》,介紹了七種語言(四種編程泛型)的主要特性:基本語法,集合,並行/併發,其中就有 Scala。你不能指望這種書全面介紹,因為其中任何一門語言都夠寫一本書了~ 我比較關註並行/併發,但
本文內容
- 簡介
- Futures
- 阻塞
- 異常
- Promises
- 工具
最近看了《七周七語言:理解多種編程泛型》,介紹了七種語言(四種編程泛型)的主要特性:基本語法,集合,並行/併發,其中就有 Scala。你不能指望這種書全面介紹,因為其中任何一門語言都夠寫一本書了~
我比較關註並行/併發,但是書中關於 Scala 的併發部分——Actor,代碼編譯不通過,“Deprecated”,哎,這書點不負責,代碼也不寫採用編譯器的版本。於是就到 Scala 官網看了一下,即便是官網,也列出了對 Actor 的改進,有些已經不再使用了~
Java 在它的版本 8 之前,函數式編程實在太弱了,不然也不會出現像 Scala 這樣在 JVM 上運行,能夠與 Java 完美融合的語言(估計,Java 在函數式編程在這方面,太落後了,社區已經等不急了,而函數式編程最大的優點是——並行)。
本文來自 Scala 官網,完整示例代碼幾乎沒有,大部分是理論,雖然講解得很詳細,但看起來實在有點費勁。因此,你最好找點這方面完整示例再看看。
官網其實也有中文翻譯,但卻是機器翻譯的。
簡介
Future提供了一套高效非阻塞(non-blocking)的方式完成並行操作。其基本思想很簡單,所謂 Future,指的是一類占位符對象(placeholder object),用於指代某些尚未完成計算的結果。一般,由Future的計算結果都是並行執行的,計算完後再使用。以這種方式組織並行任務,便可以寫出高效、非同步、非阻塞的並行代碼。
預設情況,future 和 promise 利用回調(callback)的非阻塞方式,並不是採用典型的阻塞方式。為了在語法和概念層面簡化回調的使用,Scala 提供了 flatMap、foreach 和 filter 等運算元(combinator),使得我們能夠以非阻塞的方式對future進行組合。當然,future 對於那些必須使用阻塞的情況仍然支持阻塞操作,可以阻塞等待future(不過不鼓勵這樣做)。
一個典型的 future 如下所示:
val inverseFuture:Future[Matrix]=Future{fatMatrix.inverse()// non-blocking long lasting computation
}(executionContext)
或是更常用的:
implicit val ec:ExecutionContext=...
val inverseFuture :Future[Matrix]=Future{
fatMatrix.inverse()
}// ec is implicitly passed
這兩個代碼片段把 fatMatrix.inverse() 的執行委托給 ExecutionContext,在 inverseFuture
中體現計算結果。
Futures
所謂 Future,是一種用於指代某個尚未就緒的值的對象。這個值通常是某個計算過程的結果:
- 若該計算過程尚未完成,我們就說該Future未完成;
- 若該計算過程正常結束,或中途拋出異常,我們就說該Future已完成。
Future 完成分為兩種情況:
- 當Future帶著某個值而完成時,我們就說該Future帶著計算結果成功完成。
- 當Future帶著異常而完成時,計算過程中拋出的異常,我們就說Future因異常而失敗。
Future 具有一個重要的屬性——只能被賦值一次。一旦給定了某個值或某個異常,future對象就變成了不可變對象——無法再被改寫。
創建future對象最簡單的方法是調用future方法,開始非同步(asynchronous)計算,並返回保存有計算結果的futrue。一旦該future計算完成,其結果就變的可用。
註意,Future[T] 是一個類型,表示future對象,而future是一個方法,創建和調度一個非同步計算,並返回一個帶有計算結果的future對象。
下麵通過一個例子來展示。
假設,我們使用某個社交網路假想的API獲取某個用戶的朋友列表,我們將打開一個新對話(session),然後發送一個獲取特定用戶朋友列表的請求。
import scala.concurrent._
importExecutionContext.Implicits.globalval session = socialNetwork.createSessionFor("user", credentials)
val f:Future[List[Friend]]=Future{session.getFriends()}
上面,首先導入 scala.concurrent 包。然後,通過一個假想的 createSessionFor 方法初始化一個向伺服器發送請求 session 變數。這個請求是通過網路發送的,所以可能耗時很長。調用 getFriends 方法返回 List[Friend]。為了更好的利用CPU,知道響應到達,不應該阻塞(block)程式的其他部分,這個計算應該被非同步調度。future方法就是這樣做的,它並行地執行指定的計算塊,在這個例子中,向伺服器發送請求,等待響應。
一旦伺服器響應,future f 中的好友列表將變得可用。
失敗可能會導致一個 exception。在下麵的例子中,session 的值未被正確的初始化,於是,future 塊中計算將拋出一個 NullPointerException。這樣,future f 失敗了。
val session =null
val f:Future[List[Friend]]=Future{
session.getFriends
}
上面的 import ExecutionContext.Implicits.global
導入預設的全局執行上下文(global execution context)。執行上下文執行提交給他們的任務,你也可把執行上下文看作線程池,這對future方法是必不可少的,因為,它們處理如何和何時執行非同步計算。你可以定義自己的執行上下文,並用 future 使用,但現在,只需要知道你能夠通過上面的語句導入預設執行上下文就足夠了。
我們的例子是基於一個假想的社交網路 API,計算包含了發送網路請求和等待響應。下麵,假設你有一個文本文件,想找出一個特定詞第一次出現的位置。當磁碟正在檢索此文件時,這個計算過程可能會陷入阻塞,因此,並行執行程式的剩餘部分將很有意義。
val firstOccurrence:Future[Int]=Future{
val source = scala.io.Source.fromFile("e:\scala\myText.txt")
source.toSeq.indexOfSlice("myKeyword")
}
回調函數
現在,我們知道如何開始一個非同步計算來創建一個新的future值,但是我們沒有演示一旦此結果變得可用後如何使用。我們經常對計算結果感興趣而不僅僅是它的副作用(side-effects)。
在許多future的實現中,一旦future的客戶端對結果感興趣,它必須阻塞它自己的計算,並等待直到future完成——然後才能使用future的值繼續它自己的計算。雖然這在Scala Future API(在後面會展示)中是允許的,但從性能角度來看更好的辦法是完全非阻塞,即在future中註冊一個回調。一旦future完成,就非同步調用回調。如果當註冊回調,future已經完成,那麼,回調或是非同步執行,或在相同的線程中循序執行。
註冊回調最通常的形式,是使用OnComplete方法,即創建一個Try[T] => U
類型的回調函數。如果future成功完成,回調則會應用到Success[T]類型的值中,否則應用到 Failure[T]
類型的值中。
Try[T]
跟 Option[T]
或 Either[T, S]
相似,因為它是一個可能持有某種類型值的單子(monda)。然而,它是為持有一個值或異常對象特殊設計的。Option[T]
既可以是一個值(如:Some[T]
)也可以完全不是值(如:None
),如果Try[T]
獲得一個值是,那麼它是 Success[T]
,否則為持有異常的 Failure[T]
。 Failure[T]
有很多信息,不僅僅是關於為什麼沒有值 None。同時,也可以把 Try[T]
看作一種特殊版本的 Either[Throwable, T]
,特別是當左邊值為一個 Throwable 的情形。
“一個單子(Monad)說白了不過就是自函子範疇上的一個么半群而已。”這句話出自Haskell大神Philip Wadler,也是他提議把Monad引入Haskell。
回到我們社交網路的例子,假設,我們想獲取最近的帖子並顯示在屏幕上,可以通過調用 getRecentPosts 方法,它返回 List[String]:
import scala.util.{Success,Failure}
val f:Future[List[String]]=Future{
session.getRecentPosts
}
f onComplete {
caseSuccess(posts)=>for(post <- posts) println(post)
caseFailure(t)=> println("An error has occured: "+ t.getMessage)
}
onComplete 方法允許客戶處理失敗或成功的future 結果。對於成功,onSuccess 回調使用如下:
val f:Future[List[String]]=Future{
session.getRecentPosts
}
f
onSuccess {
case posts =>for(post <- posts
)
println(post)
}
對於失敗,onFailure 回調使用如下:
val f:Future[List[String]]=Future{
session.getRecentPosts
}
f onFailure {
case t => println("An error has occured: "+ t.getMessage)
}
f onSuccess {
case posts =>for(post <- posts) println(post)
}
onFailure 回調只有在 future 失敗,也就是包含一個異常時才會執行。
因為部分函數(partial functions)具有 isDefinedAt 方法, 所以,onFailure
方法只有為了特定 Throwable 而定義才會觸發。下麵的例子,已註冊的 onFailure
回調永遠不會被觸發:
val f =Future{
2/0
}
f onFailure {
case npe:NullPointerException=>
println("I'd be amazed if this printed out.")
}
回到前面例子,查找某個第一次出現的關鍵字,在屏幕上輸出該關鍵字的位置:
val firstOccurrence:Future[Int]=Future{
val source = scala.io.Source.fromFile("myText.txt")
source.toSeq.indexOfSlice("myKeyword")
}
firstOccurrence onSuccess {
case idx => println("The keyword first appears at position: "+ idx)
}
firstOccurrence onFailure {
case t => println("Could not process file: "+ t.getMessage)
}
onComplete,、onSuccess 和 onFailure 方法都具有結果類型 Unit,這意味著這些回調方法不能被鏈接。註意,這種設計是為了避免鏈式調用可能隱含在已註冊回調上一個順序的執行(同一個 future 中註冊的回調是無序的)。
也就是說,我們現在應討論論何時調用回調。因為回調需要future 中的值是可用的,只有future完成後才能被調用。然而,不能保證被完成 future 的線程或創建回調的線程調用。反而, 回調有時會在future對象完成後被某個線程調用。我們可以說,回調最終會被執行。
更進一步,回調被執行的順序不是預先定義的,甚至在同一個應用程式。事實上,回調也許不是一個接一個連續調用的,但在同一時間併發調用。這意味著,下麵例子中,變數 totalA 也許不能從計算的文本中得到大小寫字母數量的正確值。
@volatilevar totalA =0
val text =Future{
"na"*16+"BATMAN!!!"
}
text onSuccess {
case txt => totalA += txt.count(_ =='a')
}
text onSuccess {
case txt => totalA += txt.count(_ =='A')
}
上面,兩個回調可能一個接一個地執行,變數 totalA 得到的預期值為 18。然而,它們也可能是併發執行,於是,totalA 最終可能是16或2,因為+= 不是一個原子操作(即它是由一個讀和一個寫的步驟組成,這樣就可能使其與其他的讀和寫任意交錯執行)。
考慮到完整性,回調的語義如下:
- 在 future 上註冊 onComplete 回調,要確保 future 執行完成後,相應的閉包(closure)最終被調用。
- 註冊 onSuccess 或 onFailure 回調,與 onComplete 語義一樣,不同的是,只有在 future 成功地或失敗地執行完後,才會調用。
- 在 future 上註冊一個已經完成的回調,將導致回調最終被執行。將最終導致一直處於執行狀態的回調(上面 1 所隱含的)。
- 在 future 上註冊多個回調時,這些回調的執行順序是不確定的。事實上,這些回調可能是並行執行的,然而,某個 ExecutionContext 執行可能導致明確的執行順序。
- 在某些回調拋出異常時,其他回調的執行不受影響。
- 在某些回調無法永遠無法結束時(例如,回調包含一個無限迴圈),其他回調可能完全不會執行。這種情況下,那些潛在的阻塞的回調需要使用阻塞結構。將在後面“阻塞”小節說明。
- 一旦執行完後,回調會從 future 對象中移除,這對垃圾回收機制(GC)很合適。
函數組合(Functional Composition)和For解構(For-Comprehensions)
儘管前文所展示的回調機制已經足夠把future的結果和後繼計算結合起來的,但是有時回調機制並不易於使用,且容易造成冗餘的代碼。可以通過一個例子來說明。假設,我們有一個用於進行貨幣交易服務的API,想要在有盈利的時候購進一些美元。讓我們先來看看怎樣用回調來解決這個問題:
val rateQuote =Future{
connection.getCurrentValue(USD)
}
rateQuote onSuccess {case quote =>