akka-typed(1) - actor生命周期管理

来源:https://www.cnblogs.com/tiger-xc/archive/2020/05/27/12976199.html
-Advertisement-
Play Games

akka-typed的actor從創建、啟用、狀態轉換、停用、監視等生命周期管理方式和akka-classic還是有一定的不同之處。這篇我們就介紹一下akka-typed的actor生命周期管理。 每一種actor都是通過定義它的行為屬性behavior形成模版,然後由對上一層的父輩actor用sp ...


   akka-typed的actor從創建、啟用、狀態轉換、停用、監視等生命周期管理方式和akka-classic還是有一定的不同之處。這篇我們就介紹一下akka-typed的actor生命周期管理。

每一種actor都是通過定義它的行為屬性behavior形成模版,然後由對上一層的父輩actor用spawn方法產生actor實例的。產生的actor實例加入一個系統的由上至下樹形結構,直接在spawn產生自己的父輩之下。akka-typed的守護guardian-actor,即根部root-actor是通過在定義ActorSystem時指定並產生的。如下:

    val config = ConfigFactory.load("application.conf")
    val man: ActorSystem[GreetStarter.Command] = ActorSystem(GreetStarter(), "greetDemo",config)
    man ! GreetStarter.RepeatedGreeting("Tiger",1.seconds)

在某種意義上,這個ActorSystem實例man就代表root-actor。我們可以向man發送消息然後由GreetStarter的behavior用自己的ActorContext進行spawn,stop,watch及分派計算任務等,其實就是一個程式的集線器:

  object GreetStarter {
    import Messages._
    def apply(): Behavior[SayHi] = {
      Behaviors.setup { ctx =>
        val props = DispatcherSelector.fromConfig("akka.actor.default-blocking-io-dispatcher")
        val helloActor = ctx.spawn(HelloActor(), "hello-actor",props)
        val greeter = ctx.spawn(Greeter(helloActor), "greeter")
        ctx.watch(greeter)
        ctx.watchWith(helloActor,StopWorker("something happend"))
        Behaviors.receiveMessage { who =>
          if (who.name == "stop") {
            ctx.stop(helloActor)
            ctx.stop(greeter)
            Behaviors.stopped
          } else {
            greeter ! who
            Behaviors.same
          }
        }
      }
    }
  }

但是,總有時候我們需要在root-actor的ActorContext之外來進行一些製造、使用actor的操作。下麵這個官方文檔上的例子是很好的示範:

import akka.actor.typed.Behavior
import akka.actor.typed.SpawnProtocol
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.LoggerOps

object HelloWorldMain {
  def apply(): Behavior[SpawnProtocol.Command] =
    Behaviors.setup { context =>
      // Start initial tasks
      // context.spawn(...)

      SpawnProtocol()
    }
}

object Main extends App {
implicit val system: ActorSystem[SpawnProtocol.Command] =
  ActorSystem(HelloWorldMain(), "hello")

// needed in implicit scope for ask (?)
import akka.actor.typed.scaladsl.AskPattern._
implicit val ec: ExecutionContext = system.executionContext
implicit val timeout: Timeout = Timeout(3.seconds)

val greeter: Future[ActorRef[HelloWorld.Greet]] =
  system.ask(SpawnProtocol.Spawn(behavior = HelloWorld(), name = "greeter", props = Props.empty, _))

val greetedBehavior = Behaviors.receive[HelloWorld.Greeted] { (context, message) =>
  context.log.info2("Greeting for {} from {}", message.whom, message.from)
  Behaviors.stopped
}

val greetedReplyTo: Future[ActorRef[HelloWorld.Greeted]] =
  system.ask(SpawnProtocol.Spawn(greetedBehavior, name = "", props = Props.empty, _))

for (greeterRef <- greeter; replyToRef <- greetedReplyTo) {
  greeterRef ! HelloWorld.Greet("Akka", replyToRef)
}
...
}

可以看到所有操作都在actor框架之外進行的。這個SpawnProtocol本身就是一個actor,如下:

