Akka-Cluster(4)- DistributedData, 分散式數據類型

来源:https://www.cnblogs.com/tiger-xc/archive/2018/12/24/10166853.html
-Advertisement-
Play Games

在實際應用中,集群環境里共用一些數據是不可避免的。我的意思是有些數據可以在任何節點進行共用同步讀寫,困難的是如何解決更改衝突問題。本來可以通過分散式資料庫來實現這樣的功能,但使用和維護成本又過高,不值得。分散式數據類型distributed-data (ddata)正是為解決這樣的困局而設計的。ak ...


在實際應用中,集群環境里共用一些數據是不可避免的。我的意思是有些數據可以在任何節點進行共用同步讀寫,困難的是如何解決更改衝突問題。本來可以通過分散式資料庫來實現這樣的功能,但使用和維護成本又過高,不值得。分散式數據類型distributed-data (ddata)正是為解決這樣的困局而設計的。akka提供了一組CRDT(ConflictFreeReplicatedDataType 免衝突可複製數據類型)和一套管理方法來實現分散式數據在集群中的免衝突共用共用。

akka提供的分散式數據共用管理方案是通過replicator來實現的。replicator就是一種actor, 在集群的每一個節點運行replicator後,各節點相同actor路徑(去掉地址信息後)的replicator可以通過gissip協議進行溝通,仿佛連接成一個replicator網路通道。replicator提供一套解決數據更新衝突及數據同步的api。首先,共用數據結構是在各節點的replicator中構建的,數據更新時各節點程式把包嵌共用數據類型指定和對該數據更新方法函數的消息發送給本節點的replicator去更新並通過gossip協議向其它節點的replicator同步,同時解決同步時發生的衝突問題。由於數據是存在於replicator內的,所以數據值的讀取同樣是通過向本地replicator發送數據讀取消息實現的。

replicator作為一個actor,可以通過在.conf文件中定義akka-cluster-ddata-DistributedData擴展來啟動,又或者直接通過replicator.prop構建。個人認為直接構建actor會靈活許多,而且可以在一個節點上構建多個replicator,因為不同節點上的replicator是通過actor路徑來分群組的。下麵是通過replicator.prop構建replicator的示範代碼:

  val replicator = system.actorOf(Replicator.props(
    ReplicatorSettings(system).withGossipInterval(1.second)), "replicator")

如果使用配置文件中的akka.extension 進行構建:

akka {
   extensions = ["akka.cluster.ddata.DistributedData"]
 ...
}

val replicator = DistributedData(context.system).replicator

CRDT是某種key,value數據類型。CRDT value主要包括Counter,Flag,Set,Map幾種類型,包括:

/**
 * Implements a boolean flag CRDT that is initialized to `false` and
 * can be switched to `true`. `true` wins over `false` in merge.
 *
 * This class is immutable, i.e. "modifying" methods return a new instance.
 */
final case class Flag(enabled: Boolean) 
final case class FlagKey(_id: String) 

/**
 * Implements a 'Growing Counter' CRDT, also called a 'G-Counter'.
 * A G-Counter is a increment-only counter (inspired by vector clocks) in
 * which only increment and merge are possible. Incrementing the counter
 * adds 1 to the count for the current node. Divergent histories are
 * resolved by taking the maximum count for each node (like a vector
 * clock merge). The value of the counter is the sum of all node counts.
 *
 * This class is immutable, i.e. "modifying" methods return a new instance.
 */
final class GCounter
final case class GCounterKey(_id: String)

/**
 * Implements a 'Increment/Decrement Counter' CRDT, also called a 'PN-Counter'.
 * PN-Counters allow the counter to be incremented by tracking the
 * increments (P) separate from the decrements (N). Both P and N are represented
 * as two internal [[GCounter]]s. Merge is handled by merging the internal P and N
 * counters. The value of the counter is the value of the P counter minus
 * the value of the N counter.
 *
 * This class is immutable, i.e. "modifying" methods return a new instance.
 */
final class PNCounter 
final case class PNCounterKey(_id: String)

/**
 * Implements a 'Add Set' CRDT, also called a 'G-Set'. You can't
 * remove elements of a G-Set.
 * A G-Set doesn't accumulate any garbage apart from the elements themselves.
 * This class is immutable, i.e. "modifying" methods return a new instance.
 */
