akka-typed(0) - typed-actor, typed messages

来源:https://www.cnblogs.com/tiger-xc/archive/2020/05/26/12968756.html
-Advertisement-
Play Games

akka 2.6.x正式發佈以來已經有好一段時間了。核心變化是typed-actor的正式啟用,當然persistence,cluster等模塊也有較大變化。一開始從名稱估摸就是把傳統any類型的消息改成強類型消息,所以想拖一段時間看看到底能對我們現有基於akka-classic的應用軟體有什麼深層 ...


   akka 2.6.x正式發佈以來已經有好一段時間了。核心變化是typed-actor的正式啟用,當然persistence,cluster等模塊也有較大變化。一開始從名稱估摸就是把傳統any類型的消息改成強類型消息,所以想拖一段時間看看到底能對我們現有基於akka-classic的應用軟體有什麼深層次的影響。不過最近考慮的一些系統架構逼的我不得不立即開始akka-typed的調研,也就是說akka-classic已經無法或者很困難去實現新的系統架構,且聽我道來:最近在考慮一個微服務中台。作為後臺數據服務調用的唯一入口,平臺應該是個分散式軟體,那麼採用akka-cluster目前是唯一的選擇,畢竟前期搞過很多基於akka-cluster的應用軟體。但是,akka-cluster-sharding只能支持一種entity actor。畢竟,由於akka-classic的消息是沒有類型的,只能在收到消息後再通過類型模式匹配的方式確定應該運行的代碼。所以,這個actor必須包括所有的業務邏輯處理運算。也就是說對於一個大型應用來說這就是一塊巨型代碼。還有,如果涉及到維護actor狀態的話,比如persistenceActor,或者綜合類型業務運算,那麼又需要多少種類的數據結構,又怎樣去維護、管理這些結構呢?對我來說這基本上是mission-impossible。實際上logom應該正符合這個中台的要求:cluster-sharding, CQRS... 抱著一種好奇的心態瞭解了一下lagom源碼,忽然恍然大悟:這個東西是基於akka-typed的!想想看也是:如果我們可以把actor和消息類型綁在一起,那麼我們就可以通過消息類型對應到某種actor。也就是說基於akka-typed,我們可以把綜合性的業務劃分成多個actor模塊,然後我們可以指定那種actor做那些事情。當然,經過了功能細分,actor的設計也簡單了許多。現在這個新的中台可以實現前臺應用直接調用對應的actor處理業務了。不用多想了,這註定就是akka應用的將來,還等什麼呢?

先從一個最簡單的hello程式開始吧:基本上是兩個actor相互交換消息。先用第一個來示範標準的actor構建過程:

 

  object HelloActor {
    sealed trait Request
    case class Greeting(fromWhom: String, replyTo: ActorRef[Greeter.Greeted]) extends Request

    def apply(): Behavior[Greeting] = {
      Behaviors.receive { (ctx, greeter) =>
        ctx.log.info("receive greeting from {}", greeter.fromWhom)
        greeter.replyTo ! Greeter.Greeted(s"hello ${greeter.fromWhom}!")
        Behaviors.same
      }
    }
  }

 

akka-typed的actor構建是通過定義它的Behavior行為實現的。特別的是類型參數Behavior[Greeting],代表這個actor只處理Greeting類型的消息,因而是個typed-actor。akka-typed已經不支持sender()了,在消息里自帶,如Greeting.replyTo。Behavior定義是通過工廠模式Behaviors實現的,看看Behaviors的定義:

 

/**
 * Factories for [[akka.actor.typed.Behavior]].
 */
object Behaviors {
  def setup[T](factory: ActorContext[T] => Behavior[T]): Behavior[T] 

  def withStash[T](capacity: Int)(factory: StashBuffer[T] => Behavior[T]): Behavior[T] 

  def same[T]: Behavior[T] 

  def unhandled[T]: Behavior[T] 

  def stopped[T]: Behavior[T] 

  def stopped[T](postStop: () => Unit): Behavior[T]

  def empty[T]: Behavior[T]

  def ignore[T]: Behavior[T] 

  def receive[T](onMessage: (ActorContext[T], T) => Behavior[T]): Receive[T]

  def receiveMessage[T](onMessage: T => Behavior[T]): Receive[T]

  def receivePartial[T](onMessage: PartialFunction[(ActorContext[T], T), Behavior[T]]): Receive[T] 
 
