Akka-Cluster(6)- Cluster-Sharding:集群分片,分散式交互程式核心方式

来源:https://www.cnblogs.com/tiger-xc/archive/2019/01/17/10280399.html
-Advertisement-
Play Games

在前面幾篇討論里我們介紹了在集群環境里的一些編程模式、分散式數據結構及具體實現方式。到目前為止,我們已經實現了把程式任務分配給處於很多伺服器上的actor,能夠最大程度的利用整體系統的硬體資源。這是因為通過akka-cluster能夠把很多伺服器組合成一個虛擬的整體系統,編程人員不需要知道負責運算的 ...


在前面幾篇討論里我們介紹了在集群環境里的一些編程模式、分散式數據結構及具體實現方式。到目前為止,我們已經實現了把程式任務分配給處於很多伺服器上的actor,能夠最大程度的利用整體系統的硬體資源。這是因為通過akka-cluster能夠把很多伺服器組合成一個虛擬的整體系統,編程人員不需要知道負責運算的actor具體在那台伺服器上運行。當然,我所指的整體系統是一種分散式的系統,實質底層還是各集群節點作為完整個體獨立運行的,所以核心理念還是需要將程式分割成能獨立運算的任務,然後分派給可能分佈在很多伺服器上的actor去運算。在上一篇的cluster-load-balance里我們採用了一種fire-and-forget模式把多項獨立任務分配給集群節點上的actor,然後任由它們各自完成運算,中途不做任何交互、控制。這也是一種典型的無內部狀態的運算模式。對外界來講就是開始、完成,中間沒有關於運算進展或當前狀態的交流需要。但在現實里,很多任務是無法完全進行獨立細分的,或者再細分會影響系統效率。比如網上購物網站每個客戶的購物車:它記錄了客戶在網上的所有商品揀選過程,每一個揀選動作都代表更新的購物車狀態,直到完成結算。那麼在一個可能有幾十萬用戶同時線上購物的網站,保留在記憶體的購物車狀態應該是任何機器都無法容納的,只有回到傳統的資料庫模式了,還是要面對無法解決的多併發系統效率問題。這麼分析,集群分片技術可能是最好的解決方法了。

簡單講:集群分片技術就是把一堆帶唯一標識identifier的actor,即entity分佈到集群節點上去。控製程序可以通過唯一ID與entityr進行交互,控制整個運算過程。這樣,我們可以把程式分成相對合理的包含多個過程狀態的細分任務。這些細分任務是由分佈在集群節點上的entity來運算的,產生的狀態當然也使用的是各集群節點上的資源,如此解決上面所提到的記憶體容量問題。akka-cluster提供的actor位置透明化機制能在系統崩潰、增減集群節點時自動重新部署所有的actor以達到負責均衡。而用戶通過固定的ID就能聯絡目標entity,無論它被轉移到任何集群節點上。

集群分片由分片管理ShardRegion和分片定位ShardCoordinator共同協作實現,目標是把消息正確傳遞給指定ID的entity。分片定位負責確定分片所在集群節點,分片管理則對每個集群節點上分片內的entity進行定位。ShardCoordinator是個cluster-singleton,而ShardRegion則必須部署在每個集群節點上。每個分片內的entity必須是一個類型的actor。發給entity的消息內部必須包含分片編號和entity ID。通過從消息中解析位置信息後由ShardCoordinator確定負責傳遞消息的ShardRegion,相關的ShardRegion按ID把消息發送至目標entity。