final case class GSet[A]
final case class GSetKey[A](_id: String)

/**
 * Implements a 'Observed Remove Set' CRDT, also called a 'OR-Set'.
 * Elements can be added and removed any number of times. Concurrent add wins
 * over remove.
 *
 * The ORSet has a version vector that is incremented when an element is added to
 * the set. The `node -> count` pair for that increment is stored against the
 * element as its "birth dot". Every time the element is re-added to the set,
 * its "birth dot" is updated to that of the `node -> count` version vector entry
 * resulting from the add. When an element is removed, we simply drop it, no tombstones.
 *
 * When an element exists in replica A and not replica B, is it because A added
 * it and B has not yet seen that, or that B removed it and A has not yet seen that?
 * In this implementation we compare the `dot` of the present element to the version vector
 * in the Set it is absent from. If the element dot is not "seen" by the Set version vector,
 * that means the other set has yet to see this add, and the item is in the merged
 * Set. If the Set version vector dominates the dot, that means the other Set has removed this
 * element already, and the item is not in the merged Set.
 *
 * This class is immutable, i.e. "modifying" methods return a new instance.
 */
final class ORSet[A]
final case class ORSetKey[A](_id: String)

/**
 * Implements a 'Observed Remove Map' CRDT, also called a 'OR-Map'.
 *
 * It has similar semantics as an [[ORSet]], but in case of concurrent updates
 * the values are merged, and must therefore be [[ReplicatedData]] types themselves.
 *
 * This class is immutable, i.e. "modifying" methods return a new instance.
 */
final class ORMap[A, B <: ReplicatedData]
final case class ORMapKey[A, B <: ReplicatedData](_id: String)

/**
 * An immutable multi-map implementation. This class wraps an
 * [[ORMap]] with an [[ORSet]] for the map's value.
 *
 * This class is immutable, i.e. "modifying" methods return a new instance.
 */
final class ORMultiMap[A, B]
final case class ORMultiMapKey[A, B](_id: String)

/**
 * Map of named counters. Specialized [[ORMap]] with [[PNCounter]] values.
 *
 * This class is immutable, i.e. "modifying" methods return a new instance.
 */
final class PNCounterMap[A]
final case class PNCounterMapKey[A](_id: String)

綜合統計,akka提供現成的CRDT類型包括:

Counters: GCounter, PNCounter
Sets: GSet, ORSet
Maps: ORMap, ORMultiMap, LWWMap, PNCounterMap
Registers: LWWRegister, Flag

CRDT操作結果也可以通過訂閱方式獲取。用戶發送Subscribe消息給replicator訂閱有關Key[A]數據的操作結果:

 /**
   * Register a subscriber that will be notified with a [[Changed]] message
   * when the value of the given `key` is changed. Current value is also
   * sent as a [[Changed]] message to a new subscriber.
   *
   * Subscribers will be notified periodically with the configured `notify-subscribers-interval`,
   * and it is also possible to send an explicit `FlushChanges` message to
   * the `Replicator` to notify the subscribers immediately.
   *
   * The subscriber will automatically be unregistered if it is terminated.
   *
   * If the key is deleted the subscriber is notified with a [[Deleted]]
   * message.
   */
  final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef) extends ReplicatorMessage
  /**
   * Unregister a subscriber.
   *
   * @see [[Replicator.Subscribe]]
   */
  final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef) extends ReplicatorMessage
  /**
   * The data value is retrieved with [[#get]] using the typed key.
   *
   * @see [[Replicator.Subscribe]]
   */
  final case class Changed[A <: ReplicatedData](key: Key[A])(data: A) extends ReplicatorMessage {
    /**
     * The data value, with correct type.
     * Scala pattern matching cannot infer the type from the `key` parameter.
     */
    def get[T <: ReplicatedData](key: Key[T]): T = {
      require(key == this.key, "wrong key used, must use contained key")
      data.asInstanceOf[T]
    }

    /**
     * The data value. Use [[#get]] to get the fully typed value.
     */
    def dataValue: A = data
  }

  final case class Deleted[A <: ReplicatedData](key: Key[A]) extends NoSerializationVerificationNeeded {
    override def toString: String = s"Deleted [$key]"
  }

replicator完成操作後發佈topic為Key[A]的Changed, Deleted消息。

