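In the previous posts we covered some programming patterns, distributed data structures and concrete implementation techniques for a cluster environment. So far we have been able to distribute program tasks to actors running on many servers, making the fullest use of the hardware resources of the whole system. This works because akka-cluster combines many servers into one virtual system, so the programmer does not need to know which server the actor doing a computation actually runs on. Of course, this "whole system" is a distributed one: underneath, every cluster node still runs as a complete, independent unit. The core idea therefore remains the same: split a program into tasks that can be computed independently, then hand them to actors that may be spread over many servers. In the cluster-load-balance example of the previous post we used a fire-and-forget pattern: several independent tasks were dispatched to actors on cluster nodes and left to finish on their own, with no interaction or control along the way. That is a typical stateless computation model. To the outside world there is only a start and a finish, with no need to report progress or current state in between. In reality, however, many tasks cannot be split into fully independent pieces, or splitting them further would hurt efficiency. Take the shopping cart of each customer of an online store: it records every item the customer picks, and every pick produces an updated cart state until checkout. On a site with hundreds of thousands of concurrent shoppers, no single machine could keep all those cart states in memory, and falling back to a traditional database brings back the efficiency problems of highly concurrent access. Seen this way, cluster sharding may well be the best solution.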
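Put simply, cluster sharding distributes a set of actors that each carry a unique identifier, called entities, across the cluster nodes. Controlling code interacts with an entity through its unique ID and thereby drives the whole computation. This lets us split a program into reasonably sized sub-tasks, each of which carries its own intermediate state. These sub-tasks are computed by entities spread over the cluster nodes, and the state they produce naturally uses those nodes' resources, which solves the memory-capacity problem described above. Thanks to the location transparency of akka-cluster, when nodes crash, join or leave, sharding relocates shards (and therefore their entities) to keep the load balanced, while users keep reaching the target entity through its fixed ID no matter which node it has moved to.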
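Cluster sharding is implemented by the ShardRegion (which manages shards) working together with the ShardCoordinator (which locates shards), with the goal of delivering each message to the entity with the specified ID. The ShardCoordinator decides which cluster node, i.e. which ShardRegion, owns each shard, and the ShardRegion locates the entities inside the shards hosted on its node. The ShardCoordinator is a cluster singleton, whereas a ShardRegion must be started on every cluster node that takes part in sharding. All entities of one sharded type must be actors of the same type. Every message sent to an entity must carry enough information to derive both the shard id and the entity ID. The ShardRegion extracts this information from the message, asks the ShardCoordinator which ShardRegion owns the shard (only the first time; the answer is then cached), and the owning ShardRegion forwards the message to the target entity by its ID.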
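To make the coordinator's part concrete, here is a toy model in plain Scala (no Akka involved) of its bookkeeping: a table from shard id to owning region, filled in lazily the first time a shard is asked for. The names `Coordinator`, `homeFor` and the string region ids are hypothetical, and picking the region with the fewest shards only approximates the idea behind Akka's default LeastShardAllocationStrategy; it is a sketch, not Akka's code.

```scala
object CoordinatorSketch {
  type ShardId = String
  type RegionId = String // hypothetical stand-in for a ShardRegion ActorRef

  final case class Coordinator(regions: Vector[RegionId],
                               table: Map[ShardId, RegionId] = Map.empty) {
    // Returns the region owning `shard`, allocating the shard on first use.
    def homeFor(shard: ShardId): (RegionId, Coordinator) = table.get(shard) match {
      case Some(region) => (region, this) // already allocated: same home every time
      case None =>
        // pick the region that currently owns the fewest shards (first one on ties)
        val target = regions.minBy(r => table.values.count(_ == r))
        (target, copy(table = table + (shard -> target)))
    }
  }
}
```

Once a region has learned a shard's home it keeps using it, which is why repeated messages for the same shard do not go back to the coordinator.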
The ShardRegion on each node is created with the following start function:
/**
* Scala API: Register a named entity type by defining the [[akka.actor.Props]] of the entity actor
* and functions to extract entity and shard identifier from messages. The [[ShardRegion]] actor
* for this type can later be retrieved with the [[#shardRegion]] method.
*
* Some settings can be configured as described in the `akka.cluster.sharding` section
* of the `reference.conf`.
*
* @param typeName the name of the entity type
* @param entityProps the `Props` of the entity actors that will be created by the `ShardRegion`
* @param settings configuration settings, see [[ClusterShardingSettings]]
* @param extractEntityId partial function to extract the entity id and the message to send to the
* entity from the incoming message, if the partial function does not match the message will
* be `unhandled`, i.e. posted as `Unhandled` messages on the event stream
* @param extractShardId function to determine the shard id for an incoming message, only messages
* that passed the `extractEntityId` will be used
* @param allocationStrategy possibility to use a custom shard allocation and
* rebalancing logic
* @param handOffStopMessage the message that will be sent to entities when they are to be stopped
* for a rebalance or graceful shutdown of a `ShardRegion`, e.g. `PoisonPill`.
* @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
*/
def start(
typeName: String,
entityProps: Props,
settings: ClusterShardingSettings,
extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId,
allocationStrategy: ShardAllocationStrategy,
handOffStopMessage: Any): ActorRef = {...}
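This function registers sharding for entities of type typeName. It returns an ActorRef, which shows that the ShardRegion is an actor on the local node. Below is an example call; it uses the overload without allocationStrategy and handOffStopMessage, which then fall back to their defaults (LeastShardAllocationStrategy and PoisonPill):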
ClusterSharding(system).start(
typeName = Counter.shardName,
entityProps = Counter.props(),
settings = ClusterShardingSettings(system),
extractEntityId = Counter.idExtractor,
extractShardId = Counter.shardResolver)
...
object Counter {
trait Command
case object Increment extends Command
case object Decrement extends Command
case object Get extends Command
case object Stop extends Command
trait Event
case class CounterChanged(delta: Int) extends Event
// Sharding Name
val shardName: String = "Counter"
// the envelope the outside world must use to send a command to a Counter entity
case class CounterMessage(id: Long, cmd: Command)
// id extractor: entity id plus the message that is actually delivered to the entity
val idExtractor: ShardRegion.ExtractEntityId = {
case CounterMessage(id, msg) => (id.toString, msg)
}
// shard resolver: spreads the entities over 12 shards
val shardResolver: ShardRegion.ExtractShardId = {
case CounterMessage(id, _) => (id % 12).toString
}
def props() = Props[Counter]
}
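The two extractors are ordinary Scala functions, so their behaviour can be checked without a cluster. The sketch below re-declares local copies of the aliases (in Akka, `ShardRegion.ExtractEntityId` is `PartialFunction[Any, (String, Any)]` and `ShardRegion.ExtractShardId` is `Any => String`) plus a trimmed-down command set, so it compiles with the standard library alone. The object name `CounterRouting` is hypothetical.

```scala
object CounterRouting {
  // Local copies of the aliases defined in akka.cluster.sharding.ShardRegion
  type ExtractEntityId = PartialFunction[Any, (String, Any)]
  type ExtractShardId = Any => String

  sealed trait Command
  case object Increment extends Command
  case object Get extends Command
  final case class CounterMessage(id: Long, cmd: Command)

  val numberOfShards = 12

  // entity id = the counter's id; the entity itself only receives the inner command
  val idExtractor: ExtractEntityId = {
    case CounterMessage(id, cmd) => (id.toString, cmd)
  }

  // shard id = id modulo the fixed number of shards
  val shardResolver: ExtractShardId = {
    case CounterMessage(id, _) => (id % numberOfShards).toString
  }
}
```

Entities 123 and 27 both land in shard "3", which is how one shard comes to hold many entities; a message the partial function does not match is treated as unhandled, as the scaladoc of start describes.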
entityProps is what the ShardRegion uses to create (and re-create) entities. typeName is used to look up the ShardRegion, as follows:
val counterRegion: ActorRef = ClusterSharding(system).shardRegion("Counter")
counterRegion ! Counter.CounterMessage(123, Counter.Get)
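Once the ShardRegion ActorRef has been obtained with "Counter", every message sent from this node is located and forwarded through this ShardRegion actor. That is why every ShardRegion needs the functions that extract the destination entity's shard id and entity ID from a message: extractShardId and extractEntityId. In some cases, because of node roles, a node hosts no entities at all but still needs to send messages to entities on other nodes. Such a node needs an intermediary, a proxy-only ShardRegion, created with startProxy: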
/**
* Java/Scala API: Register a named entity type `ShardRegion` on this node that will run in proxy only mode,
* i.e. it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
* entity actors itself. The [[ShardRegion]] actor for this type can later be retrieved with the
* [[#shardRegion]] method.
*
* Some settings can be configured as described in the `akka.cluster.sharding` section
* of the `reference.conf`.
*
* @param typeName the name of the entity type
* @param role specifies that this entity type is located on cluster nodes with a specific role.
* If the role is not specified all nodes in the cluster are used.
* @param messageExtractor functions to extract the entity id, shard id, and the message to send to the
* entity from the incoming message
* @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
*/
def startProxy(
typeName: String,
role: Optional[String],
messageExtractor: ShardRegion.MessageExtractor): ActorRef = {...}
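Which nodes host entities is usually decided with cluster roles. The configuration sketch below assumes a hypothetical role name "backend" for the entity-hosting nodes: `akka.cluster.sharding.role` restricts the entity-hosting ShardRegions to nodes with that role, while the other nodes start a proxy and pass the same role to startProxy.

```hocon
# Nodes that host the entities (the role name "backend" is hypothetical)
akka.cluster.roles = ["backend"]
akka.cluster.sharding.role = "backend"

# Nodes that only forward messages would instead use, e.g.:
# akka.cluster.roles = ["frontend"]
# and start a proxy with startProxy(typeName, Optional.of("backend"), messageExtractor)
```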
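Another important question is how to passivate entities so that they release the resources they occupy. The approach is to define an idle timeout: once an entity has been idle for longer than that, it can be passivated. The example below passivates the entity after two minutes of inactivity: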
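import akka.actor._
import akka.cluster.sharding.ShardRegion.Passivate
import akka.persistence.PersistentActor
import scala.concurrent.duration._

class Counter extends PersistentActor {
import Counter._
override def persistenceId: String = "Counter-" + self.path.name
var count = 0
def updateState(event: CounterChanged): Unit = count += event.delta
// passivate the entity after two minutes without any incoming message
context.setReceiveTimeout(2.minutes)
override def receiveRecover: Receive = {
case evt: CounterChanged => updateState(evt)
}
override def receiveCommand: Receive = {
case Increment => persist(CounterChanged(+1))(updateState)
case Decrement => persist(CounterChanged(-1))(updateState)
case Get => sender() ! count
// ask the parent Shard to passivate us; it buffers new messages and sends Stop back
case ReceiveTimeout => context.parent ! Passivate(stopMessage = Stop)
case Stop => context.stop(self)
}
/* or alternatively
override def unhandled(msg: Any): Unit = msg match {
case ReceiveTimeout => context.parent ! Passivate(stopMessage = PoisonPill)
case _ => super.unhandled(msg)
}
*/
}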
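The receive-timeout mechanism boils down to tracking, per entity, when it last handled a message. The plain-Scala toy below shows only that bookkeeping; the names `Tracker`, `touch`, `idle` and `passivate` are hypothetical, and timestamps are passed in explicitly as milliseconds. In Akka the timer lives inside each entity via `context.setReceiveTimeout`, or, roughly, inside the Shard when `passivate-idle-entity-after` is configured.

```scala
object IdlePassivationSketch {
  final case class Tracker(idleTimeoutMs: Long, lastSeen: Map[String, Long] = Map.empty) {
    // record that `entityId` just handled a message
    def touch(entityId: String, nowMs: Long): Tracker =
      copy(lastSeen = lastSeen.updated(entityId, nowMs))

    // entities that have been quiet for at least the idle timeout
    def idle(nowMs: Long): Set[String] =
      lastSeen.collect { case (id, seen) if nowMs - seen >= idleTimeoutMs => id }.toSet

    // passivated entities are forgotten; their next message would start them again
    def passivate(ids: Set[String]): Tracker =
      copy(lastSeen = lastSeen -- ids)
  }
}
```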
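Alternatively, passivation can be made automatic through configuration.
Set in the configuration file:
akka.cluster.sharding.passivate-idle-entity-after = 120 s   // off to disable
Below is sample code from the official documentation that illustrates the passivation stop message (it uses the Akka Typed API):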
trait CounterCommand
case object Increment extends CounterCommand
final case class GetValue(replyTo: ActorRef[Int]) extends CounterCommand
case object Idle extends CounterCommand
case object GoodByeCounter extends CounterCommand

def counter2(shard: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[CounterCommand] = {
  Behaviors.setup { ctx ⇒
    def become(value: Int): Behavior[CounterCommand] =
      Behaviors.receiveMessage[CounterCommand] {
        case Increment ⇒
          become(value + 1)
        case GetValue(replyTo) ⇒
          replyTo ! value
          Behaviors.same
        case Idle ⇒
          // after receive timeout
          shard ! ClusterSharding.Passivate(ctx.self)
          Behaviors.same
        case GoodByeCounter ⇒
          // the stopMessage, used for rebalance and passivate
          Behaviors.stopped
      }

    ctx.setReceiveTimeout(30.seconds, Idle)
    become(0)
  }
}

sharding.init(Entity(
  typeKey = TypeKey,
  createBehavior = ctx ⇒ counter2(ctx.shard, ctx.entityId))
  .withStopMessage(GoodByeCounter))
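What actually happens is that the entity sends a Passivate message to the sharding actor that supervises it (its parent Shard, which belongs to the ShardRegion) and names the message that should be used to stop it.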
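Why a handshake instead of the entity simply stopping itself? If the entity stops on its own, messages already queued in its mailbox are lost. With Passivate, the parent buffers new messages for that entity, sends it the stop message, and once the entity has terminated starts it again if anything was buffered. The plain-Scala toy below models that protocol; `Shard`, `deliver`, `passivate` and `terminated` are hypothetical names loosely modeled on what Akka's Shard actor does, not its actual code.

```scala
object PassivationHandshakeSketch {
  sealed trait State
  case object Running extends State
  case object Passivating extends State

  final case class Shard(states: Map[String, State] = Map.empty,
                         buffers: Map[String, Vector[Any]] = Map.empty,
                         delivered: Vector[(String, Any)] = Vector.empty) {

    // normal delivery, except that messages for a passivating entity are buffered
    def deliver(id: String, msg: Any): Shard = states.get(id) match {
      case Some(Passivating) =>
        copy(buffers = buffers.updated(id, buffers.getOrElse(id, Vector.empty[Any]) :+ msg))
      case _ =>
        copy(states = states.updated(id, Running), delivered = delivered :+ (id -> msg))
    }

    // the entity asked to be passivated: send it the stop message, start buffering
    def passivate(id: String, stopMessage: Any): Shard =
      copy(states = states.updated(id, Passivating), delivered = delivered :+ (id -> stopMessage))

    // the entity has stopped: if messages arrived meanwhile, start it again and flush them
    def terminated(id: String): Shard = {
      val pending = buffers.getOrElse(id, Vector.empty[Any])
      val cleared = copy(states = states - id, buffers = buffers - id)
      pending.foldLeft(cleared)((shard, msg) => shard.deliver(id, msg))
    }
  }
}
```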
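Also note that if a BackoffSupervisor supervises the entity, you must use Backoff.onStop, because a persist failure stops the entity outright. The Backoff.onStop strategy re-creates the entity (the BackoffSupervisedEntity) and starts it again. So what if passivation really needs to stop the entity for good? We can do the following: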
case "stop" =>
context.stop(self)           // stop the entity itself
context.parent ! PoisonPill  // and its BackoffSupervisor, so it does not restart the entity
Here context.parent is the BackoffSupervisor, which has to be stopped as well; otherwise it would simply start the entity again.
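The BackoffSupervisor waits longer after each consecutive stop. The sketch below reproduces the shape of that delay in plain Scala: minBackoff doubled per restart, capped at maxBackoff, then stretched by up to randomFactor. The random draw is passed in as `jitter` to keep it deterministic, and `BackoffSketch.delay` is a hypothetical helper; treat it as an approximation of Akka's calculation, not a copy of it. With the values used in POSProps below (1 second, 5 seconds, 0.2), successive delays are about 1 s, 2 s, 4 s and then 5 s, each up to 20% longer.

```scala
import scala.concurrent.duration._

object BackoffSketch {
  // Delay before the child is started again after `restartCount` consecutive stops:
  // minBackoff * 2^restartCount, capped at maxBackoff, then multiplied by
  // (1 + jitter * randomFactor). `jitter` stands in for a random number in [0, 1).
  def delay(restartCount: Int,
            minBackoff: FiniteDuration,
            maxBackoff: FiniteDuration,
            randomFactor: Double,
            jitter: Double): FiniteDuration = {
    require(jitter >= 0.0 && jitter < 1.0, "jitter must be in [0, 1)")
    val exponentialMs = minBackoff.toMillis * math.pow(2, restartCount)
    val cappedMs = math.min(maxBackoff.toMillis.toDouble, exponentialMs)
    (cappedMs * (1.0 + jitter * randomFactor)).round.millis
  }
}
```

Note that the random factor is applied after the cap, so an actual delay can exceed maxBackoff slightly.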
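Next we design an example to demonstrate cluster sharding in practice. To keep it closer to reality, the example uses event sourcing, PersistentActor and other techniques and tools that have not been fully introduced yet; I will explain how they work and how to use them in the following posts. The example simulates the checkout counter of a fruit shop: there are three POS terminals, and a customer can go to any of them to enter items and quantities and then pay. The main goal of the demo is that, whenever a back-end server fails, the state of a sales slip that is still being entered can be fully recovered.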
Let's first look at the source code of the POS front end:
import akka.actor._
import akka.cluster._
import akka.persistence._
import akka.pattern._
import scala.concurrent.duration._
object POSTerminal {
case class Fruit(code: String, name: String, price: Double)
case class Item(fruit: Fruit, qty: Int)
sealed trait Command {
}
case class Checkout(fruit: Fruit, qty: Int) extends Command
case object ShowTotol extends Command
case class PayCash(amount: Double) extends Command
case object Shutdown extends Command
sealed trait Event {}
case class ItemScanned(fruit: Fruit, qty: Int) extends Event
case object Paid extends Event
case class Items(items: List[Item] = Nil) {
def itemAdded(evt: Event): Items = evt match {
case ItemScanned(fruit,qty) =>
copy( Item(fruit,qty) :: items ) //prepend item; toString reverses the list
case _ => this //nothing happens
}
def billPaid = copy(Nil) //clear all items
override def toString = items.reverse.toString()
}
def termProps = Props(new POSTerminal())
//backoff supervisor must use onStop mode
def POSProps: Props = {
val options = Backoff.onStop(
childProps = termProps,
childName = "posterm",
minBackoff = 1.second,
maxBackoff = 5.seconds,
randomFactor = 0.20
)
BackoffSupervisor.props(options)
}
}
class POSTerminal extends PersistentActor with ActorLogging {
import POSTerminal._
val cluster = Cluster(context.system)
// entity path: /system/sharding/<typeName>/<shardId>/<entityId>/posterm (names are URL-encoded)
// because of the BackoffSupervisor, self.path.parent.name is the entity id
// and self.path.parent.parent.name is the shard id
override def persistenceId: String = self.path.parent.parent.name + "-" + self.path.parent.name
var currentItems = Items()
override def receiveRecover: Receive = {
case Paid => currentItems = currentItems.billPaid //replaying Paid must clear the bill as well
case evt: Event => currentItems = currentItems.itemAdded(evt)
log.info(s"***** ${persistenceId} recovering events ... ********")
case SnapshotOffer(_,loggedItems: Items) =>
log.info(s"***** ${persistenceId} recovering snapshot ... ********")
currentItems = loggedItems
}
override def receiveCommand: Receive = {
case Checkout(fruit,qty) =>
log.info(s"*********${persistenceId} is scanning item: $fruit, qty: $qty *********")
persist(ItemScanned(fruit,qty))(evt => currentItems = currentItems.itemAdded(evt))
case ShowTotol =>
log.info(s"*********${persistenceId} on ${cluster.selfAddress} has current scanned items: *********")
if (currentItems.items == Nil)
log.info(s"**********${persistenceId} None transaction found! *********")
else
currentItems.items.reverse.foreach (item =>
log.info(s"*********${persistenceId}: ${item.fruit.name} ${item.fruit.price} X ${item.qty} = ${item.fruit.price * item.qty} *********"))
case PayCash(amt) =>
log.info(s"**********${persistenceId} paying $amt to settle ***********")
persist(Paid) { _ =>
currentItems = currentItems.billPaid
saveSnapshot(currentItems) //snapshot the cleared bill so later recoveries start from here
}
//shutdown this node to validate entity relocation and proper state recovery
case Shutdown =>
log.info(s"******** node ${cluster.selfAddress} is leaving cluster ... *******")
cluster.leave(cluster.selfAddress)
}
}
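Let me sum up a few points:
1. POSTerminal is the entity that performs the actual business logic. It is wrapped in a BackoffSupervisor so that, when it stops because of an exception such as a persistence failure, it is started again after a backoff delay. That is why Backoff.onStop is used.
2. persistenceId = self.path.parent.parent.name + "-" + self.path.parent.name stands for shop number-terminal number, e.g. 1-1021. The actor path names are generated by the ShardRegion: the Shard actor is named after the value returned by extractShardId, and the entity (here the BackoffSupervisor, whose child is POSTerminal) after the value returned by extractEntityId, so the persistenceId is effectively extractShardId-extractEntityId.
3. Note the state type Items: its method itemAdded(evt): Items returns a new state instead of modifying the current one, so always write currentItems = currentItems.itemAdded(evt).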
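Point 3 is also what makes recovery work: restoring an entity means folding the journaled events, in order, over the state, optionally starting from a snapshot. Below is a self-contained copy of the POS state types with a hypothetical `updated` method and `recover` helper. Here `updated` handles `Paid` itself, so replaying a payment yields the same state as processing it live.

```scala
object POSReplaySketch {
  final case class Fruit(code: String, name: String, price: Double)
  final case class Item(fruit: Fruit, qty: Int)

  sealed trait Event
  final case class ItemScanned(fruit: Fruit, qty: Int) extends Event
  case object Paid extends Event

  final case class Items(items: List[Item] = Nil) {
    // every event yields a new state; Paid clears the bill, exactly as when handled live
    def updated(evt: Event): Items = evt match {
      case ItemScanned(fruit, qty) => copy(Item(fruit, qty) :: items)
      case Paid                    => copy(Nil)
    }
    def total: Double = items.map(i => i.fruit.price * i.qty).sum
  }

  // recovery: start from the snapshot (if any) and replay the later events in order
  def recover(snapshot: Option[Items], events: Seq[Event]): Items =
    events.foldLeft(snapshot.getOrElse(Items()))((state, evt) => state.updated(evt))
}
```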
Here is the source code that builds and starts ClusterSharding:
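import akka.actor._
import akka.cluster.sharding._
import com.typesafe.config.ConfigFactory
object POSShard {
import POSTerminal._
val shardName = "POSManager"
case class POSCommand(id: Long, cmd: Command) {
def shopId = id.toString.head.toString //shard id: shop number = first digit of the POS id
def posId = id.toString //entity id: the full POS id
}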
val getPOSId: ShardRegion.ExtractEntityId = {
case posCommand: POSCommand => (posCommand.posId,posCommand.cmd)
}
val getShopId: ShardRegion.ExtractShardId = {
case posCommand: POSCommand => posCommand.shopId
}
def create(port: Int) = {
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
.withFallback(ConfigFactory.load())
val system = ActorSystem("posSystem",config)
ClusterSharding(system).start(
typeName = shardName,
entityProps = POSProps,
settings = ClusterShardingSettings(system),
extractEntityId = getPOSId,
extractShardId = getShopId
)
}
}
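The id scheme can be checked in isolation. The plain-Scala sketch below copies the two helpers from POSCommand into a hypothetical `POSId` class and adds a hypothetical `persistenceIdFromPath` that applies the same grandparent/parent rule as POSTerminal.persistenceId to a path string, assuming the layout /system/sharding/<typeName>/<shardId>/<entityId>/posterm that results from the BackoffSupervisor child name "posterm".

```scala
object POSIdSketch {
  // same helpers as POSShard.POSCommand, without the command payload
  final case class POSId(id: Long) {
    def shopId: String = id.toString.head.toString // shard id: first digit
    def posId: String = id.toString                 // entity id: the whole number
  }

  // same rule as POSTerminal.persistenceId, applied to a path string:
  // grandparent name (shard id) + "-" + parent name (entity id)
  def persistenceIdFromPath(path: String): String = {
    val names = path.split('/').filter(_.nonEmpty)
    names(names.length - 3) + "-" + names(names.length - 2)
  }
}
```

With this scheme every POS id that starts with the same digit shares a shard, so positive ids produce at most nine shards; that is enough for a demo but limits how widely the entities can be spread.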
Test it with the following code:
import POSTerminal._
import POSShard._
object POSDemo extends App {
POSShard.create(2551)
Thread.sleep(1000)
POSShard.create(2552)
POSShard.create(2553)
val posref = POSShard.create(2554)
scala.io.StdIn.readLine()
val apple = Fruit("0001","high grade apple",10.5)
val orange = Fruit("0002","sunkist orage",12.0)
val grape = Fruit("0003","xinjiang red grape",15.8)
posref ! POSCommand(1021, Checkout(apple,2))
posref ! POSCommand(1021,Checkout(grape,1))
posref ! POSCommand(1021,ShowTotol)
scala.io.StdIn.readLine()
posref ! POSCommand(1021,Shutdown)
scala.io.StdIn.readLine()
posref ! POSCommand(1021,Checkout(orange,10))
posref ! POSCommand(1021,ShowTotol)
scala.io.StdIn.readLine()
posref ! POSCommand(1028,Checkout(orange,10))
posref ! POSCommand(1028,ShowTotol)
scala.io.StdIn.readLine()
}
The output is:
[akka.tcp://posSystem@127.0.0.1:2551] *********1-1021 is scanning item: Fruit(0001,high grade apple,10.5), qty: 2 *********
[akka.tcp://posSystem@127.0.0.1:2551] *********1-1021 is scanning item: Fruit(0003,xinjiang red grape,15.8), qty: 1 *********
[akka.tcp://posSystem@127.0.0.1:2551] *********1-1021 on akka.tcp://posSystem@127.0.0.1:2551 has current scanned items: *********
[akka.tcp://posSystem@127.0.0.1:2551] *********1-1021: high grade apple 10.5 X 2 = 21.0 *********
[akka.tcp://posSystem@127.0.0.1:2551] *********1-1021: xinjiang red grape 15.8 X 1 = 15.8 *********
[akka.tcp://posSystem@127.0.0.1:2551] ******** node akka.tcp://posSystem@127.0.0.1:2551 is leaving cluster ... *******
[akka.tcp://posSystem@127.0.0.1:2551/system/remoting-terminator] Remoting shut down.
[akka.tcp://posSystem@127.0.0.1:2552] ***** 1-1021 recovering events ... ********
[akka.tcp://posSystem@127.0.0.1:2552] ***** 1-1021 recovering events ... ********
[akka.tcp://posSystem@127.0.0.1:2552] ********1-1021 is scanning item: Fruit(0002,sunkist orage,12.0), qty: 10 *********
[akka.tcp://posSystem@127.0.0.1:2552] *********1-1021 on akka.tcp://posSystem@127.0.0.1:2552 has current scanned items: *********
[akka.tcp://posSystem@127.0.0.1:2552] *********1-1021: high grade apple 10.5 X 2 = 21.0 *********
[akka.tcp://posSystem@127.0.0.1:2552] *********1-1021: xinjiang red grape 15.8 X 1 = 15.8 *********
[akka.tcp://posSystem@127.0.0.1:2552] *********1-1021: sunkist orage 12.0 X 10 = 120.0 *********
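The output shows that 1-1021 initially ran on node 2551. After we shut down 2551 with Shutdown, the next message for 1-1021 made ClusterSharding re-create the entity on 2552 and recover its previous state; the two "recovering events" lines are the two ItemScanned events being replayed. Automatically moving running actors to other nodes and restoring their state when part of the system fails and becomes unavailable is exactly the core topic of this post.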
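Nothing is copied from 2551 to 2552: the state survives because the journal lives in Cassandra, outside every node, and the new incarnation replays it. The plain-Scala toy below mimics that sequence with integer amounts instead of fruit; `World`, `nodeLeft`, `add` and the `recoveries` log are hypothetical, and "start on the first remaining node" is a simplification of the coordinator's allocation.

```scala
object RelocationSketch {
  type Node = String
  type EntityId = String

  // In-memory entity state lives on one node; the journal is shared by all nodes.
  final case class World(nodes: Vector[Node],
                         live: Map[EntityId, (Node, Int)] = Map.empty, // id -> (host, total)
                         journal: Map[EntityId, Vector[Int]] = Map.empty,
                         recoveries: Vector[(EntityId, Node, Int)] = Vector.empty) {

    // a leaving node stops every entity it hosts; nothing is copied elsewhere
    def nodeLeft(node: Node): World =
      copy(nodes = nodes.filterNot(_ == node),
           live = live.filter { case (_, (host, _)) => host != node })

    // a message for an entity that is not running starts it on a remaining node
    // and replays its journal first; then the new event is applied and persisted
    def add(id: EntityId, delta: Int): World = {
      val running = live.get(id) match {
        case Some(_) => this
        case None =>
          val host = nodes.head
          val events = journal.getOrElse(id, Vector.empty[Int])
          copy(live = live.updated(id, (host, events.sum)),
               recoveries = recoveries :+ ((id, host, events.size)))
      }
      val (host, total) = running.live(id)
      running.copy(
        live = running.live.updated(id, (host, total + delta)),
        journal = running.journal.updated(id, running.journal.getOrElse(id, Vector.empty[Int]) :+ delta))
    }
  }
}
```

As in the demo output, after the first node leaves, the next message restarts the entity elsewhere and two journaled events are replayed before the new one is applied.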
The full source code of this demo follows:
build.sbt
name := "akka-cluster-sharding"

version := "0.2"

scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.19",
  "com.typesafe.akka" %% "akka-persistence" % "2.5.19",
  "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.92",
  "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.92" % Test
)
resources/application.conf
akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off

akka {
  loglevel = INFO
  actor {
    provider = "cluster"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://posSystem@127.0.0.1:2551"]
    log-info = off
  }
  persistence {
    journal.plugin = "cassandra-journal"
    snapshot-store.plugin = "cassandra-snapshot-store"
  }
}
Entities.scala
import akka.actor._
import akka.cluster._
import akka.persistence._
import akka.pattern._
import scala.concurrent.duration._

object POSTerminal {
  case class Fruit(code: String, name: String, price: Double)
  case class Item(fruit: Fruit, qty: Int)

  sealed trait Command {
  }
  case class Checkout(fruit: Fruit, qty: Int) extends Command
  case object ShowTotol extends Command
  case class PayCash(amount: Double) extends Command
  case object Shutdown extends Command

  sealed trait Event {}
  case class ItemScanned(fruit: Fruit, qty: Int) extends Event
  case object Paid extends Event

  case class Items(items: List[Item] = Nil) {
    def itemAdded(evt: Event): Items = evt match {
      case ItemScanned(fruit,qty) =>
        copy( Item(fruit,qty) :: items )  //prepend item; toString reverses the list
      case _ => this   //nothing happens
    }
    def billPaid = copy(Nil)   //clear all items
    override def toString = items.reverse.toString()
  }

  def termProps = Props(new POSTerminal())

  //backoff supervisor must use onStop mode
  def POSProps: Props = {
    val options = Backoff.onStop(
      childProps = termProps,
      childName = "posterm",
      minBackoff = 1.second,
      maxBackoff = 5.seconds,
      randomFactor = 0.20
    )
    BackoffSupervisor.props(options)
  }
}

class POSTerminal extends PersistentActor with ActorLogging {
  import POSTerminal._
  val cluster = Cluster(context.system)

  // entity path: /system/sharding/<typeName>/<shardId>/<entityId>/posterm (names are URL-encoded)
  // because of the BackoffSupervisor, self.path.parent.name is the entity id
  // and self.path.parent.parent.name is the shard id
  override def persistenceId: String = self.path.parent.parent.name + "-" + self.path.parent.name

  var currentItems = Items()

  override def receiveRecover: Receive = {
    case Paid =>
      currentItems = currentItems.billPaid   //replaying Paid must clear the bill as well
    case evt: Event =>
      currentItems = currentItems.itemAdded(evt)
      log.info(s"***** ${persistenceId} recovering events ... ********")
    case SnapshotOffer(_,loggedItems: Items) =>
      log.info(s"***** ${persistenceId} recovering snapshot ... ********")
      currentItems = loggedItems
  }

  override def receiveCommand: Receive = {
    case Checkout(fruit,qty) =>
      log.info(s"*********${persistenceId} is scanning item: $fruit, qty: $qty *********")
      persist(ItemScanned(fruit,qty))(evt => currentItems = currentItems.itemAdded(evt))

    case ShowTotol =>
      log.info(s"*********${persistenceId} on ${cluster.selfAddress} has current scanned items: *********")
      if (currentItems.items == Nil)
        log.info(s"**********${persistenceId} None transaction found! *********")
      else
        currentItems.items.reverse.foreach (item =>
          log.info(s"*********${persistenceId}: ${item.fruit.name} ${item.fruit.price} X ${item.qty} = ${item.fruit.price * item.qty} *********"))

    case PayCash(amt) =>
      log.info(s"**********${persistenceId} paying $amt to settle ***********")
      persist(Paid) { _ =>
        currentItems = currentItems.billPaid
        saveSnapshot(currentItems)   //snapshot the cleared bill so later recoveries start from here
      }

    //shutdown this node to validate entity relocation and proper state recovery
    case Shutdown =>
      log.info(s"******** node ${cluster.selfAddress} is leaving cluster ... *******")
      cluster.leave(cluster.selfAddress)
  }
}
Shards.scala
import akka.actor._
import akka.cluster.sharding._
import com.typesafe.config.ConfigFactory

object POSShard {
  import POSTerminal._
  val shardName = "POSManager"

  case class POSCommand(id: Long, cmd: Command) {
    def shopId = id.toString.head.toString
    def posId = id.toString
  }

  val getPOSId: ShardRegion.ExtractEntityId = {
    case posCommand: POSCommand => (posCommand.posId,posCommand.cmd)
  }
  val getShopId: ShardRegion.ExtractShardId = {
    case posCommand: POSCommand => posCommand.shopId
  }

  def create(port: Int) = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("posSystem",config)
    ClusterSharding(system).start(
      typeName = shardName,
      entityProps = POSProps,
      settings = ClusterShardingSettings(system),
      extractEntityId = getPOSId,
      extractShardId = getShopId
    )
  }
}
POSDemo.scala
import POSTerminal._
import POSShard._

object POSDemo extends App {
  POSShard.create(2551)
  Thread.sleep(1000)
  POSShard.create(2552)
  POSShard.create(2553)
  val posref = POSShard.create(2554)

  scala.io.StdIn.readLine()
  val apple = Fruit("0001","high grade apple",10.5)
  val orange = Fruit("0002","sunkist orage",12.0)
  val grape = Fruit("0003","xinjiang red grape",15.8)

  posref ! POSCommand(1021, Checkout(apple,2))
  posref ! POSCommand(1021,Checkout(grape,1))
  posref ! POSCommand(1021,ShowTotol)
  scala.io.StdIn.readLine()

  posref ! POSCommand(1021,Shutdown)
  scala.io.StdIn.readLine()

  posref ! POSCommand(1021,Checkout(orange,10))
  posref ! POSCommand(1021,ShowTotol)
  scala.io.StdIn.readLine()

  posref ! POSCommand(1028,Checkout(orange,10))
  posref ! POSCommand(1028,ShowTotol)
  scala.io.StdIn.readLine()
}