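In real-world applications, sharing some data across a cluster is unavoidable. By this I mean data that any node can read and write concurrently; the hard part is resolving conflicting updates. A distributed database could provide this, but the cost of running and maintaining one is often too high to justify. Distributed data (distributed-data, ddata) is designed for exactly this situation. Akka provides a set of CRDTs (Conflict-Free Replicated Data Types) together with a management API for sharing data across the cluster without conflicts.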
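Akka manages shared distributed data through the replicator. A replicator is simply an actor: once a replicator is running on every node of the cluster, the replicators that share the same actor path (ignoring the address part) talk to each other over a gossip protocol, as if they were joined into a single replicator network. The replicator provides an API for resolving update conflicts and synchronizing data. The shared data structures live inside each node's replicator. To update a value, a program sends its local replicator a message that carries the key of the shared data and a function describing the update; the replicator applies the update locally, propagates it to the replicators on the other nodes via gossip, and resolves any conflicts that arise during synchronization. Because the data lives inside the replicator, reading a value is likewise done by sending a read message to the local replicator.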
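The local-update-then-gossip flow described above can be sketched without Akka at all. The following is a minimal illustration only: `Replica` and `gossipRound` are hypothetical names, the state is a plain grow-only set, and every replica exchanges state with every other one, whereas a real gossip protocol contacts only a few peers per round.

```scala
object GossipSketch {
  // Hypothetical stand-in for a replicator: it only holds a grow-only set of strings.
  final case class Replica(name: String, data: Set[String] = Set.empty) {
    // A local update is applied to this replica's own copy first.
    def add(item: String): Replica = copy(data = data + item)

    // Merging is set union: commutative, associative and idempotent,
    // so the order and number of exchanges do not change the result.
    def merge(other: Replica): Replica = copy(data = data ++ other.data)
  }

  // One simplified gossip round: every replica merges the state of every peer.
  def gossipRound(replicas: Vector[Replica]): Vector[Replica] =
    replicas.map(r => replicas.foldLeft(r)((acc, peer) => acc.merge(peer)))

  def main(args: Array[String]): Unit = {
    val a = Replica("node-a").add("Apple")
    val b = Replica("node-b").add("Orange")
    val c = Replica("node-c")
    gossipRound(Vector(a, b, c)).foreach(r => println(s"${r.name}: ${r.data}"))
  }
}
```

After one round all three replicas hold the same set, even though each update was made on a different node.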
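Since the replicator is an actor, it can be started either by declaring the akka.cluster.ddata.DistributedData extension in the .conf file, or by constructing it directly with Replicator.props. I find constructing the actor directly more flexible, and it also makes it possible to run several replicators on one node, because replicators on different nodes are grouped by actor path. The following shows how to build a replicator with Replicator.props: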
val replicator = system.actorOf(Replicator.props(
ReplicatorSettings(system).withGossipInterval(1.second)), "replicator")
Alternatively, to start it through akka.extensions in the configuration file:
akka {
extensions = ["akka.cluster.ddata.DistributedData"]
...
}
val replicator = DistributedData(context.system).replicator
A CRDT is a kind of key/value data type. CRDT values fall into the Counter, Flag, Set, and Map families, including:
/**
* Implements a boolean flag CRDT that is initialized to `false` and
* can be switched to `true`. `true` wins over `false` in merge.
*
* This class is immutable, i.e. "modifying" methods return a new instance.
*/
final case class Flag(enabled: Boolean)
final case class FlagKey(_id: String)
/**
* Implements a 'Growing Counter' CRDT, also called a 'G-Counter'.
* A G-Counter is a increment-only counter (inspired by vector clocks) in
* which only increment and merge are possible. Incrementing the counter
* adds 1 to the count for the current node. Divergent histories are
* resolved by taking the maximum count for each node (like a vector
* clock merge). The value of the counter is the sum of all node counts.
*
* This class is immutable, i.e. "modifying" methods return a new instance.
*/
final class GCounter
final case class GCounterKey(_id: String)
/**
* Implements a 'Increment/Decrement Counter' CRDT, also called a 'PN-Counter'.
* PN-Counters allow the counter to be incremented by tracking the
* increments (P) separate from the decrements (N). Both P and N are represented
* as two internal [[GCounter]]s. Merge is handled by merging the internal P and N
* counters. The value of the counter is the value of the P counter minus
* the value of the N counter.
*
* This class is immutable, i.e. "modifying" methods return a new instance.
*/
final class PNCounter
final case class PNCounterKey(_id: String)
/**
* Implements a 'Add Set' CRDT, also called a 'G-Set'. You can't
* remove elements of a G-Set.
* A G-Set doesn't accumulate any garbage apart from the elements themselves.
* This class is immutable, i.e. "modifying" methods return a new instance.
*/
final case class GSet[A]
final case class GSetKey[A](_id: String)
/**
* Implements a 'Observed Remove Set' CRDT, also called a 'OR-Set'.
* Elements can be added and removed any number of times. Concurrent add wins
* over remove.
*
* The ORSet has a version vector that is incremented when an element is added to
* the set. The `node -> count` pair for that increment is stored against the
* element as its "birth dot". Every time the element is re-added to the set,
* its "birth dot" is updated to that of the `node -> count` version vector entry
* resulting from the add. When an element is removed, we simply drop it, no tombstones.
*
* When an element exists in replica A and not replica B, is it because A added
* it and B has not yet seen that, or that B removed it and A has not yet seen that?
* In this implementation we compare the `dot` of the present element to the version vector
* in the Set it is absent from. If the element dot is not "seen" by the Set version vector,
* that means the other set has yet to see this add, and the item is in the merged
* Set. If the Set version vector dominates the dot, that means the other Set has removed this
* element already, and the item is not in the merged Set.
*
* This class is immutable, i.e. "modifying" methods return a new instance.
*/
final class ORSet[A]
final case class ORSetKey[A](_id: String)
/**
* Implements a 'Observed Remove Map' CRDT, also called a 'OR-Map'.
*
* It has similar semantics as an [[ORSet]], but in case of concurrent updates
* the values are merged, and must therefore be [[ReplicatedData]] types themselves.
*
* This class is immutable, i.e. "modifying" methods return a new instance.
*/
final class ORMap[A, B <: ReplicatedData]
final case class ORMapKey[A, B <: ReplicatedData](_id: String)
/**
* An immutable multi-map implementation. This class wraps an
* [[ORMap]] with an [[ORSet]] for the map's value.
*
* This class is immutable, i.e. "modifying" methods return a new instance.
*/
final class ORMultiMap[A, B]
final case class ORMultiMapKey[A, B](_id: String)
/**
* Map of named counters. Specialized [[ORMap]] with [[PNCounter]] values.
*
* This class is immutable, i.e. "modifying" methods return a new instance.
*/
final class PNCounterMap[A]
final case class PNCounterMapKey[A](_id: String)
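The merge rules quoted in the GCounter and PNCounter comments above (per-node maximum, value as a sum, P minus N) can be illustrated with a small standalone sketch. This is not Akka's implementation: `GCounterSketch` and `PNCounterSketch` are hypothetical names, and plain `String` node ids are assumed in place of cluster addresses.

```scala
object CounterSketch {
  // Simplified G-Counter: one non-negative count per node id.
  final case class GCounterSketch(counts: Map[String, Long] = Map.empty) {
    def increment(node: String, n: Long = 1L): GCounterSketch = {
      require(n >= 0, "a G-Counter can only grow")
      copy(counts = counts.updated(node, counts.getOrElse(node, 0L) + n))
    }

    // The counter value is the sum of all per-node counts.
    def value: Long = counts.values.sum

    // Divergent histories are resolved by taking the maximum count for each node.
    def merge(that: GCounterSketch): GCounterSketch = {
      val nodes = counts.keySet ++ that.counts.keySet
      GCounterSketch(nodes.map { k =>
        k -> math.max(counts.getOrElse(k, 0L), that.counts.getOrElse(k, 0L))
      }.toMap)
    }
  }

  // Simplified PN-Counter: increments (p) and decrements (n) tracked as two G-Counters.
  final case class PNCounterSketch(
      p: GCounterSketch = GCounterSketch(),
      n: GCounterSketch = GCounterSketch()) {
    def increment(node: String, by: Long = 1L): PNCounterSketch = copy(p = p.increment(node, by))
    def decrement(node: String, by: Long = 1L): PNCounterSketch = copy(n = n.increment(node, by))
    def value: Long = p.value - n.value
    def merge(that: PNCounterSketch): PNCounterSketch =
      PNCounterSketch(p.merge(that.p), n.merge(that.n))
  }
}
```

Because merge takes a per-node maximum, merging the same state twice or in a different order gives the same result, which is what lets replicas exchange state freely.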
To summarize, the ready-made CRDT types that Akka provides are:
Counters: GCounter, PNCounter
Sets: GSet, ORSet
Maps: ORMap, ORMultiMap, LWWMap, PNCounterMap
Registers: LWWRegister, Flag
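The LWW (last-writer-wins) types in this list are not covered by the source excerpts above, so here is a rough standalone sketch of the idea. It is an assumption-laden simplification: `LwwRegisterSketch` is a hypothetical name, timestamps are plain `Long` values supplied by the caller, and ties are broken by comparing `String` node ids. Akka's own LWWRegister works with cluster addresses and a pluggable clock, so treat the details here as illustrative only.

```scala
object LwwSketch {
  // Hypothetical simplified register: value, timestamp and the id of the writing node.
  final case class LwwRegisterSketch[A](value: A, timestamp: Long, node: String) {
    // A new write always gets a timestamp strictly greater than the current one.
    def withValue(newValue: A, now: Long, writer: String): LwwRegisterSketch[A] =
      LwwRegisterSketch(newValue, math.max(now, timestamp + 1), writer)

    // The write with the highest timestamp wins; on a tie the lower node id wins,
    // so that both sides of a merge pick the same winner.
    def merge(that: LwwRegisterSketch[A]): LwwRegisterSketch[A] =
      if (that.timestamp > timestamp) that
      else if (that.timestamp < timestamp) this
      else if (that.node < node) that
      else this
  }
}
```

Unlike the counters and sets, an LWW type discards one of two concurrent writes, so it depends on reasonably synchronized clocks.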
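The results of CRDT operations can also be obtained by subscription. A user sends a Subscribe message to the replicator to subscribe to changes of the data under a given Key[A]: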
/**
* Register a subscriber that will be notified with a [[Changed]] message
* when the value of the given `key` is changed. Current value is also
* sent as a [[Changed]] message to a new subscriber.
*
* Subscribers will be notified periodically with the configured `notify-subscribers-interval`,
* and it is also possible to send an explicit `FlushChanges` message to
* the `Replicator` to notify the subscribers immediately.
*
* The subscriber will automatically be unregistered if it is terminated.
*
* If the key is deleted the subscriber is notified with a [[Deleted]]
* message.
*/
final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef) extends ReplicatorMessage
/**
* Unregister a subscriber.
*
* @see [[Replicator.Subscribe]]
*/
final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef) extends ReplicatorMessage
/**
* The data value is retrieved with [[#get]] using the typed key.
*
* @see [[Replicator.Subscribe]]
*/
final case class Changed[A <: ReplicatedData](key: Key[A])(data: A) extends ReplicatorMessage {
/**
* The data value, with correct type.
* Scala pattern matching cannot infer the type from the `key` parameter.
*/
def get[T <: ReplicatedData](key: Key[T]): T = {
require(key == this.key, "wrong key used, must use contained key")
data.asInstanceOf[T]
}
/**
* The data value. Use [[#get]] to get the fully typed value.
*/
def dataValue: A = data
}
final case class Deleted[A <: ReplicatedData](key: Key[A]) extends NoSerializationVerificationNeeded {
override def toString: String = s"Deleted [$key]"
}
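After completing an operation, the replicator publishes Changed and Deleted messages for the affected Key[A] to its subscribers.
Distributed data is read and written by sending messages to the local replicator. The read/write messages are Update, Get, and Delete. Data is read with Get, or alternatively by subscribing to the CRDT's Changed and Deleted state messages.
What gives a CRDT its replication and conflict-free properties is presumably the way the replicator handles the Update message. The Update message is defined as follows: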
final case class Update[A <: ReplicatedData](key: Key[A], writeConsistency: WriteConsistency,request: Option[Any])(val modify: Option[A] ⇒ A)
extends Command[A] with NoSerializationVerificationNeeded {...}
def apply[A <: ReplicatedData](
key: Key[A], initial: A, writeConsistency: WriteConsistency,
request: Option[Any] = None)(modify: A ⇒ A): Update[A] =
Update(key, writeConsistency, request)(modifyWithInitial(initial, modify))
private def modifyWithInitial[A <: ReplicatedData](initial: A, modify: A ⇒ A): Option[A] ⇒ A = {
case Some(data) ⇒ modify(data)
case None ⇒ modify(initial)
}
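We can see that Update carries the data identifier Key[A] and a function modify: Option[A] => A. The replicator uses this modify function to transform the CRDT value A. The apply constructor additionally takes an initial value of type A, which is used the first time the data is referenced; this is evident from modifyWithInitial and the way apply calls it. Here is how the Update message is used: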
val timeout = 3.seconds.dilated
val KeyA = GCounterKey("A")
val KeyB = ORSetKey[String]("B")
val KeyC = PNCounterMapKey[String]("C")
val KeyD = ORMultiMapKey[String, String]("D")
val KeyE = ORMapKey[String, GSet[String]]("E")
replicator ! Update(KeyA, GCounter(), WriteAll(timeout))(_ + 3)
replicator ! Update(KeyB, ORSet(), WriteAll(timeout))(_ + "a" + "b" + "c")
replicator ! Update(KeyC, PNCounterMap.empty[String], WriteAll(timeout)) { _ increment "x" increment "y" }
replicator ! Update(KeyD, ORMultiMap.empty[String, String], WriteAll(timeout)) { _ + ("a" → Set("A")) }
replicator ! Update(KeyE, ORMap.empty[String, GSet[String]], WriteAll(timeout)) { _ + ("a" → GSet.empty[String].add("A")) }
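How the initial value interacts with modify can be tried out in isolation. The sketch below copies the shape of the modifyWithInitial helper quoted earlier (without the ReplicatedData bound) and applies it to a hypothetical `LocalStore`, a plain in-memory map standing in for the replicator's state; neither `LocalStore` nor `applyUpdate` is part of Akka.

```scala
object UpdateSketch {
  // Same shape as the quoted modifyWithInitial helper, minus the ReplicatedData bound.
  def modifyWithInitial[A](initial: A, modify: A => A): Option[A] => A = {
    case Some(data) => modify(data)    // key already exists: modify the current value
    case None       => modify(initial) // first use of the key: modify the initial value
  }

  // Hypothetical local store standing in for the replicator's state.
  final class LocalStore[A] {
    private var data = Map.empty[String, A]

    def applyUpdate(key: String, initial: A)(modify: A => A): A = {
      val next = modifyWithInitial(initial, modify)(data.get(key))
      data = data.updated(key, next)
      next
    }

    def get(key: String): Option[A] = data.get(key)
  }
}
```

Note that modify is applied to the initial value on first use, so the first `Update(KeyA, GCounter(), ...)(_ + 3)` yields 3 rather than an empty counter.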
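Because CRDT reads and writes are performed by sending messages, their results also come back as messages. The reply to a read carries the result data. The reply message types for reads and writes are: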
/*------------------UPDATE STATE MESSAGES -----------*/
final case class UpdateSuccess[A <: ReplicatedData](key: Key[A], request: Option[Any])
extends UpdateResponse[A] with DeadLetterSuppression
sealed abstract class UpdateFailure[A <: ReplicatedData] extends UpdateResponse[A]
/**
* The direct replication of the [[Update]] could not be fulfill according to
* the given [[WriteConsistency consistency level]] and
* [[WriteConsistency#timeout timeout]].
*
* The `Update` was still performed locally and possibly replicated to some nodes.
* It will eventually be disseminated to other replicas, unless the local replica
* crashes before it has been able to communicate with other replicas.
*/
final case class UpdateTimeout[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends UpdateFailure[A]
/**
* If the `modify` function of the [[Update]] throws an exception the reply message
* will be this `ModifyFailure` message. The original exception is included as `cause`.
*/
final case class ModifyFailure[A <: ReplicatedData](key: Key[A], errorMessage: String, cause: Throwable, request: Option[Any])
extends UpdateFailure[A] {
override def toString: String = s"ModifyFailure [$key]: $errorMessage"
}
/**
* The local store or direct replication of the [[Update]] could not be fulfill according to
* the given [[WriteConsistency consistency level]] due to durable store errors. This is
* only used for entries that have been configured to be durable.
*
* The `Update` was still performed in memory locally and possibly replicated to some nodes,
* but it might not have been written to durable storage.
* It will eventually be disseminated to other replicas, unless the local replica
* crashes before it has been able to communicate with other replicas.
*/
final case class StoreFailure[A <: ReplicatedData](key: Key[A], request: Option[Any])
extends UpdateFailure[A] with DeleteResponse[A]
/* ---------------- GET MESSAGES --------*/
/**
* Reply from `Get`. The data value is retrieved with [[#get]] using the typed key.
*/
final case class GetSuccess[A <: ReplicatedData](key: Key[A], request: Option[Any])(data: A)
extends GetResponse[A] with ReplicatorMessage {
/**
* The data value, with correct type.
* Scala pattern matching cannot infer the type from the `key` parameter.
*/
def get[T <: ReplicatedData](key: Key[T]): T = {
require(key == this.key, "wrong key used, must use contained key")
data.asInstanceOf[T]
}
/**
* The data value. Use [[#get]] to get the fully typed value.
*/
def dataValue: A = data
}
final case class NotFound[A <: ReplicatedData](key: Key[A], request: Option[Any])
extends GetResponse[A] with ReplicatorMessage
/*----------------DELETE MESSAGES ---------*/
final case class DeleteSuccess[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends DeleteResponse[A]
final case class ReplicationDeleteFailure[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends DeleteResponse[A]
final case class DataDeleted[A <: ReplicatedData](key: Key[A], request: Option[Any])
extends RuntimeException with NoStackTrace with DeleteResponse[A] {
override def toString: String = s"DataDeleted [$key]"
}
The read reply offers def dataValue: A to obtain the data, or the typed method get(Key[A]) to name the target explicitly. Some examples of reading data:
val replicator = DistributedData(system).replicator
val Counter1Key = PNCounterKey("counter1")
val Set1Key = GSetKey[String]("set1")
val Set2Key = ORSetKey[String]("set2")
val ActiveFlagKey = FlagKey("active")
replicator ! Get(Counter1Key, ReadLocal)
val readFrom3 = ReadFrom(n = 3, timeout = 1.second)
replicator ! Get(Set1Key, readFrom3)
val readMajority = ReadMajority(timeout = 5.seconds)
replicator ! Get(Set2Key, readMajority)
val readAll = ReadAll(timeout = 5.seconds)
replicator ! Get(ActiveFlagKey, readAll)
case g @ GetSuccess(Counter1Key, req) ⇒
val value = g.get(Counter1Key).value
case NotFound(Counter1Key, req) ⇒ // key counter1 does not exist
...
case g @ GetSuccess(Set1Key, req) ⇒
val elements = g.get(Set1Key).elements
case GetFailure(Set1Key, req) ⇒
// read from 3 nodes failed within 1.second
case NotFound(Set1Key, req) ⇒ // key set1 does not exist
/*---- return get result to user (sender()) ----*/
case "get-count" ⇒
// incoming request to retrieve current value of the counter
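// readTwo is not defined in this excerpt; it is assumed to be a read consistency
// such as ReadFrom(n = 2, timeout = 3.seconds)
replicator ! Get(Counter1Key, readTwo, request = Some(sender()))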
case g @ GetSuccess(Counter1Key, Some(replyTo: ActorRef)) ⇒
val value = g.get(Counter1Key).value.longValue
replyTo ! value
case GetFailure(Counter1Key, Some(replyTo: ActorRef)) ⇒
replyTo ! -1L
case NotFound(Counter1Key, Some(replyTo: ActorRef)) ⇒
replyTo ! 0L
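The `g @ GetSuccess(Key, req)` followed by `g.get(Key)` pattern used above exists because the data sits in a second parameter list and is not extracted by pattern matching. A standalone sketch of that design, with hypothetical `Key` and `Result` types rather than Akka's classes:

```scala
object TypedKeySketch {
  // Hypothetical minimal key: the type parameter exists only at compile time.
  final case class Key[A](id: String)

  // Mirrors the shape of GetSuccess/Changed: key in the first parameter list
  // (visible to pattern matching), data in the second (not extracted).
  final case class Result[A](key: Key[A])(data: A) {
    def dataValue: A = data

    // The caller passes the key again so the compiler knows which type to return.
    def get[T](k: Key[T]): T = {
      require(k == this.key, "wrong key used, must use contained key")
      data.asInstanceOf[T]
    }
  }
}
```

A usage sketch: match on the key as a stable identifier, then call `get` with the same key to recover the typed value.

```scala
import TypedKeySketch._

val CounterKey = Key[Long]("counter")
val msg: Any = Result(CounterKey)(42L)
msg match {
  case r @ Result(CounterKey) => println(r.get(CounterKey) + 1L)
  case _                      => println("unexpected message")
}
```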
The following shows how to observe data changes by subscribing:
replicator ! Subscribe(DataKey, self)
...
case c @ Changed(DataKey) ⇒
val data = c.get(DataKey)
log.info("Current elements: {}", data.elements)
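Next we build an example that demonstrates reading, writing, and monitoring several kinds of CRDT data:
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import akka.cluster.Cluster
import akka.cluster.ddata._
import akka.cluster.ddata.Replicator._
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

object DDataUpdator {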
case object IncCounter
case class AddToSet(item: String)
case class AddToMap(item: String)
case object ReadSet
case object ReadMap
case object ShutDownDData
val KeyCounter = GCounterKey("counter")
val KeySet = ORSetKey[String]("gset")
val KeyMap = ORMultiMapKey[Long, String]("ormap")
val timeout = 300.millis
val writeAll = WriteAll(timeout)
val readAll = ReadAll(timeout)
def create(port: Int): ActorRef = {
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port = $port")
.withFallback(ConfigFactory.load())
val system = ActorSystem("DDataSystem",config)
system.actorOf(Props[DDataUpdator],s"updator-$port")
}
}
class DDataUpdator extends Actor with ActorLogging {
import DDataUpdator._
implicit val cluster = Cluster(context.system)
val replicator = DistributedData(context.system).replicator
replicator ! Subscribe(KeyCounter,self)
replicator ! Subscribe(KeySet,self)
replicator ! Subscribe(KeyMap,self)
override def receive: Receive = {
case IncCounter =>
log.info(s"******* Incrementing counter... *****")
replicator ! Update(KeyCounter,GCounter(),writeAll)(_ + 1)
case UpdateSuccess(KeyCounter,_) =>
log.info(s"********** Counter updated successfully ********")
case UpdateTimeout(KeyCounter,_) =>
log.info(s"******* Counter update timed out! *****")
case ModifyFailure(KeyCounter,msg,err,_) =>
log.info(s"******* Counter update failed with error: ${msg} *****")
case StoreFailure(KeyCounter,_) =>
log.info(s"******* Counter value store failed! *****")
case c @ Changed(KeyCounter) ⇒
val data = c.get(KeyCounter)
log.info("********Current count: {}*******", data.getValue)
case AddToSet(item) =>
replicator ! Update(KeySet,ORSet.empty[String],writeAll)(_ + item)
case UpdateSuccess(KeySet,_) =>
log.info(s"**********Add to ORSet successfully ********")
case UpdateTimeout(KeySet,_) =>
log.info(s"******* Add to ORSet timed out! *****")
case ModifyFailure(KeySet,msg,err,_) =>
log.info(s"******* Add to ORSet failed with error: ${msg} *****")
case StoreFailure(KeySet,_) =>
log.info(s"******* ORSet items store failed! *****")
case c @ Changed(KeySet) =>
val data = c.get(KeySet)
log.info("********Items in ORSet: {}*******", data.elements)
case ReadSet =>
replicator ! Get(KeySet,readAll)
case g @ GetSuccess(KeySet, req) =>
val value = g.get(KeySet)
log.info("********Current items read in ORSet: {}*******", value.elements)
case NotFound(KeySet, req) =>
log.info("******No item found in ORSet!!!*******")
case AddToMap(item) =>
replicator ! Get(KeyCounter,readAll,Some(AddToMap(item)))
case g @ GetSuccess(KeyCounter,Some(AddToMap(item))) =>
val idx: Long = g.get(KeyCounter).getValue.longValue()
log.info(s"*********** got counter=${idx} with item: $item ************")
replicator ! Update(KeyMap,ORMultiMap.empty[Long,String],writeAll)(_ + (idx -> Set(item)))
replicator ! Update(KeyCounter,GCounter(),writeAll)(_ + 1)
case c @ Changed(KeyMap) =>
val data = c.get(KeyMap).entries
log.info("******** Items in ORMultiMap: {}*******", data)
case ReadMap =>
replicator ! Get(KeyMap,readAll)
case g @ GetSuccess(KeyMap, req) =>
val value = g.get(KeyMap)
log.info("********Current items read in ORMultiMap: {}*******", value.entries)
case NotFound(KeyMap, req) =>
log.info("****** No item found in ORMultiMap!!! *******")
case ShutDownDData => context.system.terminate()
}
}
This example demonstrates the common operations on each kind of CRDT data. Now let's test it:
object DDataDemo extends App {
import DDataUpdator._
val ud1 = create(2551)
val ud2 = create(2552)
val ud3 = create(2553)
scala.io.StdIn.readLine()
ud1 ! IncCounter
ud2 ! AddToSet("Apple")
ud1 ! AddToSet("Orange")
scala.io.StdIn.readLine()
ud2 ! IncCounter
ud2 ! AddToSet("Pineapple")
ud1 ! IncCounter
ud1 ! AddToMap("Cat")
scala.io.StdIn.readLine()
ud1 ! AddToMap("Dog")
ud2 ! AddToMap("Tiger")
scala.io.StdIn.readLine()
ud3 ! ReadSet
ud3 ! ReadMap
scala.io.StdIn.readLine()
ud1 ! ShutDownDData
ud2 ! ShutDownDData
ud3 ! ShutDownDData
}
The output is as follows:
[INFO] [12/24/2018 08:33:40.500] [DDataSystem-akka.actor.default-dispatcher-16] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ******* Incrementing counter... *****
[INFO] [12/24/2018 08:33:40.585] [DDataSystem-akka.actor.default-dispatcher-26] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] **********Add to ORSet successfully ********
[INFO] [12/24/2018 08:33:40.585] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********** Counter updated successfully ********
[INFO] [12/24/2018 08:33:40.585] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] **********Add to ORSet successfully ********
[INFO] [12/24/2018 08:33:40.726] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Current count: 1*******
[INFO] [12/24/2018 08:33:40.726] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Items in ORSet: Set(Orange, Apple)*******
[INFO] [12/24/2018 08:33:40.775] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********Items in ORSet: Set(Apple, Orange)*******
[INFO] [12/24/2018 08:33:40.775] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********Current count: 1*******
[INFO] [12/24/2018 08:33:40.829] [DDataSystem-akka.actor.default-dispatcher-23] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Current count: 1*******
[INFO] [12/24/2018 08:33:40.829] [DDataSystem-akka.actor.default-dispatcher-23] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Items in ORSet: Set(Apple, Orange)*******
[INFO] [12/24/2018 08:34:19.707] [DDataSystem-akka.actor.default-dispatcher-23] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ******* Incrementing counter... *****
[INFO] [12/24/2018 08:34:19.707] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ******* Incrementing counter... *****
[INFO] [12/24/2018 08:34:19.710] [DDataSystem-akka.actor.default-dispatcher-23] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********** Counter updated successfully ********
[INFO] [12/24/2018 08:34:19.711] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********** Counter updated successfully ********
[INFO] [12/24/2018 08:34:19.712] [DDataSystem-akka.actor.default-dispatcher-28] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] **********Add to ORSet successfully ********
[INFO] [12/24/2018 08:34:19.723] [DDataSystem-akka.actor.default-dispatcher-17] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Current count: 3*******
[INFO] [12/24/2018 08:34:19.723] [DDataSystem-akka.actor.default-dispatcher-17] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Items in ORSet: Set(Orange, Apple, Pineapple)*******
[INFO] [12/24/2018 08:34:19.733] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] *********** got counter=3 with item: Cat ************
[INFO] [12/24/2018 08:34:19.767] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********** Counter updated successfully ********
[INFO] [12/24/2018 08:34:19.772] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********Current count: 4*******
[INFO] [12/24/2018 08:34:19.773] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********Items in ORSet: Set(Apple, Orange, Pineapple)*******
[INFO] [12/24/2018 08:34:19.774] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ******** Items in ORMultiMap: Map(3 -> Set(Cat))*******
[INFO] [12/24/2018 08:34:19.828] [DDataSystem-akka.actor.default-dispatcher-17] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Current count: 4*******
[INFO] [12/24/2018 08:34:19.828] [DDataSystem-akka.actor.default-dispatcher-17] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Items in ORSet: Set(Apple, Orange, Pineapple)*******
[INFO] [12/24/2018 08:34:19.828] [DDataSystem-akka.actor.default-dispatcher-17] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ******** Items in ORMultiMap: Map(3 -> Set(Cat))*******
[INFO] [12/24/2018 08:34:20.222] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ******** Items in ORMultiMap: Map(3 -> Set(Cat))*******
[INFO] [12/24/2018 08:34:20.223] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Current count: 4*******
[INFO] [12/24/2018 08:34:45.918] [DDataSystem-akka.actor.default-dispatcher-25] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] *********** got counter=4 with item: Tiger ************
[INFO] [12/24/2018 08:34:45.919] [DDataSystem-akka.actor.default-dispatcher-16] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] *********** got counter=4 with item: Dog ************
[INFO] [12/24/2018 08:34:45.920] [DDataSystem-akka.actor.default-dispatcher-15] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Current items read in ORSet: Set(Apple, Orange, Pineapple)*******
[INFO] [12/24/2018 08:34:45.922] [DDataSystem-akka.actor.default-dispatcher-22] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Current items read in ORMultiMap: Map(3 -> Set(Cat))*******
[INFO] [12/24/2018 08:34:45.925] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********** Counter updated successfully ********
[INFO] [12/24/2018 08:34:45.926] [DDataSystem-akka.actor.default-dispatcher-27] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********** Counter updated successfully ********
[INFO] [12/24/2018 08:34:46.221] [DDataSystem-akka.actor.default-dispatcher-2] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ******** Items in ORMultiMap: Map(4 -> Set(Dog, Tiger), 3 -> Set(Cat))*******
[INFO] [12/24/2018 08:34:46.221] [DDataSystem-akka.actor.default-dispatcher-2] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Current count: 6*******
[INFO] [12/24/2018 08:34:46.272] [DDataSystem-akka.actor.default-dispatcher-27] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ******** Items in ORMultiMap: Map(4 -> Set(Tiger, Dog), 3 -> Set(Cat))*******
[INFO] [12/24/2018 08:34:46.272] [DDataSystem-akka.actor.default-dispatcher-27] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********Current count: 6*******
[INFO] [12/24/2018 08:34:46.326] [DDataSystem-akka.actor.default-dispatcher-22] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ******** Items in ORMultiMap: Map(4 -> Set(Dog, Tiger), 3 -> Set(Cat))*******
[INFO] [12/24/2018 08:34:46.326] [DDataSystem-akka.actor.default-dispatcher-22] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Current count: 6*******
Note that the last part of the output shows node 2553 reading the ORSet and ORMultiMap data that were updated on the other nodes. Among them, Map(4->Set(Dog,Tiger)
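In the log, both "Dog" and "Tiger" were added after their nodes each read counter=4, and the merged map ends up with both items under key 4. A rough standalone sketch of that outcome follows. It assumes concurrent puts under the same key are merged by taking the union of their value sets; `MultiMapMergeSketch` and `merge` are hypothetical names, and the real ORMultiMap also tracks removals and version information that are omitted here.

```scala
object MultiMapMergeSketch {
  type MultiMap = Map[Long, Set[String]]

  // Simplified merge: union the value sets key by key (no removals, no version vectors).
  def merge(a: MultiMap, b: MultiMap): MultiMap =
    (a.keySet ++ b.keySet).map { k =>
      k -> (a.getOrElse(k, Set.empty[String]) ++ b.getOrElse(k, Set.empty[String]))
    }.toMap

  def main(args: Array[String]): Unit = {
    val base: MultiMap = Map(3L -> Set("Cat"))
    // Two nodes read the same counter value (4) before either one increments it.
    val onNode2551 = base + (4L -> Set("Dog"))
    val onNode2552 = base + (4L -> Set("Tiger"))
    println(merge(onNode2551, onNode2552))
  }
}
```

The Get followed by Update in the AddToMap handler is not atomic across nodes, which is why two nodes can pick the same index; the merge keeps both items instead of losing one.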