上一篇討論里我們介紹了幾種任務分配(Routing)模式。Akka提供的幾種現成智能化Routing模式大多數是通過對用戶屏蔽具體的運算Routee選擇方式來簡化Router使用,提高智能程度,所以我們提到Router的運算是一種無序的運算,消息之間絕對不容許任何形式的依賴,因為向Router發送的 ...
上一篇討論里我們介紹了幾種任務分配(Routing)模式。Akka提供的幾種現成智能化Routing模式大多數是通過對用戶屏蔽具體的運算Routee選擇方式來簡化Router使用,提高智能程度,所以我們提到Router的運算是一種無序的運算,消息之間絕對不容許任何形式的依賴,因為向Router發送的消息可能在任何Routee上運算。但是,如果我們能夠把運算任務按照任務的類型分配給專門負責處理此等類型任務的Routee,那麼我們就可以充分利用Routing模式所帶來的運算拓展能力來提高整體運算效率。Akka的ConsistentHashingRouter就是為了滿足這樣的需求而提供的。ConsistentHashingRouter是通過消息的特征來分辨消息類型,然後自動構建和管理處理各種類型消息的Routees。當然,這就要求系統的消息必須具備預先設定的特征,使ConsistentHashingRouter可以正確分辨並分配給指定的Routee去運算。如果我們確定只有一個Routee負責處理一種類型消息的話,甚至可以在這個Routee中維護某種狀態。我們可以設計一個場景來示範ConsistentHashingRouter的應用:模擬一個多貨幣的存錢盒,分n次隨意從盒裡取出錢幣然後統計各種貨幣的總額。這個場景中的特征很明顯:就是貨幣種類了,我們把抽出的貨幣按幣種、金額合成消息發給ConsistentHashingRouter。例子里的Routee應該是按照幣種由Router自動構建的,維護各種貨幣當前總額作為內部狀態。向ConsistentHashingRouter發送的消息被分配給相應幣種的Routee去登記更新貨幣當前總額。這個統計金額的Routee可以如下定義:
import akka.actor._
val currencies = List("RMB","USD","EUR","JPY","GBP","DEM","HKD","FRF","CHF")
object MoneyCounter {
sealed trait Counting
case class OneHand(cur: String, amt: Double) extends Counting
case class ReportTotal(cur: String) extends Counting
}
class MoneyCounter extends Actor with ActorLogging {
import MoneyCounter._
var currency: String = "RMB"
var amount: Double = 0
override def receive: Receive = {
case OneHand(cur,amt) =>
currency = cur
amount += amt
log.info(s"${self.path.name} received one hand of $amt$cur")
case ReportTotal(_) =>
log.info(s"${self.path.name} has a total of $amount$currency")
}
}
MoneyCounter支持兩項功能:一是統計某種貨幣收到的總額,二是按指令彙報當前總額。我們在前一篇討論里瞭解到如果MoneyCounter是Routee類型,那它們應該被視為具相同功能的Actor。而且用戶無法分辨或者直接面對某個特定的Routee。任何MoneyCounter都可以收到一手任何貨幣,不同的貨幣金額相加結果是錯誤的。所以我們要用Akka提供的ConsistentHashingRouter來解決這個問題。ConsistentHashingRouter的主要特點是能夠分辨消息類型,然後按照消息類型對應到選定的Routee。在我們上面的例子里每個Routee負責一種貨幣,這樣就可以保證每個Routee里的金額總數都是正確的了。ConsistentHashingRouter有三種分辨消息的方法:
1、定義ConsistentHashingRouter的hashMapping函數:這是個PartialFunction[Any,Any],如下:
object HashingRouter extends App {
import MoneyCounter._
val currencies = List("RMB","USD","EUR","JPY","GBP","DEM","HKD","FRF","CHF")
val routerSystem = ActorSystem("routerSystem")
def mcHashMapping: PartialFunction[Any,Any] = {
case OneHand(cur,_) => cur
case ReportTotal(cur) => cur
}
val router = routerSystem.actorOf(ConsistentHashingPool(
nrOfInstances = 5,hashMapping = mcHashMapping,virtualNodesFactor = 2)
.props(MoneyCounter.props),name = "moneyCounter" )
router ! OneHand("RMB",10.00)
router ! OneHand("USD",10.00)
router ! OneHand("HKD",10.00)
router ! OneHand("RMB",10.00)
router ! OneHand("CHF",10.00)
router ! ReportTotal("RMB")
router ! ReportTotal("USD")
scala.io.StdIn.readLine()
routerSystem.terminate()
}
我們在定義router時直接把mcHashingMapping傳到ConsistentHashingPool的構建器里就行了。特別要註意nrOfInstances,這個參數必須比消息類型的數量大才行,否則Router會錯誤引導消息。測試運算結果顯示如下:
INFO] [06/05/2017 15:20:09.334] [routerSystem-akka.actor.default-dispatcher-6] [akka://routerSystem/user/moneyCounter/$e] $e received one hand of 10.0RMB
[INFO] [06/05/2017 15:20:09.334] [routerSystem-akka.actor.default-dispatcher-3] [akka://routerSystem/user/moneyCounter/$b] $b received one hand of 10.0USD
[INFO] [06/05/2017 15:20:09.334] [routerSystem-akka.actor.default-dispatcher-7] [akka://routerSystem/user/moneyCounter/$d] $d received one hand of 10.0CHF
[INFO] [06/05/2017 15:20:09.334] [routerSystem-akka.actor.default-dispatcher-2] [akka://routerSystem/user/moneyCounter/$a] $a received one hand of 10.0HKD
[INFO] [06/05/2017 15:20:09.334] [routerSystem-akka.actor.default-dispatcher-6] [akka://routerSystem/user/moneyCounter/$e] $e received one hand of 10.0RMB
[INFO] [06/05/2017 15:20:09.337] [routerSystem-akka.actor.default-dispatcher-2] [akka://routerSystem/user/moneyCounter/$b] $b has a total of 10.0USD
[INFO] [06/05/2017 15:20:09.337] [routerSystem-akka.actor.default-dispatcher-6] [akka://routerSystem/user/moneyCounter/$e] $e has a total of 20.0RMB
Router自動調用了e,b,d,a4個Routees,並且能把消息引導到正確的Routee。
2、可以讓消息繼承ConsistentHashable,如此我們要在消息里實現函數constentHashKey, 如下:
object MoneyCounter {
sealed class Counting(cur: String) extends ConsistentHashable {
override def consistentHashKey: Any = cur
}
case class OneHand(cur: String, amt: Double) extends Counting(cur)
case class ReportTotal(cur: String) extends Counting(cur)
def props = Props(new MoneyCounter)
}
現在消息都是ConsistentHashable類型的了。構建新的Router來測試效果:
val router = routerSystem.actorOf(ConsistentHashingPool(
nrOfInstances = 5, virtualNodesFactor = 2).props(
MoneyCounter.props),name = "moneyCounter")
router ! OneHand("RMB",10.00)
router ! OneHand("USD",10.00)
router ! OneHand("HKD",10.00)
router ! OneHand("RMB",10.00)
router ! OneHand("CHF",10.00)
router ! ReportTotal("RMB")
router ! ReportTotal("USD")
運算結果同樣正確:
[INFO] [06/05/2017 15:36:29.746] [routerSystem-akka.actor.default-dispatcher-7] [akka://routerSystem/user/moneyCounter/$e] $e received one hand of 10.0RMB [INFO] [06/05/2017 15:36:29.746] [routerSystem-akka.actor.default-dispatcher-5] [akka://routerSystem/user/moneyCounter/$b] $b received one hand of 10.0USD [INFO] [06/05/2017 15:36:29.746] [routerSystem-akka.actor.default-dispatcher-6] [akka://routerSystem/user/moneyCounter/$a] $a received one hand of 10.0HKD [INFO] [06/05/2017 15:36:29.746] [routerSystem-akka.actor.default-dispatcher-4] [akka://routerSystem/user/moneyCounter/$d] $d received one hand of 10.0CHF [INFO] [06/05/2017 15:36:29.746] [routerSystem-akka.actor.default-dispatcher-7] [akka://routerSystem/user/moneyCounter/$e] $e received one hand of 10.0RMB [INFO] [06/05/2017 15:36:29.749] [routerSystem-akka.actor.default-dispatcher-7] [akka://routerSystem/user/moneyCounter/$e] $e has a total of 20.0RMB [INFO] [06/05/2017 15:36:29.749] [routerSystem-akka.actor.default-dispatcher-4] [akka://routerSystem/user/moneyCounter/$b] $b has a total of 10.0USD
3、直接把消息包在ConsistentHashableEnvelope里:
router ! ConsistentHashableEnvelope(message = OneHand("RMB",23.00),hashKey = "RMB")
這種方式需要用戶手工指定Routee,如果用這種方式,我們其實不必用Router,直接把消息傳給專職的Actor就行了。
看來還是第二種方法比較合適。因為比起第一種方法多了類型安全和與Router的鬆散耦合。下麵就是一個用第二種方法的完整示範源代碼:
import akka.actor._
import akka.routing.ConsistentHashingRouter.{ConsistentHashMapping, ConsistentHashable, ConsistentHashableEnvelope}
import akka.routing._
object MoneyCounter {
sealed class Counting(cur: String) extends ConsistentHashable {
override def consistentHashKey: Any = cur
}
case class OneHand(cur: String, amt: Double) extends Counting(cur)
case class ReportTotal(cur: String) extends Counting(cur)
def props = Props(new MoneyCounter)
}
class MoneyCounter extends Actor with ActorLogging {
import MoneyCounter._
var currency: String = "RMB"
var amount: Double = 0
override def receive: Receive = {
case OneHand(cur,amt) =>
currency = cur
amount += amt
log.info(s"${self.path.name} received one hand of $amt$cur")
case ReportTotal(_) =>
log.info(s"${self.path.name} has a total of $amount$currency")
}
}
object HashingRouter extends App {
import MoneyCounter._
import scala.util.Random
val currencies = List("RMB","USD","EUR","JPY","GBP","DEM","HKD","FRF","CHF")
val routerSystem = ActorSystem("routerSystem")
val router = routerSystem.actorOf(ConsistentHashingPool(
nrOfInstances = currencies.size+1, virtualNodesFactor = 2).props(
MoneyCounter.props),name = "moneyCounter")
(1 to 20).toList foreach (_ => router ! OneHand(
currencies(Random.nextInt(currencies.size-1))
,Random.nextInt(100) * 1.00))
currencies foreach (c => router ! ReportTotal(c))
scala.io.StdIn.readLine()
routerSystem.terminate()
}