每個節點上的ShardRegion是通過下麵這個start函數構建的:

  /**
   * Scala API: Register a named entity type by defining the [[akka.actor.Props]] of the entity actor
   * and functions to extract entity and shard identifier from messages. The [[ShardRegion]] actor
   * for this type can later be retrieved with the [[#shardRegion]] method.
   *
   * Some settings can be configured as described in the `akka.cluster.sharding` section
   * of the `reference.conf`.
   *
   * @param typeName the name of the entity type
   * @param entityProps the `Props` of the entity actors that will be created by the `ShardRegion`
   * @param settings configuration settings, see [[ClusterShardingSettings]]
   * @param extractEntityId partial function to extract the entity id and the message to send to the
   *   entity from the incoming message, if the partial function does not match the message will
   *   be `unhandled`, i.e. posted as `Unhandled` messages on the event stream
   * @param extractShardId function to determine the shard id for an incoming message, only messages
   *   that passed the `extractEntityId` will be used
   * @param allocationStrategy possibility to use a custom shard allocation and
   *   rebalancing logic
   * @param handOffStopMessage the message that will be sent to entities when they are to be stopped
   *   for a rebalance or graceful shutdown of a `ShardRegion`, e.g. `PoisonPill`.
   * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
   */
  def start(
    typeName:           String,
    entityProps:        Props,
    settings:           ClusterShardingSettings,
    extractEntityId:    ShardRegion.ExtractEntityId,
    extractShardId:     ShardRegion.ExtractShardId,
    allocationStrategy: ShardAllocationStrategy,
    handOffStopMessage: Any): ActorRef = {...}

這個函數登記了名稱為typeName類型entity的分片。函數返回ActorRef,說明ShardRegion是在本節點上的一個actor。下麵是調用示範:

     ClusterSharding(system).start(
        typeName = Counter.shardName,
        entityProps = Counter.props(),
        settings = ClusterShardingSettings(system),
        extractEntityId = Counter.idExtractor,
        extractShardId = Counter.shardResolver)
...

object Counter {

  trait Command
  case object Increment extends Command
  case object Decrement extends Command
  case object Get extends Command
  case object Stop extends Command

  trait Event
  case class CounterChanged(delta: Int) extends Event

  // Sharding Name
  val shardName: String = "Counter"

  // outside world if he want to send message to sharding should use this message
  case class CounterMessage(id: Long, cmd: Command)

  // id extrator
  val idExtractor: ShardRegion.ExtractEntityId = {
    case CounterMessage(id, msg) => (id.toString, msg)
  }
 
  // shard resolver
  val shardResolver: ShardRegion.ExtractShardId = {
    case CounterMessage(id, msg) => (id % 12).toString
  }

  def props() = Props[Counter]

}

entityProps是ShardRegion用來重構entity的。typeName是用來查找ShardRegion的,如下:

val counterRegion: ActorRef = ClusterSharding(system).shardRegion("Counter")
counterRegion ! Get(123)

用"Counter"獲得ShardRegion的ActorRef後所有本節點的消息都是通過這個ShardRegion actor來定位,轉達。所以每個ShardRegion都必須具備消息目的地entity的分片編號及entityID的解析方法:extractShardId和extractEntityId。在有些情況下由於節點角色的關係在某個節點不部署任何entity,但本節點需要向其它節點的entity發送消息,這時需要構建一個中介ProxyOnlyShardRegion:

  /**
   * Java/Scala API: Register a named entity type `ShardRegion` on this node that will run in proxy only mode,
   * i.e. it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
   * entity actors itself. The [[ShardRegion]] actor for this type can later be retrieved with the
   * [[#shardRegion]] method.
   *
   * Some settings can be configured as described in the `akka.cluster.sharding` section
   * of the `reference.conf`.
   *
   * @param typeName the name of the entity type
   * @param role specifies that this entity type is located on cluster nodes with a specific role.
   *   If the role is not specified all nodes in the cluster are used.
   * @param messageExtractor functions to extract the entity id, shard id, and the message to send to the
   *   entity from the incoming message
   * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
   */
  def startProxy(
    typeName:         String,
    role:             Optional[String],
    messageExtractor: ShardRegion.MessageExtractor): ActorRef = {...}

還有一個重要問題是如何棄用passivate entity,以釋放占用資源。akka-cluster提供的方法是通過定義一個空轉時間值idle-timeout,如果空轉超出此時間段則可以進行passivate。下麵是一段應用示範:兩分鐘空轉就passivate entity