  def receiveMessagePartial[T](onMessage: PartialFunction[T, Behavior[T]]): Receive[T] 

  def receiveSignal[T](handler: PartialFunction[(ActorContext[T], Signal), Behavior[T]]): Behavior[T] 

  def supervise[T](wrapped: Behavior[T]): Supervise[T] 

  def withTimers[T](factory: TimerScheduler[T] => Behavior[T]): Behavior[T] 
 
 ...

}

上面的構建函數除返回Behavior[T]外還有Receive[T]和Supervise[T],這兩個類型是什麼?它們還是Behavior[T]:

  trait Receive[T] extends Behavior[T] {
    def receiveSignal(onSignal: PartialFunction[(ActorContext[T], Signal), Behavior[T]]): Behavior[T]
  }


  def supervise[T](wrapped: Behavior[T]): Supervise[T] =
    new Supervise[T](wrapped)

  private final val ThrowableClassTag = ClassTag(classOf[Throwable])
  final class Supervise[T] private[akka] (val wrapped: Behavior[T]) extends AnyVal {

    /** Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws. */
    def onFailure[Thr <: Throwable: ClassTag](strategy: SupervisorStrategy): Behavior[T] = {
      val tag = classTag[Thr]
      val effectiveTag = if (tag == ClassTag.Nothing) ThrowableClassTag else tag
      Supervisor(Behavior.validateAsInitial(wrapped), strategy)(effectiveTag)
    }
  }

註意,Supervise.onFailure返回了Behavior[T]。

helloActor的Behavior是通過Behaviors.receive構建的。還可以用setup,receiveMessage來構建。註意:構建函數的入參數也是Behavior[T],所以這些構造器可以一層層嵌套著使用。setup,receive為函數內層提供了ActorContext, withTimers提供TimerScheduler[T]。那麼我可以把HelloActor的功能再完善點,加個監管策略SupervisorStrategy:

  object HelloActor {
    sealed trait Request
    case class Greeting(fromWhom: String, replyTo: ActorRef[Greeter.Greeted]) extends Request

    def apply(): Behavior[Greeting] = {
      Behaviors.supervise(
        Behaviors.receive[Greeting] { (ctx, greeter) =>
          ctx.log.info("receive greeting from {}", greeter.fromWhom)
          greeter.replyTo ! Greeter.Greeted(s"hello ${greeter.fromWhom}!")
          Behaviors.same
        }
      ).onFailure(SupervisorStrategy.restartWithBackoff(10.seconds, 1.minute, 0.20))
    }
  }

在akka-typed里,actor監管已經從父輩轉到自身。再就是增加了BackOff-SupervisorStrategy,不需要獨立的BackOffSupervisor actor了。

再看看另一個Greeter:

 object Greeter {

    sealed trait Response
    case class Greeted(hello: String) extends Response
    
    def apply(): Behavior[Greeted] = {
      Behaviors.setup ( ctx =>
        Behaviors.receiveMessage { message =>
          ctx.log.info(message.hello)
          Behaviors.same
        }
      )
    }
  }

這個跟HelloActor沒什麼不同,不過用了setup,receiveMessage套裝。值得註意的是Greeter負責處理Greeted消息,這是一個不帶sender ActorRef的類型,意味著處理這類消息後不需要答覆消息發送者。

然後還需要一個actor來構建上面兩個actor實例,啟動對話:

 object GreetStarter {
    sealed trait Command
    case class SayHiTo(whom: String) extends Command
    case class RepeatedGreeting(whom: String, interval: FiniteDuration) extends Command

    def apply(): Behavior[Command] = {
      Behaviors.setup[Command] { ctx =>
        val helloActor = ctx.spawn(HelloActor(), "hello-actor")
        val greeter = ctx.spawn(Greeter(), "greeter")
        Behaviors.withTimers { timer =>
          new GreetStarter(
            helloActor,greeter,ctx,timer)
            .repeatGreeting(1,3)
        }
      }
    }
  }
  class GreetStarter private (
     helloActor: ActorRef[HelloActor.Greeting],
     greeter: ActorRef[Greeter.Greeted],
     ctx: ActorContext[GreetStarter.Command],
     timer: TimerScheduler[GreetStarter.Command]){
    import GreetStarter._

    private def repeatGreeting(count: Int, max: Int): Behavior[Command] =
       Behaviors.receiveMessage { msg =>
         msg match {
           case RepeatedGreeting(whom, interval) =>
             ctx.log.info2("start greeting to {} with interval {}", whom, interval)
             timer.startSingleTimer(SayHiTo(whom), interval)
             Behaviors.same
           case SayHiTo(whom) =>
             ctx.log.info2("{}th time greeting to {}",count,whom)
             if (max == count)
               Behaviors.stopped
             else {
               helloActor ! HelloActor.Greeting(whom, greeter)
               repeatGreeting(count + 1, max)
             }
         }
       }
  }