分散式數據讀寫是通過發送消息給本地的replicator來實現的。讀寫消息包括Update,Get,Delete。讀取數據用Get,也可以訂閱CRDT的更新狀態消息Changed, Deleted。

賦予CRDT複製和免衝突特性的應該是replicator對Update這個消息的處理方式。Update消息的構建代碼如下:

  final case class Update[A <: ReplicatedData](key: Key[A], writeConsistency: WriteConsistency,request: Option[Any])(val modify: Option[A] ⇒ A) 
extends Command[A] with NoSerializationVerificationNeeded {...}

def apply[A <: ReplicatedData](
      key: Key[A], initial: A, writeConsistency: WriteConsistency,
      request: Option[Any] = None)(modify: A ⇒ A): Update[A] =
      Update(key, writeConsistency, request)(modifyWithInitial(initial, modify))

private def modifyWithInitial[A <: ReplicatedData](initial: A, modify: A ⇒ A): Option[A] ⇒ A = {
      case Some(data) ⇒ modify(data)
      case None       ⇒ modify(initial)
    }

我們看到在Update類型里包嵌了數據標示Key[A]和一個函數modify: Option[A] => A。replicator會用這個modify函數來對CRDT數據A進行轉換處理。構建器函數apply還包括了A類型數據的初始值,在第一次引用這個數據時就用initial這個初始值,這個從modifyWithInitial函數和它在apply里的引用可以瞭解。下麵是這個Update消息的使用示範:

  val timeout = 3.seconds.dilated

  val KeyA = GCounterKey("A")
  val KeyB = ORSetKey[String]("B")
  val KeyC = PNCounterMapKey[String]("C")
  val KeyD = ORMultiMapKey[String, String]("D")
  val KeyE = ORMapKey[String, GSet[String]]("E")

  replicator ! Update(KeyA, GCounter(), WriteAll(timeout))(_ + 3)

  replicator ! Update(KeyB, ORSet(), WriteAll(timeout))(_ + "a" + "b" + "c")

  replicator ! Update(KeyC, PNCounterMap.empty[String], WriteAll(timeout)) { _ increment "x" increment "y" }

  replicator ! Update(KeyD, ORMultiMap.empty[String, String], WriteAll(timeout)) { _ + ("a" → Set("A")) }

  replicator ! Update(KeyE, ORMap.empty[String, GSet[String]], WriteAll(timeout)) { _ + ("a" → GSet.empty[String].add("A")) }

由於CRDT數據讀寫是通過消息發送形式實現的,讀寫結果也是通過消息形式返回的。數據讀取返回消息里包嵌了結果數據。下麵就是讀寫返回結果消息類型:

/*------------------UPDATE STATE MESSAGES -----------*/
  final case class UpdateSuccess[A <: ReplicatedData](key: Key[A], request: Option[Any])
    extends UpdateResponse[A] with DeadLetterSuppression
  sealed abstract class UpdateFailure[A <: ReplicatedData] extends UpdateResponse[A]

  /**
   * The direct replication of the [[Update]] could not be fulfill according to
   * the given [[WriteConsistency consistency level]] and
   * [[WriteConsistency#timeout timeout]].
   *
   * The `Update` was still performed locally and possibly replicated to some nodes.
   * It will eventually be disseminated to other replicas, unless the local replica
   * crashes before it has been able to communicate with other replicas.
   */
  final case class UpdateTimeout[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends UpdateFailure[A]
  /**
   * If the `modify` function of the [[Update]] throws an exception the reply message
   * will be this `ModifyFailure` message. The original exception is included as `cause`.
   */
  final case class ModifyFailure[A <: ReplicatedData](key: Key[A], errorMessage: String, cause: Throwable, request: Option[Any])
    extends UpdateFailure[A] {
    override def toString: String = s"ModifyFailure [$key]: $errorMessage"
  }
  /**
   * The local store or direct replication of the [[Update]] could not be fulfill according to
   * the given [[WriteConsistency consistency level]] due to durable store errors. This is
   * only used for entries that have been configured to be durable.
   *
   * The `Update` was still performed in memory locally and possibly replicated to some nodes,
   * but it might not have been written to durable storage.
   * It will eventually be disseminated to other replicas, unless the local replica
   * crashes before it has been able to communicate with other replicas.
   */
  final case class StoreFailure[A <: ReplicatedData](key: Key[A], request: Option[Any])
    extends UpdateFailure[A] with DeleteResponse[A] {

/* ---------------- GET MESSAGES --------*/
  /**
   * Reply from `Get`. The data value is retrieved with [[#get]] using the typed key.
   */
  final case class GetSuccess[A <: ReplicatedData](key: Key[A], request: Option[Any])(data: A)
    extends GetResponse[A] with ReplicatorMessage {

    /**
     * The data value, with correct type.
     * Scala pattern matching cannot infer the type from the `key` parameter.
     */
    def get[T <: ReplicatedData](key: Key[T]): T = {
      require(key == this.key, "wrong key used, must use contained key")
      data.asInstanceOf[T]
    }

    /**
     * The data value. Use [[#get]] to get the fully typed value.
     */
    def dataValue: A = data
  }
  final case class NotFound[A <: ReplicatedData](key: Key[A], request: Option[Any])
    extends GetResponse[A] with ReplicatorMessage


/*----------------DELETE MESSAGES ---------*/
  final case class DeleteSuccess[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends DeleteResponse[A]
  final case class ReplicationDeleteFailure[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends DeleteResponse[A]
  final case class DataDeleted[A <: ReplicatedData](key: Key[A], request: Option[Any])
    extends RuntimeException with NoStackTrace with DeleteResponse[A] {
    override def toString: String = s"DataDeleted [$key]"
  }

讀取返回消息中定義了數據讀取方法def dataValue: A 獲取數據,或者用類型方法get(Key[A])指定讀取目標。下麵是一些數據讀取例子:

val replicator = DistributedData(system).replicator
val Counter1Key = PNCounterKey("counter1")
val Set1Key = GSetKey[String]("set1")
val Set2Key = ORSetKey[String]("set2")
val ActiveFlagKey = FlagKey("active")

replicator ! Get(Counter1Key, ReadLocal)

val readFrom3 = ReadFrom(n = 3, timeout = 1.second)
replicator ! Get(Set1Key, readFrom3)

val readMajority = ReadMajority(timeout = 5.seconds)
replicator ! Get(Set2Key, readMajority)

val readAll = ReadAll(timeout = 5.seconds)
replicator ! Get(ActiveFlagKey, readAll)

case g @ GetSuccess(Counter1Key, req) ⇒
  val value = g.get(Counter1Key).value
case NotFound(Counter1Key, req) ⇒ // key counter1 does not exist

...

case g @ GetSuccess(Set1Key, req) ⇒
  val elements = g.get(Set1Key).elements
case GetFailure(Set1Key, req) ⇒
// read from 3 nodes failed within 1.second
case NotFound(Set1Key, req)   ⇒ // key set1 does not exist

/*---- return get result to user (sender())  ----*/

  case "get-count"// incoming request to retrieve current value of the counter
    replicator ! Get(Counter1Key, readTwo, request = Some(sender()))

  case g @ GetSuccess(Counter1Key, Some(replyTo: ActorRef)) ⇒
    val value = g.get(Counter1Key).value.longValue
    replyTo ! value
  case GetFailure(Counter1Key, Some(replyTo: ActorRef)) ⇒
    replyTo ! -1L
  case NotFound(Counter1Key, Some(replyTo: ActorRef)) ⇒
    replyTo ! 0L

下麵是用消息訂閱方式獲取讀寫狀態的示範:

  replicator ! Subscribe(DataKey, self)

...

    case c @ Changed(DataKey) ⇒
      val data = c.get(DataKey)
      log.info("Current elements: {}", data.elements)

在下麵我們做一個例子來示範幾種CRDT數據的讀寫和監控操作:

object DDataUpdator {

  case object IncCounter
  case class AddToSet(item: String)
  case class AddToMap(item: String)
  case object ReadSet
  case object ReadMap
  case object ShutDownDData


  val KeyCounter = GCounterKey("counter")
  val KeySet = ORSetKey[String]("gset")
  val KeyMap = ORMultiMapKey[Long, String]("ormap")

  val timeout = 300 millis
  val writeAll = WriteAll(timeout)
  val readAll = ReadAll(timeout)


  def create(port: Int): ActorRef = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port = $port")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("DDataSystem",config)
    system.actorOf(Props[DDataUpdator],s"updator-$port")
  }

}

class DDataUpdator extends Actor with ActorLogging {
  import DDataUpdator._
  implicit val cluster = Cluster(context.system)
  val replicator = DistributedData(context.system).replicator


  replicator ! Subscribe(KeyCounter,self)
  replicator ! Subscribe(KeySet,self)
  replicator ! Subscribe(KeyMap,self)

  override def receive: Receive = {
    case IncCounter =>
       log.info(s"******* Incrementing counter... *****")
       replicator ! Update(KeyCounter,GCounter(),writeAll)(_ + 1)
    case UpdateSuccess(KeyCounter,_) =>
      log.info(s"********** Counter updated successfully ********")
    case UpdateTimeout(KeyCounter,_) =>
      log.info(s"******* Counter update timed out! *****")
    case ModifyFailure(KeyCounter,msg,err,_) =>
      log.info(s"******* Counter update failed with error: ${msg} *****")
    case StoreFailure(KeyCounter,_) =>
      log.info(s"******* Counter value store failed! *****")
    case c @ Changed(KeyCounter) ⇒
      val data = c.get(KeyCounter)
      log.info("********Current count: {}*******", data.getValue)


    case AddToSet(item) =>
      replicator ! Update(KeySet,ORSet.empty[String],writeAll)(_ + item)
    case UpdateSuccess(KeySet,_) =>
      log.info(s"**********Add to ORSet successfully ********")
    case UpdateTimeout(KeySet,_) =>
      log.info(s"******* Add to ORSet timed out! *****")
    case ModifyFailure(KeySet,msg,err,_) =>
      log.info(s"******* Add to ORSet failed with error: ${msg} *****")
    case StoreFailure(KeySet,_) =>
      log.info(s"******* ORSet items store failed! *****")
    case c @ Changed(KeySet) =>
      val data = c.get(KeySet)
      log.info("********Items in ORSet: {}*******", data.elements)
    case ReadSet =>
      replicator ! Get(KeySet,readAll)
    case g @ GetSuccess(KeySet, req) =>
      val value = g.get(KeySet)
      log.info("********Current items read in ORSet: {}*******", value.elements)
    case NotFound(KeySet, req) =>
      log.info("******No item found in ORSet!!!*******")



    case AddToMap(item) =>
       replicator ! Get(KeyCounter,readAll,Some(AddToMap(item)))
    case g @ GetSuccess(KeyCounter,Some(AddToMap(item))) =>
      val idx: Long = g.get(KeyCounter).getValue.longValue()
      log.info(s"*********** got counter=${idx} with item: $item ************")
      replicator ! Update(KeyMap,ORMultiMap.empty[Long,String],writeAll)(_ + (idx -> Set(item)))
      replicator ! Update(KeyCounter,GCounter(),writeAll)(_ + 1)
    case c @ Changed(KeyMap) =>
      val data = c.get(KeyMap).entries
      log.info("******** Items in ORMultiMap: {}*******", data)
    case ReadMap =>
      replicator ! Get(KeyMap,readAll)
    case g @ GetSuccess(KeyMap, req) =>
      val value = g.get(KeyMap)
      log.info("********Current items read in ORMultiMap: {}*******", value.entries)
    case NotFound(KeyMap, req) =>
      log.info("****** No item found in ORMultiMap!!! *******")



    case ShutDownDData => context.system.terminate()

  }

在這個例子里我們示範了每種CRDT數據的通用操作方法。然後我們再測試一下使用結果:

object DDataDemo extends App {
  import DDataUpdator._

  val ud1 = create(2551)
  val ud2 = create(2552)
  val ud3 = create(2553)
  scala.io.StdIn.readLine()

  ud1 ! IncCounter
  ud2 ! AddToSet("Apple")
  ud1 ! AddToSet("Orange")

  scala.io.StdIn.readLine()

  ud2 ! IncCounter
  ud2 ! AddToSet("Pineapple")
  ud1 ! IncCounter
  ud1 ! AddToMap("Cat")

  scala.io.StdIn.readLine()

  ud1 ! AddToMap("Dog")
  ud2 ! AddToMap("Tiger")
  scala.io.StdIn.readLine()

  ud3 ! ReadSet
  ud3 ! ReadMap
  scala.io.StdIn.readLine()


   ud1 ! ShutDownDData
   ud2 ! ShutDownDData
   ud3 ! ShutDownDData
}

結果如下:

[INFO] [12/24/2018 08:33:40.500] [DDataSystem-akka.actor.default-dispatcher-16] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ******* Incrementing counter... *****
[INFO] [12/24/2018 08:33:40.585] [DDataSystem-akka.actor.default-dispatcher-26] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] **********Add to ORSet successfully ********
[INFO] [12/24/2018 08:33:40.585] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********** Counter updated successfully ********
[INFO] [12/24/2018 08:33:40.585] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] **********Add to ORSet successfully ********
[INFO] [12/24/2018 08:33:40.726] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Current count: 1*******
[INFO] [12/24/2018 08:33:40.726] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Items in ORSet: Set(Orange, Apple)*******
[INFO] [12/24/2018 08:33:40.775] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********Items in ORSet: Set(Apple, Orange)*******
[INFO] [12/24/2018 08:33:40.775] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********Current count: 1*******
[INFO] [12/24/2018 08:33:40.829] [DDataSystem-akka.actor.default-dispatcher-23] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Current count: 1*******
[INFO] [12/24/2018 08:33:40.829] [DDataSystem-akka.actor.default-dispatcher-23] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Items in ORSet: Set(Apple, Orange)*******


[INFO] [12/24/2018 08:34:19.707] [DDataSystem-akka.actor.default-dispatcher-23] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ******* Incrementing counter... *****
[INFO] [12/24/2018 08:34:19.707] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ******* Incrementing counter... *****
[INFO] [12/24/2018 08:34:19.710] [DDataSystem-akka.actor.default-dispatcher-23] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********** Counter updated successfully ********
[INFO] [12/24/2018 08:34:19.711] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********** Counter updated successfully ********
[INFO] [12/24/2018 08:34:19.712] [DDataSystem-akka.actor.default-dispatcher-28] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] **********Add to ORSet successfully ********
[INFO] [12/24/2018 08:34:19.723] [DDataSystem-akka.actor.default-dispatcher-17] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Current count: 3*******
[INFO] [12/24/2018 08:34:19.723] [DDataSystem-akka.actor.default-dispatcher-17] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Items in ORSet: Set(Orange, Apple, Pineapple)*******
[INFO] [12/24/2018 08:34:19.733] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] *********** got counter=3 with item: Cat ************
[INFO] [12/24/2018 08:34:19.767] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********** Counter updated successfully ********
[INFO] [12/24/2018 08:34:19.772] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********Current count: 4*******
[INFO] [12/24/2018 08:34:19.773] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********Items in ORSet: Set(Apple, Orange, Pineapple)*******
[INFO] [12/24/2018 08:34:19.774] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ******** Items in ORMultiMap: Map(3 -> Set(Cat))*******
[INFO] [12/24/2018 08:34:19.828] [DDataSystem-akka.actor.default-dispatcher-17] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Current count: 4*******
[INFO] [12/24/2018 08:34:19.828] [DDataSystem-akka.actor.default-dispatcher-17] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Items in ORSet: Set(Apple, Orange, Pineapple)*******
[INFO] [12/24/2018 08:34:19.828] [DDataSystem-akka.actor.default-dispatcher-17] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ******** Items in ORMultiMap: Map(3 -> Set(Cat))*******
[INFO] [12/24/2018 08:34:20.222] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ******** Items in ORMultiMap: Map(3 -> Set(Cat))*******
[INFO] [12/24/2018 08:34:20.223] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Current count: 4*******