class ABC extends Actor {
...
 // passivate the entity when no activity
  context.setReceiveTimeout(2.minutes)

...

override def receive .....

  override def receiveCommand: Receive = {
    case Increment      ⇒ persist(CounterChanged(+1))(updateState)
    case Decrement      ⇒ persist(CounterChanged(-1))(updateState)
    case Get(_)         ⇒ sender() ! count
    case ReceiveTimeout ⇒ context.parent ! Passivate(stopMessage = Stop)
    case Stop           ⇒ context.stop(self)
  }
/* 或者
  override def unhandled(msg: Any): Unit = msg match {
    case ReceiveTimeout => context.parent ! Passivate(stopMessage = PoisonPill)
    case _              => super.unhandled(msg)
  }
*/
}

又或者通過設定配置來實現自動的passivation:

在配置文件中設定:akka.cluster.sharding.passivate-idle-entity-after = 120 s   // off to disable

下麵是官網提供的一個說明passivation-stop-message的示範代碼:

trait CounterCommand
case object Increment extends CounterCommand
final case class GetValue(replyTo: ActorRef[Int]) extends CounterCommand

case object Idle extends CounterCommand
case object GoodByeCounter extends CounterCommand

def counter2(shard: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[CounterCommand] = {
  Behaviors.setup { ctx ⇒

    def become(value: Int): Behavior[CounterCommand] =
      Behaviors.receiveMessage[CounterCommand] {
        case Increment ⇒
          become(value + 1)
        case GetValue(replyTo) ⇒
          replyTo ! value
          Behaviors.same
        case Idle ⇒
          // after receive timeout
          shard ! ClusterSharding.Passivate(ctx.self)
          Behaviors.same
        case GoodByeCounter ⇒
          // the stopMessage, used for rebalance and passivate
          Behaviors.stopped
      }

    ctx.setReceiveTimeout(30.seconds, Idle)
    become(0)
  }
}

sharding.init(Entity(
  typeKey = TypeKey,
  createBehavior = ctx ⇒ counter2(ctx.shard, ctx.entityId))
  .withStopMessage(GoodByeCounter))

實際上是向主管ShardRegion發送Passivation消息,並指定停止方式。

還有必須註意的是如果使用BackoffSupervisor監控entity:必須使用Backoff.OnStop,因為persist異常會直接停掉entity。Backoff.OnStop策略會重構entity(BackoffSupervisedEntity),再啟動。那麼如果實施passivation時真的需要停止entity呢?我們可以如下操作:

    case "stop" =>
      context.stop(self)
      context.parent ! PoisonPill

context.parent是BackoffSupervisor,需要同時停掉。

下麵我們就設計一個例子來示範集群分片應用。為了更貼近現實,在例子使用了event-sourcing,persistentActor等尚未完整介紹的技術和工具。我會在接著的討論里介紹它們的原理和使用方式。這個例子模仿一個水果店收銀業務:有三台pos機,顧客到任何pos機前錄入商品、數量,然後結賬。這個示範的主要目的是任何時間如果後端伺服器出現故障,正在錄入過程中的銷售單狀態都能得到完整恢復。

我們先看看這個pos前端的源代碼:

import akka.actor._
import akka.cluster._
import akka.persistence._
import akka.pattern._
import scala.concurrent.duration._


object POSTerminal {
  case class Fruit(code: String, name: String, price: Double)
  case class Item(fruit: Fruit, qty: Int)

  sealed trait Command {
  }
  case class Checkout(fruit: Fruit, qty: Int) extends Command
  case object ShowTotol extends Command
  case class PayCash(amount: Double) extends Command
  case object Shutdown extends Command

  sealed trait Event {}
  case class ItemScanned(fruit: Fruit, qty: Int) extends Event
  case object Paid extends Event

  case class Items(items: List[Item] = Nil) {
    def itemAdded(evt: Event): Items = evt match {
      case ItemScanned(fruit,qty) =>
        copy( Item(fruit,qty) :: items )   //append item

      case _ => this     //nothing happens
    }
    def billPaid = copy(Nil)     //clear all items
    override def toString = items.reverse.toString()
  }

  def termProps = Props(new POSTerminal())

  //backoff suppervisor  must use onStop mode
  def POSProps: Props = {
    val options = Backoff.onStop(
      childProps = termProps,
      childName = "posterm",
      minBackoff = 1 second,
      maxBackoff = 5 seconds,
      randomFactor = 0.20
    )
    BackoffSupervisor.props(options)
  }

}

class POSTerminal extends PersistentActor with ActorLogging {
  import POSTerminal._
  val cluster = Cluster(context.system)

  // self.path.parent.name is the type name (utf-8 URL-encoded)
  // self.path.name is the entry identifier (utf-8 URL-encoded)  but entity has a supervisor
  override def persistenceId: String = self.path.parent.parent.name + "-" + self.path.parent.name

  var currentItems = Items()


  override def receiveRecover: Receive = {
    case evt: Event => currentItems = currentItems.itemAdded(evt)
      log.info(s"*****  ${persistenceId} recovering events ...  ********")
    case SnapshotOffer(_,loggedItems: Items) =>
      log.info(s"*****  ${persistenceId} recovering snapshot ...  ********")
      currentItems = loggedItems
  }

  override def receiveCommand: Receive = {
    case Checkout(fruit,qty) =>
      log.info(s"*********${persistenceId} is scanning item: $fruit, qty: $qty *********")
      persist(ItemScanned(fruit,qty))(evt =>  currentItems = currentItems.itemAdded(evt))

    case ShowTotol =>
      log.info(s"*********${persistenceId} on ${cluster.selfAddress} has current scanned items: *********")
      if (currentItems.items == Nil)
        log.info(s"**********${persistenceId} None transaction found! *********")
      else
        currentItems.items.reverse.foreach (item =>
          log.info(s"*********${persistenceId}: ${item.fruit.name} ${item.fruit.price} X ${item.qty} = ${item.fruit.price * item.qty} *********"))

    case PayCash(amt) =>
      log.info(s"**********${persistenceId} paying $amt to settle ***********")
      persist(Paid) { _ =>
        currentItems = currentItems.billPaid
        saveSnapshot(currentItems)     //no recovery
      }

    //shutdown this node to validate entity relocation and proper state recovery
    case Shutdown =>
      log.info(s"******** node ${cluster.selfAddress} is leaving cluster ... *******")
      cluster.leave(cluster.selfAddress)
  }
}

我用下麵幾項來總結一下:

1、POSTerminal是具體的業務運算前端,包裹在BackoffSupervisor里。能保證這個entity在因異常如持久化失敗造成停頓時能進行重試。所以,使用了Backoff.onStop方式。

2、persistenceId=self.path.parent.parent.name+"-"+self.path.parent.name 代表: 店號-機號 如: 1-1021。actor.path.name的產生是由ShardRegion具體操作的,其實就是ExtactShardId-ExtractEntityId。

3、註意這個狀態類型Item,它的方法itemAdded(evt): Item 即返回新狀態。所以必須謹記用currentItems=itemAdded(evt)這樣的語法。

下麵是構建和啟動ClusterSharding的源代碼:

object POSShard {
 import POSTerminal._

 val shardName = "POSManager"
 case class POSCommand(id: Long, cmd: Command) {
   def shopId = id.toString.head.toString
   def posId = id.toString
 }

 val getPOSId: ShardRegion.ExtractEntityId =  {
   case posCommand: POSCommand => (posCommand.posId,posCommand.cmd)
 }
 val getShopId: ShardRegion.ExtractShardId = {
   case posCommand: POSCommand => posCommand.shopId
 }


 def create(port: Int) = {
   val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
     .withFallback(ConfigFactory.load())
   val system = ActorSystem("posSystem",config)

   ClusterSharding(system).start(
     typeName = shardName,
     entityProps = POSProps,
     settings = ClusterShardingSettings(system),
     extractEntityId = getPOSId,
     extractShardId = getShopId
   )
 }

}

用下麵的代碼來測試:

object POSDemo extends App {
  POSShard.create(2551)
  Thread.sleep(1000)
  POSShard.create(2552)
  POSShard.create(2553)
  val posref = POSShard.create(2554)
  scala.io.StdIn.readLine()

  val apple = Fruit("0001","high grade apple",10.5)
  val orange = Fruit("0002","sunkist orage",12.0)
  val grape = Fruit("0003","xinjiang red grape",15.8)


  posref ! POSCommand(1021, Checkout(apple,2))
  posref ! POSCommand(1021,Checkout(grape,1))

  posref ! POSCommand(1021,ShowTotol)
  scala.io.StdIn.readLine()

  posref ! POSCommand(1021,Shutdown)
  scala.io.StdIn.readLine()

  posref ! POSCommand(1021,Checkout(orange,10))


  posref ! POSCommand(1021,ShowTotol)
  scala.io.StdIn.readLine()

  posref ! POSCommand(1028,Checkout(orange,10))

  posref ! POSCommand(1028,ShowTotol)
  scala.io.StdIn.readLine()


}

運算結果如下:

[akka.tcp://[email protected]:2551*********1-1021 is scanning item: Fruit(0001,high grade apple,10.5), qty: 2 *********
[akka.tcp://[email protected]:2551*********1-1021 is scanning item: Fruit(0003,xinjiang red grape,15.8), qty: 1 *********
[akka.tcp://[email protected]:2551*********1-1021 on akka.tcp://[email protected]:2551 has current scanned items: *********
[akka.tcp://[email protected]:2551*********1-1021: high grade apple 10.5 X 2 = 21.0 *********
[akka.tcp://[email protected]:2551*********1-1021: xinjiang red grape 15.8 X 1 = 15.8 *********

[akka.tcp://[email protected]:2551******** node akka.tcp://[email protected]:2551 is leaving cluster ... *******
[akka.tcp://[email protected]:2551/system/remoting-terminator] Remoting shut down.

[akka.tcp://[email protected]:2552*****  1-1021 recovering events ...  ********
[akka.tcp://[email protected]:2552*****  1-1021 recovering events ...  ********
[akka.tcp://[email protected]:2552********1-1021 is scanning item: Fruit(0002,sunkist orage,12.0), qty: 10 *********
[akka.tcp://[email protected]:2552*********1-1021 on akka.tcp://[email protected]:2552 has current scanned items: *********
[akka.tcp://[email protected]:2552*********1-1021: high grade apple 10.5 X 2 = 21.0 *********
[akka.tcp://[email protected]:2552*********1-1021: xinjiang red grape 15.8 X 1 = 15.8 *********
[akka.tcp://[email protected]:2552*********1-1021: sunkist orage 12.0 X 10 = 120.0 *********

從結果顯示看到:一開始1-1021是在2551節點上運行的。我們用Shutdown關停2551後ClusterSharding立即在2552上重構了1-1021並且恢復了之前的狀態。能夠在系統出現故障無法使用的情況下自動對運行中的actor進行遷移、狀態恢復,正是我們這次討論的核心內容。

下麵是本次示範的源代碼:

build.sbt

name := "akka-cluster-sharding"

version := "0.2"

scalaVersion := "2.12.8"

libraryDependencies := Seq(
  "com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.19",
  "com.typesafe.akka" %% "akka-persistence" % "2.5.19",
  "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.92",
  "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.92" % Test
)

resources/application.conf

akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off

akka {
  loglevel = INFO
  actor {
    provider = "cluster"
  }

  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://[email protected]:2551"]
    log-info = off
  }

  persistence {
    journal.plugin = "cassandra-journal"
    snapshot-store.plugin = "cassandra-snapshot-store"
  }

}

Entities.scala

import akka.actor._
import akka.cluster._
import akka.persistence._
import akka.pattern._
import scala.concurrent.duration._


object POSTerminal {
  case class Fruit(code: String, name: String, price: Double)
  case class Item(fruit: Fruit, qty: Int)

  sealed trait Command {
  }
  case class Checkout(fruit: Fruit, qty: Int) extends Command
  case object ShowTotol extends Command
  case class PayCash(amount: Double) extends Command
  case object Shutdown extends Command

  sealed trait Event {}
  case class ItemScanned(fruit: Fruit, qty: Int) extends Event
  case object Paid extends Event

  case class Items(items: List[Item] = Nil) {
    def itemAdded(evt: Event): Items = evt match {
      case ItemScanned(fruit,qty) =>
        copy( Item(fruit,qty) :: items )   //append item

      case _ => this     //nothing happens
    }
    def billPaid = copy(Nil)     //clear all items
    override def toString = items.reverse.toString()
  }

  def termProps = Props(new POSTerminal())

  //backoff suppervisor  must use onStop mode
  def POSProps: Props = {
    val options = Backoff.onStop(
      childProps = termProps,
      childName = "posterm",
      minBackoff = 1 second,
      maxBackoff = 5 seconds,
      randomFactor = 0.20
    )
    BackoffSupervisor.props(options)
  }

}

class POSTerminal extends PersistentActor with ActorLogging {
  import POSTerminal._
  val cluster = Cluster(context.system)

  // self.path.parent.name is the type name (utf-8 URL-encoded)
  // self.path.name is the entry identifier (utf-8 URL-encoded)  but entity has a supervisor
  override def persistenceId: String = self.path.parent.parent.name + "-" + self.path.parent.name

  var currentItems = Items()


  override def receiveRecover: Receive = {
    case evt: Event => currentItems = currentItems.itemAdded(evt)
      log.info(s"*****  ${persistenceId} recovering events ...  ********")
    case SnapshotOffer(_,loggedItems: Items) =>
      log.info(s"*****  ${persistenceId} recovering snapshot ...  ********")
      currentItems = loggedItems
  }

  override def receiveCommand: Receive = {
    case Checkout(fruit,qty) =>
      log.info(s"*********${persistenceId} is scanning item: $fruit, qty: $qty *********")
      persist(ItemScanned(fruit,qty))(evt =>  currentItems = currentItems.itemAdded(evt))

    case ShowTotol =>
      log.info(s"*********${persistenceId} on ${cluster.selfAddress} has current scanned items: *********")
      if (currentItems.items == Nil)
        log.info(s"**********${persistenceId} None transaction found! *********")
      else
        currentItems.items.reverse.foreach (item =>
          log.info(s"*********${persistenceId}: ${item.fruit.name} ${item.fruit.price} X ${item.qty} = ${item.fruit.price * item.qty} *********"))

    case PayCash(amt) =>
      log.info(s"**********${persistenceId} paying $amt to settle ***********")
      persist(Paid) { _ =>
        currentItems = currentItems.billPaid
        saveSnapshot(currentItems)     //no recovery
      }

    //shutdown this node to validate entity relocation and proper state recovery
    case Shutdown =>
      log.info(s"******** node ${cluster.selfAddress} is leaving cluster ... *******")
      cluster.leave(cluster.selfAddress)
  }
}

Shards.scala

import akka.actor._
import akka.cluster.sharding._
import com.typesafe.config.ConfigFactory

object POSShard {
 import POSTerminal._

 val shardName = "POSManager"
 case class POSCommand(id: Long, cmd: Command) {
   def shopId = id.toString.head.toString
   def posId = id.toString
 }

 val getPOSId: ShardRegion.ExtractEntityId =  {
   case posCommand: POSCommand => (posCommand.posId,posCommand.cmd)
 }
 val getShopId: ShardRegion.ExtractShardId = {
   case posCommand: POSCommand => posCommand.shopId
 }


 def create(port: Int) = {
   val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
     .withFallback(ConfigFactory.load())
   val system = ActorSystem("posSystem",config)

   ClusterSharding(system).start(
     typeName = shardName,
     entityProps = POSProps,
     settings = ClusterShardingSettings(system),
     extractEntityId = getPOSId,
     extractShardId = getShopId
   )
 }

}

POSDemo.scala

import POSTerminal._
import POSShard._

object POSDemo extends App {
  POSShard.create(2551)
  Thread.sleep(1000)
  POSShard.create(2552)
  POSShard.create(2553)
  val posref = POSShard.create(2554)
  scala.io.StdIn.readLine()

  val apple = Fruit("0001","high grade apple",10.5)
  val orange = Fruit("0002","sunkist orage",12.0)
  val grape = Fruit("0003","xinjiang red grape",15.8)


  posref ! POSCommand(1021, Checkout(apple,2))
  posref ! POSCommand(1021,Checkout(grape,1))

  posref ! POSCommand(1021,ShowTotol)
  scala.io.StdIn.readLine()

  posref ! POSCommand(1021,Shutdown)
  scala.io.StdIn.readLine()

  posref ! POSCommand(1021,Checkout(orange,10))


  posref ! POSCommand(1021,ShowTotol)
  scala.io.StdIn.readLine()

  posref ! POSCommand(1028,Checkout(orange,10))

  posref ! POSCommand(1028,ShowTotol)
  scala.io.StdIn.readLine()


}

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1.form聲明如下 2.ajax設置如下 var formData = new FormData(document.getElementById("form")); $.ajax({ url:document.form.action, type:"POST", data:formData, pro ...
  • 經常需要copy一個對象,又怕拷貝有問題,那下麵這段就很方便啦,不用擔心copy只是一個引用了。 / @ values 需要copy的變數 / function deepClone(values) { var copy; if(values == null || typeof values != " ...
  • 實現效果如圖: 功能需求: ①滑鼠進入商品名稱,商品名稱變色,同時對應的物品展示圖片顯示對應的物品,滑鼠移出時候,商品名稱恢複原來的顏色 實現分析: 1.HTML+CSS頁面結構如下: 頁面結構設計中,需要註意的知識點: ①商品li設置邊框時候,只設置下邊框 ②三個列表之間的分割線,使用id為cen ...
  • 一、概念 狀態模式:允許對象在內部狀態改變時改變它的行為,對象看起來好像修改了它的類。這個模式將狀態封裝成為獨立的類,並將動作委托到代表當前狀態的對象,我們知道行為會隨著內部狀態而改變。 一個對象“看起來好像修改了它的類”是什麼意思呢?從客戶的視角來看:如果說你使用的對象能夠完全改變它的行為,那麼你 ...
  • 分散式架構設計:1、橫向分層,將系統在橫向維度上切分成幾個部分,每個部分負責一部分相對比較單一的職責,例如:MVC經典模式。2、縱向分割,將系統功能模塊歸類切分成幾個部分,包裝成高內聚低耦合的模塊單元,不同功能模塊直接通過遠程調用協助工作。3、緩存技術,將是改善系統性能的第一手段,從上到下依次:瀏覽 ...
  • 沒圖,別找了。。。 我在MyEclipse上從SVN中導項目,導下的項目跑不起來,發現tomcat的classes中是空文件夾。 以下是在網上找的其他方法: 1.確保:Project->build automatically 已經被選上。(預設選上) 2.project->clean項目,一般會重新 ...
  • 1. 知乎文章圖片爬取器之二博客背景 昨天寫了知乎文章圖片爬取器的一部分代碼,針對知乎問題的答案json進行了數據抓取,博客中出現了部分寫死的內容,今天把那部分信息調整完畢,並且將圖片下載完善到代碼中去。 首先,需要獲取任意知乎的問題,只需要你輸入問題的ID,就可以獲取相關的頁面信息,比如最重要的合 ...
  • 一、WebSocket與HTTP長輪詢 WebSocket 屬於HTML5 規範的一部分,提供的一種在單個 TCP 連接上進行全雙工通訊的協議。允許服務端主動向客戶端推送數據。在 WebSocket API 中,瀏覽器和伺服器只需要完成一次握手,兩者之間就直接可以創建持久性的連接,併進行雙向數據傳輸 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...