object SpawnProtocol {

...
  final case class Spawn[T](behavior: Behavior[T], name: String, props: Props, replyTo: ActorRef[ActorRef[T]])
      extends Command
...
  def apply(): Behavior[Command] =
    Behaviors.receive { (ctx, msg) =>
      msg match {
        case Spawn(bhvr, name, props, replyTo) =>
          val ref =
            if (name == null || name.equals(""))
              ctx.spawnAnonymous(bhvr, props)
            else {

              @tailrec def spawnWithUniqueName(c: Int): ActorRef[Any] = {
                val nameSuggestion = if (c == 0) name else s"$name-$c"
                ctx.child(nameSuggestion) match {
                  case Some(_) => spawnWithUniqueName(c + 1) // already taken, try next
                  case None    => ctx.spawn(bhvr, nameSuggestion, props)
                }
              }

              spawnWithUniqueName(0)
            }
          replyTo ! ref
          Behaviors.same
      }
    }

}

外界通過發送Spawn消息來指定產生新的actor。

actor的狀態切換就是從一種behavior轉到另一種behavior。我們可以自定義behavior或者用現成的Behaviors.???。如果只是涉及內部變數變化,那麼可以直接生成帶著變數的當前behavior,如下:

object HelloWorldBot {

  def apply(max: Int): Behavior[HelloWorld.Greeted] = {
    bot(0, max)
  }

  private def bot(greetingCounter: Int, max: Int): Behavior[HelloWorld.Greeted] =
    Behaviors.receive { (context, message) =>
      val n = greetingCounter + 1
      context.log.info2("Greeting {} for {}", n, message.whom)
      if (n == max) {
        Behaviors.stopped
      } else {
        message.from ! HelloWorld.Greet(message.whom, context.self)
        bot(n, max)
      }
    }
}

actor停用可以由直屬父輩actor的ActorContext.stop或者自身的Behaviors.stopped來實現。Behaviors.stopped可以帶入一個清理函數。在actor完全停止之前進行一些清理操作: 

object MasterControlProgram {
  sealed trait Command
  final case class SpawnJob(name: String) extends Command
  case object GracefulShutdown extends Command

  // Predefined cleanup operation
  def cleanup(log: Logger): Unit = log.info("Cleaning up!")

  def apply(): Behavior[Command] = {
    Behaviors
      .receive[Command] { (context, message) =>
        message match {
          case SpawnJob(jobName) =>
            context.log.info("Spawning job {}!", jobName)
            context.spawn(Job(jobName), name = jobName)
            Behaviors.same
          case GracefulShutdown =>
            context.log.info("Initiating graceful shutdown...")
            // perform graceful stop, executing cleanup before final system termination
            // behavior executing cleanup is passed as a parameter to Actor.stopped
            Behaviors.stopped { () =>
              cleanup(context.system.log)
            }
        }
      }
      .receiveSignal {
        case (context, PostStop) =>
          context.log.info("Master Control Program stopped")
          Behaviors.same
      }
  }
}

實際上一個actor轉入停用stop狀態可以在另一個作為監視actor的receiveSignal獲取,如下:

  object GreetStarter {
    import Messages._
    def apply(): Behavior[SayHi] = {
      Behaviors.setup { ctx =>
        val props = DispatcherSelector.fromConfig("akka.actor.default-blocking-io-dispatcher")
        val helloActor = ctx.spawn(HelloActor(), "hello-actor",props)
        val greeter = ctx.spawn(Greeter(helloActor), "greeter")
        ctx.watch(greeter)
        ctx.watchWith(helloActor,StopWorker("something happend"))
        Behaviors.receiveMessage { who =>
          if (who.name == "stop") {
            ctx.stop(helloActor)
            ctx.stop(greeter)
            Behaviors.stopped
          } else {
            greeter ! who
            Behaviors.same
          }
        }.receiveSignal {
            case (context, Terminated(ref)) =>
              context.log.info("{} stopped!", ref.path.name)
              Behaviors.same
          }
      }
    }
  }