[INFO] [12/24/2018 08:34:45.918] [DDataSystem-akka.actor.default-dispatcher-25] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] *********** got counter=4 with item: Tiger ************
[INFO] [12/24/2018 08:34:45.919] [DDataSystem-akka.actor.default-dispatcher-16] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] *********** got counter=4 with item: Dog ************
[INFO] [12/24/2018 08:34:45.920] [DDataSystem-akka.actor.default-dispatcher-15] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Current items read in ORSet: Set(Apple, Orange, Pineapple)*******
[INFO] [12/24/2018 08:34:45.922] [DDataSystem-akka.actor.default-dispatcher-22] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Current items read in ORMultiMap: Map(3 -> Set(Cat))*******
[INFO] [12/24/2018 08:34:45.925] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********** Counter updated successfully ********
[INFO] [12/24/2018 08:34:45.926] [DDataSystem-akka.actor.default-dispatcher-27] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********** Counter updated successfully ********
[INFO] [12/24/2018 08:34:46.221] [DDataSystem-akka.actor.default-dispatcher-2] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ******** Items in ORMultiMap: Map(4 -> Set(Dog, Tiger), 3 -> Set(Cat))*******
[INFO] [12/24/2018 08:34:46.221] [DDataSystem-akka.actor.default-dispatcher-2] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Current count: 6*******
[INFO] [12/24/2018 08:34:46.272] [DDataSystem-akka.actor.default-dispatcher-27] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ******** Items in ORMultiMap: Map(4 -> Set(Tiger, Dog), 3 -> Set(Cat))*******
[INFO] [12/24/2018 08:34:46.272] [DDataSystem-akka.actor.default-dispatcher-27] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********Current count: 6*******


