Akka(5): ConsistentHashing Router - 可選定Routee的任務分配模式

来源:http://www.cnblogs.com/tiger-xc/archive/2017/06/05/6945500.html
-Advertisement-
Play Games

上一篇討論里我們介紹了幾種任務分配(Routing)模式。Akka提供的幾種現成智能化Routing模式大多數是通過對用戶屏蔽具體的運算Routee選擇方式來簡化Router使用,提高智能程度,所以我們提到Router的運算是一種無序的運算,消息之間絕對不容許任何形式的依賴,因為向Router發送的 ...


    上一篇討論里我們介紹了幾種任務分配(Routing)模式。Akka提供的幾種現成智能化Routing模式大多數是通過對用戶屏蔽具體的運算Routee選擇方式來簡化Router使用,提高智能程度,所以我們提到Router的運算是一種無序的運算,消息之間絕對不容許任何形式的依賴,因為向Router發送的消息可能在任何Routee上運算。但是,如果我們能夠把運算任務按照任務的類型分配給專門負責處理此等類型任務的Routee,那麼我們就可以充分利用Routing模式所帶來的運算拓展能力來提高整體運算效率。Akka的ConsistentHashingRouter就是為了滿足這樣的需求而提供的。ConsistentHashingRouter是通過消息的特征來分辨消息類型,然後自動構建和管理處理各種類型消息的Routees。當然,這就要求系統的消息必須具備預先設定的特征,使ConsistentHashingRouter可以正確分辨並分配給指定的Routee去運算。如果我們確定只有一個Routee負責處理一種類型消息的話,甚至可以在這個Routee中維護某種狀態。我們可以設計一個場景來示範ConsistentHashingRouter的應用:模擬一個多貨幣的存錢盒,分n次隨意從盒裡取出錢幣然後統計各種貨幣的總額。這個場景中的特征很明顯:就是貨幣種類了,我們把抽出的貨幣按幣種、金額合成消息發給ConsistentHashingRouter。例子里的Routee應該是按照幣種由Router自動構建的,維護各種貨幣當前總額作為內部狀態。向ConsistentHashingRouter發送的消息被分配給相應幣種的Routee去登記更新貨幣當前總額。這個統計金額的Routee可以如下定義:

import akka.actor._

val currencies = List("RMB","USD","EUR","JPY","GBP","DEM","HKD","FRF","CHF")

object MoneyCounter {
  sealed trait Counting
  case class OneHand(cur: String, amt: Double) extends Counting
  case class ReportTotal(cur: String) extends Counting
}
class MoneyCounter extends Actor with ActorLogging {
  import MoneyCounter._
  var currency: String = "RMB"
  var amount: Double = 0

  override def receive: Receive = {
    case OneHand(cur,amt) => 
      currency = cur
      amount += amt
      log.info(s"${self.path.name} received one hand of $amt$cur")
    case ReportTotal(_) => 
      log.info(s"${self.path.name} has a total of $amount$currency")
  }
}

MoneyCounter支持兩項功能:一是統計某種貨幣收到的總額,二是按指令彙報當前總額。我們在前一篇討論里瞭解到如果MoneyCounter是Routee類型,那它們應該被視為具相同功能的Actor。而且用戶無法分辨或者直接面對某個特定的Routee。任何MoneyCounter都可以收到一手任何貨幣,不同的貨幣金額相加結果是錯誤的。所以我們要用Akka提供的ConsistentHashingRouter來解決這個問題。ConsistentHashingRouter的主要特點是能夠分辨消息類型,然後按照消息類型對應到選定的Routee。在我們上面的例子里每個Routee負責一種貨幣,這樣就可以保證每個Routee里的金額總數都是正確的了。ConsistentHashingRouter有三種分辨消息的方法:

1、定義ConsistentHashingRouter的hashMapping函數:這是個PartialFunction[Any,Any],如下:

object HashingRouter extends App {
 import MoneyCounter._

  val currencies = List("RMB","USD","EUR","JPY","GBP","DEM","HKD","FRF","CHF")

  val routerSystem = ActorSystem("routerSystem")

  def mcHashMapping: PartialFunction[Any,Any] = {
    case OneHand(cur,_) => cur
    case ReportTotal(cur) => cur
  }

  val router = routerSystem.actorOf(ConsistentHashingPool(
    nrOfInstances = 5,hashMapping = mcHashMapping,virtualNodesFactor = 2)
  .props(MoneyCounter.props),name = "moneyCounter" )