上面這個例子有點複雜,邏輯也有些問題,主要是為了示範一種函數式actor構建模式及actor狀態轉換虛構出來的。akka-typed已經不再支持become方法了。

最後,需要一個相當於main這麼一個頂層的程式:

  def main(args: Array[String]) {
    val man: ActorSystem[GreetStarter.Command] = ActorSystem(GreetStarter(), "greetDemo")
    man ! GreetStarter.RepeatedGreeting("Tiger",5.seconds)
    man ! GreetStarter.RepeatedGreeting("Peter",5.seconds)
    man ! GreetStarter.RepeatedGreeting("Susanna",5.seconds)
  }

akka-classic的頂級actor,即: /users是由系統預設創建的。akka-typed需要用戶提供這個頂層actor。這個是在ActorSystem的第一個參數指定的。我們再看看akka-typed的ActorSystem的構建函數:

object ActorSystem {

  /**
   * Scala API: Create an ActorSystem
   */
  def apply[T](guardianBehavior: Behavior[T], name: String): ActorSystem[T] =
    createInternal(name, guardianBehavior, Props.empty, ActorSystemSetup.create(BootstrapSetup()))

  /**
   * Scala API: Create an ActorSystem
   */
  def apply[T](guardianBehavior: Behavior[T], name: String, config: Config): ActorSystem[T] =
    createInternal(name, guardianBehavior, Props.empty, ActorSystemSetup.create(BootstrapSetup(config)))

  /**
   * Scala API: Create an ActorSystem
   */
  def apply[T](guardianBehavior: Behavior[T], name: String, config: Config, guardianProps: Props): ActorSystem[T] =
    createInternal(name, guardianBehavior, guardianProps, ActorSystemSetup.create(BootstrapSetup(config)))
...
}

其中一個apply與akka-classic的ActorSystem構建方式很相似:

  def main(args: Array[String]) {
    val config = ConfigFactory.load("application.conf")
    val man: ActorSystem[GreetStarter.Command] = ActorSystem(GreetStarter(), "greetDemo",config)
    man ! GreetStarter.RepeatedGreeting("Tiger",5.seconds)
    man ! GreetStarter.RepeatedGreeting("Peter",5.seconds)
    man ! GreetStarter.RepeatedGreeting("Susanna",5.seconds)
  }

下麵是本次討論的完整源代碼:

build.sbt

name := "learn-akka-typed"

version := "0.1"

scalaVersion := "2.13.2"

lazy val akkaVersion = "2.6.5"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor-typed"            % akkaVersion,
  "ch.qos.logback"     % "logback-classic"             % "1.2.3"
)

fork in Test := true

Lesson01.scala

import akka.actor.typed._
import scaladsl._
import scala.concurrent.duration._
import com.typesafe.config._
object Lesson01 {

  object HelloActor {
    sealed trait Request
    case class Greeting(fromWhom: String, replyTo: ActorRef[Greeter.Greeted]) extends Request

    def apply(): Behavior[Greeting] = {
      Behaviors.supervise(
        Behaviors.receive[Greeting] { (ctx, greeter) =>
          ctx.log.info("receive greeting from {}", greeter.fromWhom)
          greeter.replyTo ! Greeter.Greeted(s"hello ${greeter.fromWhom}!")
          Behaviors.same
        }
      ).onFailure(SupervisorStrategy.restartWithBackoff(10.seconds, 1.minute, 0.20))
    }
  }

  object Greeter {

    sealed trait Response
    case class Greeted(hello: String) extends Response

    def apply(): Behavior[Greeted] = {
      Behaviors.setup ( ctx =>
        Behaviors.receiveMessage { message =>
          ctx.log.info(message.hello)
          Behaviors.same
        }
      )
    }
  }