[INFO] [12/24/2018 08:34:46.326] [DDataSystem-akka.actor.default-dispatcher-22] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ******** Items in ORMultiMap: Map(4 -> Set(Dog, Tiger), 3 -> Set(Cat))*******
[INFO] [12/24/2018 08:34:46.326] [DDataSystem-akka.actor.default-dispatcher-22] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Current count: 6*******

註意最後一段顯示結果是在另一個節點2553上讀取其它節點上更新的ORSet和ORMultiMap裡面的數據。其中Map(4->set(Dog,Tiger)

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1、先寫結構 a.如果列表沒有時間 結構為:<li><a>新聞內容</a></li> b.如果列表有時間 結構為:<li><a>新聞內容</a><span>時間</span></li> 2、給li添加高度 3、若列表有時間,給a標簽,span標簽添加浮動 4、調整文本樣式(大小,顏色等) 5、以背景 ...
  • 函數式編程 filter的使用 reduce curry let dragon = (name,size,element) = console.log(dragon('fluffykins','tiny','lightling')) //curry let dragon = name = size ...
  • JavaScript 系列博客(五) 前言 本篇博客學習 js 選擇器來控制 css 和 html、使用事件(鉤子函數)來處理事件完成後完成指定功能以及js 事件控制頁面內容。 js 選擇器 在學習 js 選擇器前需要瞭解幾個概念。 節點(一):在文檔(document)中出現的所有內容都是 doc ...
  • 1.同源問題解決 首先,在同一個域下搭建網路功能變數名稱訪問,需要nginx軟體,下載之後修改部分配置 然後再終端下cmd nginx.exe命令,或者打開nginx.exe文件,會運行nginx一閃而過,在後臺運行而且一次是打開兩個的,可以在任務管理器控制結束進程, 接下來,你就可以打開8080介面給同域 ...
  • 本文是 "Rxjs 響應式編程 第一章:響應式" 這篇文章的學習筆記。 示例代碼地址: "【示例代碼】" 更多文章: "【《大史住在大前端》博文集目錄】" [TOC] 一. 劃重點 三句非常重要的話: 從理念上來理解,Rx模式引入了一種新的 “一切皆流” 的編程範式 從設計模式的角度來看, 是 發佈 ...
  • 國內的設計師大都喜歡用px,而國外的網站大都喜歡用em和rem,那麼三者有什麼區別,又各自有什麼優劣呢? PX特點 1. IE無法調整那些使用px作為單位的字體大小; 2. 國外的大部分網站能夠調整的原因在於其使用了em或rem作為字體單位; 3. Firefox能夠調整px和em,rem,但是96 ...
  • react本身能夠完成動態數據的監聽和更新,如果不是必要可以不適用redux。 安裝redux: cnpm install redux --save,或者yarn add redux。 一、react基本用法 redux是獨立的用於狀態管理的第三方包,它建立狀態機來對單項數據進行管理。 上圖是個人粗 ...
  • 裝飾器模式允許向現有對象中添加新功能,同時又不改變其結構。 介紹 裝飾器模式屬於結構型模式,主要功能是能夠動態地為一個對象添加額外功能。在保證現有功能的基礎上,再添加新功能,可聯想到 WPF 中的附件屬性。 類圖描述 由上圖可知,我們定義了一個基礎介面 IShape 用於約定對象的基礎行為。然後通過 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...