akka集群是高容錯、去中心化、不存在單點故障以及不存在單點瓶頸的集群。它使用gossip協議通信以及具備故障自動檢測功能。 Gossip收斂 集群中每一個節點被其他節點監督(預設的最大數量為5)。集群中的節點互相監督著,某節點所監督的狀態也正在被其他 ...
akka集群是高容錯、去中心化、不存在單點故障以及不存在單點瓶頸的集群。它使用gossip協議通信以及具備故障自動檢測功能。
Gossip收斂
集群中每一個節點被其他節點監督(預設的最大數量為5)。集群中的節點互相監督著,某節點所監督的狀態也正在被其他監督著。通過gossip協議,節點向其他節點傳遞自己所見節點的最新狀態(Up、Joining等等),同時節點也在接收來自其他節點的信息,這些信息包括哪些節點以及這些節點對應的狀態,並這些節點加入到自己的seen表裡去,表示自己已經看見了這些節點的最新狀態了,當所有的節點都把其他節點“看見”了後,我們可以說"Gossip收斂"完成了。
根據以上陳述,當集群中某節點不可達(unreachable)時,gossip收斂不能完成。那些不可達的節點需要變成可達狀態(reachable)或者down狀態,收斂才能進行。
akka集群不存在leader選舉,但是存在leader節點,但是leader節點可以轉移,leader負責執行leader action
,當每次收斂完成後,leader需要做三件事:
- 將處於
joining
狀態節點變更為Up
狀態, 即joining->up
leaving->exiting
exiting->removed
failure Detector
集群中,一個節點被其他節點監督(預設最大數量為5),任何一個節點被探測到不可達時,那麼這個消息將被通過gossip協議傳播到其他節點去,其他節點也將此節點標為不可達。同時故障檢測機制也會將節點從不可達標記為可達,同時擴散給其他節點。
關於評判一個節點是否可達的方式是利用歷史數據中每次心跳時間間隔的平均值與心跳次數為均方差去構建一個正太分佈,F是這個分佈的密度分佈函數,利用以下公式:
phi = -log10(1 - F(timeSinceLastHeartbeat))
phi反應了當前網路的好壞情況,當
akka.cluster.failure-detector.threshold
閾值配置不當時,並不是等待某個心跳檢測超時時,才會把節點標記為不可達。其值預設為18,想要得到更高的靈敏度,需要把閾值設置降低。

實踐
編程方式構建集群
akka.tcp://[email protected]:2551
節點:
application.conf
:
akka {
actor {
provider = cluster
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
seed-nodes = []
}
}
package nathan
import akka.actor.{Actor, ActorSystem, Address}
import akka.cluster.Cluster
import com.typesafe.config.ConfigFactory
object Main extends App {
val actorSystem = ActorSystem("myCluster", ConfigFactory.load())
Cluster(actorSystem).join(Address(protocol = "akka.tcp",system = "myCluster",host = "127.0.0.1",port = 2551))
}
上述代碼Cluster(actorSystem).join(address)
是以address為基礎創建集群,集群的名稱為"myCluster",其中包含"akka.tcp://[email protected]:2551"的節點。集群的名稱為其第一個加入的節點的名字決定,其他後加入的節點的名稱應當與其保持一致。當這個單節點集群創建完畢後,這個單節點就成為seedNode,也就是說,其他節點通過向種子節點發出Join指令,就可以加入集群。
akka.tcp://[email protected]:2552
節點
application.conf
akka {
actor {
provider = cluster
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2552
}
}
}
package nathan
import akka.actor.{ActorSystem, Address}
import akka.cluster.Cluster
import com.typesafe.config.ConfigFactory
object Main extends App {
val actorSystem = ActorSystem("myCluster", ConfigFactory.load())
Cluster(actorSystem).joinSeedNodes(List(Address(protocol = "akka.tcp",system = "myCluster1",host = "127.0.0.1",port = 2551)))
}
Cluster(actorSystem).joinSeedNodes(List(address))
代碼作用向某個種子節點發出Join命令以加入集群。這裡填寫的種子節點越多越好,這樣消息在集群中擴散可以更快。
監聽集群節點狀態
集群時間有如下幾種有如下幾種:MemberJoined
、MemberWeaklyUp
、MemberUp
、MemberLeft
、MemberExited
、MemberRemoved
、LeaderChanged
、RoleLeaderChanged
、UnreachableMember
和ReachableMember
等等。
class ListenClusterActor extends Actor {
val cluster = Cluster(context.system)
override def preStart(): Unit = {
cluster.subscribe(self, InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
}
override def postStop(): Unit = cluster.unsubscribe(self)
override def receive: Receive = {
case MemberJoined(member) =>
println("join:" + member)
case MemberUp(member) =>
println("up:" + member)
case MemberExited(member) =>
println("exited:" + member)
case MemberRemoved(member,previousStatus) =>
println("removed:" + member+" before status:"+previousStatus)
case UnreachableMember(member) =>
println("unreachable:" + member)
}
}
當其他節點加入集群時和離開時,列印如下:
join:Member(address = akka.tcp://[email protected]:2552, status = Joining)
up:Member(address = akka.tcp://[email protected]:2552, status = Up)
exited:Member(address = akka.tcp://[email protected]:2552, status = Exiting)
removed:Member(address = akka.tcp://[email protected]:2552, status = Removed) before status:Exiting