下麵是.receiveSignal函數及其捕獲的Signal消息:

  trait Receive[T] extends Behavior[T] {
    def receiveSignal(onSignal: PartialFunction[(ActorContext[T], Signal), Behavior[T]]): Behavior[T]
  }



trait Signal

/**
 * Lifecycle signal that is fired upon restart of the Actor before replacing
 * the behavior with the fresh one (i.e. this signal is received within the
 * behavior that failed).
 */
sealed abstract class PreRestart extends Signal
case object PreRestart extends PreRestart {
  def instance: PreRestart = this
}

/**
 * Lifecycle signal that is fired after this actor and all its child actors
 * (transitively) have terminated. The [[Terminated]] signal is only sent to
 * registered watchers after this signal has been processed.
 */
sealed abstract class PostStop extends Signal
// comment copied onto object for better hints in IDEs
/**
 * Lifecycle signal that is fired after this actor and all its child actors
 * (transitively) have terminated. The [[Terminated]] signal is only sent to
 * registered watchers after this signal has been processed.
 */
case object PostStop extends PostStop {
  def instance: PostStop = this
}

object Terminated {
  def apply(ref: ActorRef[Nothing]): Terminated = new Terminated(ref)
  def unapply(t: Terminated): Option[ActorRef[Nothing]] = Some(t.ref)
}

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 截至 Laravel 7,共有 6 個可用的緩存驅動程式,其中 APC 是最佳實踐,而文件驅動程式是唯一不需要額外設置的驅動程式。 我昨晚與一位朋友交談,他提到他們使用 Redis 作為緩存驅動程式,這讓我想到我還有一個仍然使用文件驅動程式的項目。 我想我可以使用一些記憶體驅動緩存,以獲得更好的性能, ...
  • 案例故事: 場景一:反覆重啟Android終端產品100次,每重啟一次錄一個視頻; 場景二:做壓力測試比如Monkey一晚上,我們需要涉及長時間錄像; 場景三:做自動化測試的時候,跑一條自動化用例,錄製每條用例執行的整個過程視頻。 許多需要長時間的壓測的場景,我們都可以通過攝像頭錄像記錄下被測試設備 ...
  • Java 四種訪問許可權 一、概述 訪問等級比較:public > protected > default > private 無論是方法還是成員變數,這四種訪問許可權修飾符作用都一樣 public:不管包外包內,所有類(子類+非子類)均可使用 protected 包內:所有類可使用 包外:子類可使用, ...
  • JAVA 每次從List中取出100條記錄 package com.blmlove; import java.util.*; public class Test { public static void main(String[] args) { Test test = new Test(); Li ...
  • 前言 本文的文字及圖片來源於網路,僅供學習、交流使用,不具有任何商業用途,版權歸原作者所有,如有問題請及時聯繫我們以作處理。 這次數據可視化,我差點認輸了 故事的開頭是,昨天#5000億資產是什麼水平#上了熱搜,因為賭王的離去,他的家產公佈激起各種白日夢想家的誕生,坐我旁邊的小師妹也算了半天要是放餘 ...
  • 前段時間做項目時候,想要在不改變方法簽名的情況下,給 Model::find 方法做個緩存。而且想要做到即插即用。 1.先看一下當我們調用 find 方法時,框架幹了什麼? 找到 Illuminate\Database\Eloquent\Model 的代碼,搜索 find,沒有該方法。看來是走了 _ ...
  • 前言 本文的文字及圖片來源於網路,僅供學習、交流使用,不具有任何商業用途,版權歸原作者所有,如有問題請及時聯繫我們以作處理。 先來看看繪製的動態水球圖:沒有安裝PyEcharts的,先安裝PyEcharts: # 安裝pyecharts模塊,直接安裝就是最新的版本 pip install pyech ...
  • 在編寫程式時,使用3種基本控制結構來構造程式。可以說,程式基本上都含有順序、選擇、迴圈3種基本控制結構,這3種結構到目前為止仍是主要的控制結構。程式以控制結構為單位,只有一個入口和一個出口,基於控制結構可以從前往後地順序閱讀程式,程式的靜態描述與執行時的控制流程容易對應,所以可以獨立地理解各個部分。... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...