  object GreetStarter {
    sealed trait Command
    case class SayHiTo(whom: String) extends Command
    case class RepeatedGreeting(whom: String, interval: FiniteDuration) extends Command

    def apply(): Behavior[Command] = {
      Behaviors.setup[Command] { ctx =>
        val helloActor = ctx.spawn(HelloActor(), "hello-actor")
        val greeter = ctx.spawn(Greeter(), "greeter")
        Behaviors.withTimers { timer =>
          new GreetStarter(
            helloActor,greeter,ctx,timer)
            .repeatGreeting(1,3)
        }
      }
    }
  }
  class GreetStarter private (
     helloActor: ActorRef[HelloActor.Greeting],
     greeter: ActorRef[Greeter.Greeted],
     ctx: ActorContext[GreetStarter.Command],
     timer: TimerScheduler[GreetStarter.Command]){
    import GreetStarter._

    private def repeatGreeting(count: Int, max: Int): Behavior[Command] =
       Behaviors.receiveMessage { msg =>
         msg match {
           case RepeatedGreeting(whom, interval) =>
             ctx.log.info2("start greeting to {} with interval {}", whom, interval)
             timer.startSingleTimer(SayHiTo(whom), interval)
             Behaviors.same
           case SayHiTo(whom) =>
             ctx.log.info2("{}th time greeting to {}",count,whom)
             if (max == count)
               Behaviors.stopped
             else {
               helloActor ! HelloActor.Greeting(whom, greeter)
               repeatGreeting(count + 1, max)
             }
         }
       }
  }


  def main(args: Array[String]) {
    val config = ConfigFactory.load("application.conf")
    val man: ActorSystem[GreetStarter.Command] = ActorSystem(GreetStarter(), "greetDemo",config)
    man ! GreetStarter.RepeatedGreeting("Tiger",5.seconds)
    man ! GreetStarter.RepeatedGreeting("Peter",5.seconds)
    man ! GreetStarter.RepeatedGreeting("Susanna",5.seconds)
  }
}

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 簡介 scalable open financial architecture stack , 可擴展開放的金融架構棧; github: https://github.com/sofastack/sofa-boot 快速構建金融級別雲原生架構的中間件。 特點: 開放,金融級別,雲原生; 微服務體系組 ...
  • 項目簡介 項目來源於:https://gitee.com/gepanjiang/LibrarySeats 因原gitee倉庫無資料庫文件且存在水印,經過本人修改,現將該倉庫重新上傳至個人gitee倉庫。 https://gitee.com/coderzcr/LibrarySeats 本系統基於JSP ...
  • 9.條件語句 9.1 基本語法 在編程語言中,會經常碰到在不同條件下完成不同的操作功能,在Python中僅提供了if-elfif...else等條件語句,並未提供其他語言中的switch語句(如果深刻字典,也可以用字典實現switch功能),其基本語法格式如下所示: 1.基本格式 if condit ...
  • 8.數據嵌套 在Python中,各種數據是可以相互嵌套的,如列表中嵌套元組、整型、字典等,字典中也可以嵌套元組、列表等,甚至可以嵌套自身。使用起來非常靈活。這種嵌套可以在實際項目中靈活運用各種數據類型進行嵌套。示例如下所示: a=[ 1, 23.45, "name", ("name","age"), ...
  • 一個經典的程式猿名言是:”如果只能有一種數據結構,那就用哈希表吧。“ ...
  • 7.集合 集合的主要特性如下所示: 1.集合中不會存在重覆元素,天生自帶去重功能 2.集合可使用{item1,item2,...itemn}或set()進行定義,如果要定義一個空的集合,必須使用set()函數 3.使用set()函數定義集合時,裡面的參數必須為列表或元組 4.集合是無序的 7.1 常 ...
  • 6.37(格式化整數)使用下麵的方法頭編寫一個方法,用於將整數格式化為指定寬度: public static String format(int number, int width) 方法為數字number返回一個帶有一個或多個以0作為首碼的字元串。字元串的位數就是寬度。比如,format(34,4 ...
  • 天下武功,唯快不破,雖然支持C/C++ 開發工具(俗稱:IDE)有很多,但是在團隊項目開發中使用最多的還是Visual Studio(簡稱VS),好用而且功能強大,畢竟親爸爸是微軟! 現在Visual Studio 已經更新到VS2019,VS 支持開發人員編寫跨平臺的應用程式,從 Windows ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...