kka-typed(5) - cluster:集群節點狀態監視

来源:https://www.cnblogs.com/tiger-xc/archive/2020/06/07/13062642.html
-Advertisement-
Play Games

akka-cluster對每個節點的每種狀態變化都會在系統消息隊列里發佈相關的事件。通過訂閱有關節點狀態變化的消息就可以獲取每個節點的狀態。這部分已經在之前關於akka-cluster的討論里介紹過了。由於akka-typed里採用了新的消息交流協議,而系統消息的發佈和訂閱也算是消息交換,也受交流協 ...


 akka-cluster對每個節點的每種狀態變化都會在系統消息隊列里發佈相關的事件。通過訂閱有關節點狀態變化的消息就可以獲取每個節點的狀態。這部分已經在之前關於akka-cluster的討論里介紹過了。由於akka-typed里採用了新的消息交流協議,而系統消息的發佈和訂閱也算是消息交換,也受交流協議約束。所以想通過重寫以前示範的ClusterMemberStatus來瞭解一下akka-typed環境下節點狀態變化消息監聽的一些機制。

我們需要一個actor來訂閱系統發佈的節點狀態變化消息。這裡涉及到系統、actor兩端的信息交流。假設向系統訂閱是一種消息的發送,那麼得到的節點狀態變化消息就是系統的response了。先看看actor里的消息定義:

 

object MonitorActor {
  sealed trait ClusterEvent
  private case class MemberStatus(event: MemberEvent) extends ClusterEvent
  private case class ReachStatus(event: ReachabilityEvent) extends ClusterEvent

  def apply(): Behavior[ClusterEvent] = Behaviors.setup[ClusterEvent] { ctx =>
    val memberEventAdapter: ActorRef[MemberEvent] = ctx.messageAdapter(MemberStatus)
    val reachEventAdapter: ActorRef[ReachabilityEvent] = ctx.messageAdapter(ReachStatus)
    Cluster(ctx.system).subscriptions ! Subscribe(memberEventAdapter,classOf[MemberEvent])
    Cluster(ctx.system).subscriptions ! Subscribe(reachEventAdapter,classOf[ReachabilityEvent])

...
}

首先,response 分為 MemberEvent, ReachabilityEvent兩種。MonitorActor處理的消息類型是ClusterEvent。為了處理系統返回的response類型,即MemberEvent,ReachabilityEvent,必須提供這兩種類型到ClusterEvent的轉換。通過ctx.messageAdapter登記MemberEvent -> MemberStatus, ReachabilityEvent -> ReachStatus兩種類型轉換機制使MonitorActor可以接收到MemberStatus, ReachStatus兩種消息:

object MonitorActor {
  sealed trait ClusterEvent
  private case class MemberStatus(event: MemberEvent) extends ClusterEvent
  private case class ReachStatus(event: ReachabilityEvent) extends ClusterEvent

  def apply(): Behavior[ClusterEvent] = Behaviors.setup[ClusterEvent] { ctx =>
    val memberEventAdapter: ActorRef[MemberEvent] = ctx.messageAdapter(MemberStatus)
    val reachEventAdapter: ActorRef[ReachabilityEvent] = ctx.messageAdapter(ReachStatus)
    Cluster(ctx.system).subscriptions ! Subscribe(memberEventAdapter,classOf[MemberEvent])
    Cluster(ctx.system).subscriptions ! Subscribe(reachEventAdapter,classOf[ReachabilityEvent])
    Behaviors.receiveMessage { event =>
      event match {
        case MemberStatus(status) =>
          status match {
            case MemberJoined(member) =>
              ctx.log.info("**************** Member joined: [{}] ***************", member.address)
            case MemberJoined(member) =>
              ctx.log.info("**************** Member joined: [{}] ***************", member.address)
            case MemberUp(member) =>
              ctx.log.info("**************** Member is Up: [{}] ***************", member.address)
            case MemberRemoved(member, previousStatus) =>
              ctx.log.info("**************** Member is Removed: [{}] after {} ***************",
                member.address, previousStatus)
            case MemberLeft(member) =>
              ctx.log.info("**************** Member left: [{}] ***************", member.address)
            case MemberExited(member) =>
              ctx.log.info("**************** Member exited: [{}] ***************", member.address)
            case _: MemberEvent => // ignore
          }
        case ReachStatus(status) =>
            status match {
              case UnreachableMember(member) =>
                ctx.log.info("**************** Member detected as unreachable: [{}] ***************", member)
              case ReachableMember(member) =>
                ctx.log.info("**************** Member back to reachable: [{}] ***************", member)
            }
      }
      Behaviors.same
    }
  }
}

還需要一個actor, 什麼都不幹。存粹構建一個MonitorActor:

object RootActor {
  def apply(): Behavior[Nothing] = Behaviors.setup[Nothing] {ctx =>
     ctx.spawn(MonitorActor(),"listner")
     Behaviors.empty
  }
}

好了,看看main是怎麼實現的吧:

object ClusterMemberStatus {
  import com.typesafe.config.ConfigFactory
  def main(args: Array[String]): Unit = {
    val ports =
      if (args.isEmpty)
        Seq(25251, 25252, 0)
      else
        args.toSeq.map(_.toInt)
    ports.foreach { port =>
      startup(port)
    }

  }