  router ! OneHand("RMB",10.00)
  router ! OneHand("USD",10.00)
  router ! OneHand("HKD",10.00)
  router ! OneHand("RMB",10.00)
  router ! OneHand("CHF",10.00)

  router ! ReportTotal("RMB")
  router ! ReportTotal("USD")

  scala.io.StdIn.readLine()

  routerSystem.terminate()
}

我們在定義router時直接把mcHashingMapping傳到ConsistentHashingPool的構建器里就行了。特別要註意nrOfInstances,這個參數必須比消息類型的數量大才行,否則Router會錯誤引導消息。測試運算結果顯示如下:

INFO] [06/05/2017 15:20:09.334] [routerSystem-akka.actor.default-dispatcher-6] [akka://routerSystem/user/moneyCounter/$e] $e received one hand of 10.0RMB
[INFO] [06/05/2017 15:20:09.334] [routerSystem-akka.actor.default-dispatcher-3] [akka://routerSystem/user/moneyCounter/$b] $b received one hand of 10.0USD
[INFO] [06/05/2017 15:20:09.334] [routerSystem-akka.actor.default-dispatcher-7] [akka://routerSystem/user/moneyCounter/$d] $d received one hand of 10.0CHF
[INFO] [06/05/2017 15:20:09.334] [routerSystem-akka.actor.default-dispatcher-2] [akka://routerSystem/user/moneyCounter/$a] $a received one hand of 10.0HKD
[INFO] [06/05/2017 15:20:09.334] [routerSystem-akka.actor.default-dispatcher-6] [akka://routerSystem/user/moneyCounter/$e] $e received one hand of 10.0RMB
[INFO] [06/05/2017 15:20:09.337] [routerSystem-akka.actor.default-dispatcher-2] [akka://routerSystem/user/moneyCounter/$b] $b has a total of 10.0USD
[INFO] [06/05/2017 15:20:09.337] [routerSystem-akka.actor.default-dispatcher-6] [akka://routerSystem/user/moneyCounter/$e] $e has a total of 20.0RMB

Router自動調用了e,b,d,a4個Routees,並且能把消息引導到正確的Routee。

2、可以讓消息繼承ConsistentHashable,如此我們要在消息里實現函數constentHashKey, 如下:

object MoneyCounter {
  sealed class Counting(cur: String) extends ConsistentHashable {
    override def consistentHashKey: Any = cur
  }
  case class OneHand(cur: String, amt: Double) extends Counting(cur)
  case class ReportTotal(cur: String) extends Counting(cur)
  def props = Props(new MoneyCounter)
}

現在消息都是ConsistentHashable類型的了。構建新的Router來測試效果:

  val router = routerSystem.actorOf(ConsistentHashingPool(
    nrOfInstances = 5, virtualNodesFactor = 2).props(
    MoneyCounter.props),name = "moneyCounter")


  router ! OneHand("RMB",10.00)
  router ! OneHand("USD",10.00)
  router ! OneHand("HKD",10.00)
  router ! OneHand("RMB",10.00)
  router ! OneHand("CHF",10.00)

  router ! ReportTotal("RMB")
  router ! ReportTotal("USD")

運算結果同樣正確:

[INFO] [06/05/2017 15:36:29.746] [routerSystem-akka.actor.default-dispatcher-7] [akka://routerSystem/user/moneyCounter/$e] $e received one hand of 10.0RMB
[INFO] [06/05/2017 15:36:29.746] [routerSystem-akka.actor.default-dispatcher-5] [akka://routerSystem/user/moneyCounter/$b] $b received one hand of 10.0USD
[INFO] [06/05/2017 15:36:29.746] [routerSystem-akka.actor.default-dispatcher-6] [akka://routerSystem/user/moneyCounter/$a] $a received one hand of 10.0HKD
[INFO] [06/05/2017 15:36:29.746] [routerSystem-akka.actor.default-dispatcher-4] [akka://routerSystem/user/moneyCounter/$d] $d received one hand of 10.0CHF
[INFO] [06/05/2017 15:36:29.746] [routerSystem-akka.actor.default-dispatcher-7] [akka://routerSystem/user/moneyCounter/$e] $e received one hand of 10.0RMB
[INFO] [06/05/2017 15:36:29.749] [routerSystem-akka.actor.default-dispatcher-7] [akka://routerSystem/user/moneyCounter/$e] $e has a total of 20.0RMB
[INFO] [06/05/2017 15:36:29.749] [routerSystem-akka.actor.default-dispatcher-4] [akka://routerSystem/user/moneyCounter/$b] $b has a total of 10.0USD

3、直接把消息包在ConsistentHashableEnvelope里:

  router ! ConsistentHashableEnvelope(message = OneHand("RMB",23.00),hashKey = "RMB")

這種方式需要用戶手工指定Routee,如果用這種方式,我們其實不必用Router,直接把消息傳給專職的Actor就行了。

看來還是第二種方法比較合適。因為比起第一種方法多了類型安全和與Router的鬆散耦合。下麵就是一個用第二種方法的完整示範源代碼:

import akka.actor._
import akka.routing.ConsistentHashingRouter.{ConsistentHashMapping, ConsistentHashable, ConsistentHashableEnvelope}
import akka.routing._


object MoneyCounter {
  sealed class Counting(cur: String) extends ConsistentHashable {
    override def consistentHashKey: Any = cur
  }
  case class OneHand(cur: String, amt: Double) extends Counting(cur)
  case class ReportTotal(cur: String) extends Counting(cur)
  def props = Props(new MoneyCounter)
}
class MoneyCounter extends Actor with ActorLogging {
  import MoneyCounter._
  var currency: String = "RMB"
  var amount: Double = 0

  override def receive: Receive = {
    case OneHand(cur,amt) =>
      currency = cur
      amount += amt
      log.info(s"${self.path.name} received one hand of $amt$cur")
    case ReportTotal(_) =>
      log.info(s"${self.path.name} has a total of $amount$currency")
  }
}
object HashingRouter extends App {
 import MoneyCounter._
  import scala.util.Random

  val currencies = List("RMB","USD","EUR","JPY","GBP","DEM","HKD","FRF","CHF")

  val routerSystem = ActorSystem("routerSystem")

  val router = routerSystem.actorOf(ConsistentHashingPool(
    nrOfInstances = currencies.size+1, virtualNodesFactor = 2).props(
    MoneyCounter.props),name = "moneyCounter")

  (1 to 20).toList foreach (_ => router ! OneHand(
    currencies(Random.nextInt(currencies.size-1))
  ,Random.nextInt(100) * 1.00))

  currencies foreach (c => router ! ReportTotal(c))

  scala.io.StdIn.readLine()

  routerSystem.terminate()
}

 

 

 

 

 

 

 

 

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 有時候我們需要寫一些簡單的性能測試代碼,恰好在stackoverflow上看到一篇經驗之談,https://stackoverflow.com/questions/504103/how do i write a correct micro benchmark in java, 怎樣寫基準測試來儘量屏 ...
  • 今天想看HttpServlet的源碼,關聯了tomcat版本的javaee源碼後,按F3依然無法看到源碼,後來查資料發現原來還需要下載編譯tomcat源碼的jar包,地址如下http://download.csdn.net/download/come_on_ha/9555502 裡面總共4個jar包 ...
  • 前言 Bottle是一個Python Web框架。整個框架只有一個文件,不到4k行的代碼,沒有Python標準庫以外的依賴,卻包含了路由、模板和插件等Web框架常用功能。通過閱讀Bottle源碼來瞭解什麼是Web框架和Web框架是怎麼工作是再合適不過了。由於Bottle是一個支持WSGI的框架,在閱 ...
  • 這個錯誤 怎麼解決? ...
  • 在使用Alamofire進行網路請求的時候,相信大部分的同學都會封裝一個抽象的NetworkLayer,如"APIManager" 或者 "NetworkModel"等等。但是位置業務功能增加,會漸漸混合各種請求,不夠清晰,而Moya能很好地解決這類問題。Moya在Alamofire基礎上進行封裝, ...
  • 在初學Java時,可能會經常碰到下麵的代碼: 1 String str1 = new String("hello"); 2 String str2 = new String("hello"); 3 4 System.out.println(str1==str2); 5 System.out.prin ...
  • 類是指:描述一種事物的定義,是個抽象的概念 實例指:該種事物的一個具體的個體,是具體的東西 打個比方: “人”是一個類。“張三”是人類的一個具體例子 在編程時也是同樣的道理,你先自己定義一個“類”,當你需要用時,用“類”的定義來創造一個具體的例子。 用類的定義來創建一個實例,就叫做類的實例化。實例化 ...
  • 一.創建項目 命令:django-admin startproject mysite mysite ├── manage.py └── mysite ├── __init__.py ├── settings.py ├── urls.py └── wsgi.py 1.manage.py 與Django ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...