  def startup(port: Int): Unit = {
    val config = ConfigFactory.parseString(s"""
      akka.remote.artery.canonical.port=$port
      """).withFallback(ConfigFactory.load("cluster.conf"))
    ActorSystem[Nothing](RootActor(),"ClusterSystem",config)
  }

}

下麵是測試結果顯示:

22:14:52.755 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://[email protected]:25251] ***************
22:14:52.810 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://[email protected]:51081] - Received InitJoinAck message from [Actor[akka://[email protected]:25251/system/cluster/core/daemon#313431252]] to [akka://[email protected]:51081]
22:14:52.825 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://[email protected]:25251] - Node [akka://[email protected]:51081] is JOINING, roles [dc-default]
22:14:52.825 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member joined: [akka://[email protected]:51081] ***************
22:14:52.829 [ClusterSystem-akka.actor.internal-dispatcher-7] DEBUG akka.cluster.typed.internal.receptionist.ClusterReceptionist - ClusterReceptionist [akka://[email protected]:25251] - Node added [UniqueAddress(akka://[email protected]:51081,567025403336682144)]
22:14:52.858 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://[email protected]:51081] - Welcome from [akka://[email protected]:25251]
22:14:52.858 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://[email protected]:25251] ***************
22:14:52.858 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member joined: [akka://[email protected]:51081] ***************
22:14:52.858 [ClusterSystem-akka.actor.internal-dispatcher-13] DEBUG akka.cluster.typed.internal.receptionist.ClusterReceptionist - ClusterReceptionist [akka://[email protected]:51081] - Node added [UniqueAddress(akka://[email protected]:25251,6076326462170320177)]
22:14:53.044 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://[email protected]:25251] - Leader is moving node [akka://[email protected]:51081] to [Up]
22:14:53.044 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://[email protected]:51081] ***************
22:14:53.679 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://[email protected]:51081] ***************
22:14:57.707 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://[email protected]:25251] - Received InitJoin message from [Actor[akka://[email protected]:25252/system/cluster/core/daemon/joinSeedNodeProcess-1#1472023843]] to [akka://[email protected]:25251]
22:14:57.707 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://[email protected]:25251] - Sending InitJoinAck message from node [akka://[email protected]:25251] to [Actor[akka://[email protected]:25252/system/cluster/core/daemon/joinSeedNodeProcess-1#1472023843]] (version [2.6.5])
22:14:57.732 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://[email protected]:25252] - Received InitJoinAck message from [Actor[akka://[email protected]:25251/system/cluster/core/daemon#313431252]] to [akka://[email protected]:25252]
22:14:57.734 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://[email protected]:25251] - Node [akka://[email protected]:25252] is JOINING, roles [dc-default]
22:14:57.735 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member joined: [akka://[email protected]:25252] ***************
22:14:57.735 [ClusterSystem-akka.actor.internal-dispatcher-26] DEBUG akka.cluster.typed.internal.receptionist.ClusterReceptionist - ClusterReceptionist [akka://[email protected]:25251] - Node added [UniqueAddress(akka://[email protected]:25252,-6913064885699273532)]
22:14:57.737 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://[email protected]:25252] - Welcome from [akka://[email protected]:25251]
22:14:57.737 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://[email protected]:25251] ***************
22:14:57.738 [ClusterSystem-akka.actor.internal-dispatcher-30] DEBUG akka.cluster.typed.internal.receptionist.ClusterReceptionist - ClusterReceptionist [akka://[email protected]:25252] - Node added [UniqueAddress(akka://[email protected]:25251,6076326462170320177)]
22:14:57.738 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member joined: [akka://[email protected]:25252] ***************
22:14:57.738 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://[email protected]:51081] ***************
22:14:57.738 [ClusterSystem-akka.actor.internal-dispatcher-30] DEBUG akka.cluster.typed.internal.receptionist.ClusterReceptionist - ClusterReceptionist [akka://[email protected]:25252] - Node added [UniqueAddress(akka://[email protected]:51081,567025403336682144)]
22:14:57.740 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member joined: [akka://[email protected]:25252] ***************
22:14:57.740 [ClusterSystem-akka.actor.internal-dispatcher-16] DEBUG akka.cluster.typed.internal.receptionist.ClusterReceptionist - ClusterReceptionist [akka://[email protected]:51081] - Node added [UniqueAddress(akka://[email protected]:25252,-6913064885699273532)]
22:14:58.134 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://[email protected]:25251] - Leader is moving node [akka://[email protected]:25252] to [Up]
22:14:58.134 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://[email protected]:25252] ***************
22:14:58.755 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://[email protected]:25252] ***************
22:14:59.146 [ClusterSystem-akka.actor.default-dispatcher-14] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://[email protected]:25252] ***************

下麵是本次示範的全部源代碼:

build.sbt

name := "learn-akka-typed"

version := "0.1"

scalaVersion := "2.13.1"
scalacOptions in Compile ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint")
javacOptions in Compile ++= Seq("-Xlint:unchecked", "-Xlint:deprecation")

val AkkaVersion = "2.6.5"
val AkkaPersistenceCassandraVersion = "1.0.0"


libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
  "com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-cassandra" % AkkaPersistenceCassandraVersion,
  "com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,
  "ch.qos.logback"     % "logback-classic"             % "1.2.3"
)

cluster.conf

akka {
  actor {
    provider = cluster

    serialization-bindings {
      "com.learn.akka.CborSerializable" = jackson-cbor
    }
  }
  remote {
    artery {
      canonical.hostname = "127.0.0.1"
      canonical.port = 0
    }
  }
  cluster {
    seed-nodes = [
      "akka://[email protected]:25251",
      "akka://[email protected]:25252"]
  }
}

ClusterMemberStatus.scala

package com.learn.akka
import akka.actor.typed._
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.ClusterEvent._
import akka.cluster.typed.Subscribe
import akka.cluster.typed.Cluster
import akka.actor.typed.ActorSystem

object MonitorActor {
  sealed trait ClusterEvent
  private case class MemberStatus(event: MemberEvent) extends ClusterEvent
  private case class ReachStatus(event: ReachabilityEvent) extends ClusterEvent

  def apply(): Behavior[ClusterEvent] = Behaviors.setup[ClusterEvent] { ctx =>
    val memberEventAdapter: ActorRef[MemberEvent] = ctx.messageAdapter(MemberStatus)
    val reachEventAdapter: ActorRef[ReachabilityEvent] = ctx.messageAdapter(ReachStatus)
    Cluster(ctx.system).subscriptions ! Subscribe(memberEventAdapter,classOf[MemberEvent])
    Cluster(ctx.system).subscriptions ! Subscribe(reachEventAdapter,classOf[ReachabilityEvent])
    Behaviors.receiveMessage { event =>
      event match {
        case MemberStatus(status) =>
          status match {
            case MemberJoined(member) =>
              ctx.log.info("**************** Member joined: [{}] ***************", member.address)
            case MemberJoined(member) =>
              ctx.log.info("**************** Member joined: [{}] ***************", member.address)
            case MemberUp(member) =>
              ctx.log.info("**************** Member is Up: [{}] ***************", member.address)
            case MemberRemoved(member, previousStatus) =>
              ctx.log.info("**************** Member is Removed: [{}] after {} ***************",
                member.address, previousStatus)
            case MemberLeft(member) =>
              ctx.log.info("**************** Member left: [{}] ***************", member.address)
            case MemberExited(member) =>
              ctx.log.info("**************** Member exited: [{}] ***************", member.address)
            case _: MemberEvent => // ignore
          }
        case ReachStatus(status) =>
            status match {
              case UnreachableMember(member) =>
                ctx.log.info("**************** Member detected as unreachable: [{}] ***************", member)
              case ReachableMember(member) =>
                ctx.log.info("**************** Member back to reachable: [{}] ***************", member)
            }
      }
      Behaviors.same
    }
  }
}
object RootActor {
  def apply(): Behavior[Nothing] = Behaviors.setup[Nothing] {ctx =>
     ctx.spawn(MonitorActor(),"listner")
     Behaviors.empty
  }
}
object ClusterMemberStatus {
  import com.typesafe.config.ConfigFactory
  def main(args: Array[String]): Unit = {
    val ports =
      if (args.isEmpty)
        Seq(25251, 25252, 0)
      else
        args.toSeq.map(_.toInt)
    ports.foreach { port =>
      startup(port)
    }

  }

  def startup(port: Int): Unit = {
    val config = ConfigFactory.parseString(s"""
      akka.remote.artery.canonical.port=$port
      """).withFallback(ConfigFactory.load("cluster.conf"))
    ActorSystem[Nothing](RootActor(),"ClusterSystem",config)
  }

}

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 列表標簽 無序列表: 無序列表是一個項目的列表,此列項目使用粗體圓點(典型的小黑圓圈)進行標記。 無序列表使用 <ul> 標簽 <ul> <li>劉備</li> <li>關羽</li> <li>孫尚香</li> <li>諸葛亮</li> <li>劉禪</li> </ul> 有序列表: 有序列表也是一 ...
  • # 從零開始的前端生活-理解content(二) 應用 清除浮動 偽元素加content最常見的應用是清除浮動帶來的影響 .clear::after{ content:''; display:table; clear:both; } 字元內容的生成 content還可以插入Unicode字元(萬國 ...
  • 關於《SpringBoot-2.3容器化技術》系列 《SpringBoot-2.3容器化技術》系列,旨在和大家一起學習實踐2.3版本帶來的最新容器化技術,讓咱們的Java應用更加適應容器化環境,在雲計算時代依舊緊跟主流,保持競爭力; 全系列文章分為主題和輔助兩部分,主題部分如下: 《體驗Spring ...
  • 20.裝飾器 20.1 函數基礎知識 在Python中函數為一等公民,我們可以: 把函數賦值給變數 在函數中定義函數 在函數中返回函數 把函數傳遞給函數 20.1.1 把函數賦值給變數 在Python里,函數是對象,因此可以把它賦值給變數,如下所示: def hello(name="Surpass" ...
  • 關於《SpringBoot-2.3容器化技術》系列 《SpringBoot-2.3容器化技術》系列,旨在和大家一起學習實踐2.3版本帶來的最新容器化技術,讓咱們的Java應用更加適應容器化環境,在雲計算時代依舊緊跟主流,保持競爭力; 全系列文章分為主題和輔助兩部分,主題部分如下: 《體驗Spring ...
  • 刷到一個題腦子一下子沒有反應過來記錄一下子學習 如下: 答案就是A 這是為什麼呢 我乍一看nums1 new 了一個數組對象並把長度定為3,nums2聲明瞭一個數組,並定義了12345的值,如果 把nums2賦值給nums1它不是會越界嘛長度不一樣嘛,這是我乍一看的想法。 理解了好一會後發現這個題考 ...
  • 前端採用vue,後臺採用spring cloud微服務,進行前後端分離。公共模塊的搭建 ...
  • 題目:學習static定義靜態變數的用法。 程式分析:無。 實例: 1 #include<stdio.h> 2 int main() 3 { 4 void fun(); 5 for(int i=0;i<3;i++) 6 fun(); 7 return 0; 8 } 9 void fun() 10 { ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...