Akka-CQRS(5)- CQRS Writer Actor 部署和測試

上篇我們做了一個WriterActor的例子,主要目的是示範WriterActor如何作為集群分片用persistentActor特性及event-sourcing模式實現CQRS的寫功能。既然是集群分片,那麼我們就在這篇講講WriterActor的部署和測試,因為這個裡面還是有些值得註意的地方。下 ...


      typeName = shardName,
      entityProps = writerProps,
      settings = cpsSettings,
      extractEntityId = getPOSId,
      extractShardId = getShopId,
      allocationStrategy = ClusterSharding(system).defaultShardAllocationStrategy(cpsSettings),
      handOffStopMessage = PassivatePOS



object POSRouter extends LogSupport {
  def main(args: Array[String]) {
    import WriterActor._
    import Commands._

    val argsPat = "(.*):(.*)".r
    val (host, port) = args(0) match {
      case argsPat(h, p) => (h, p)
      case _ => ("localhost", "2551")

    val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=\"" + port + "\"")
      .withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.hostname=\"" + host + "\""))
      //roles can be deployed on this node
      .withFallback(ConfigFactory.parseString("akka.cluster.roles = [poswriter]"))

    log.info(s"******* hostname = $host,  port = $port *******")

    val shardName = "POSShard"

    case class POSMessage(id: Long, cmd: POSCommand) {
      def shopId = id.toString.head.toString

      def posId = id.toString

    val getPOSId: ShardRegion.ExtractEntityId = {
      case posCommand: POSMessage => (posCommand.posId, posCommand.cmd)
    val getShopId: ShardRegion.ExtractShardId = {
      case posCommand: POSMessage => posCommand.shopId

    val system = ActorSystem("cloud-pos-server", config)
    val role = "poswriter"    //role of this shard
    val cpsSettings = ClusterShardingSettings(system).withRole(role) //.withPassivateIdleAfter(10 minutes)

      typeName = shardName,
      entityProps = writerProps,
      settings = cpsSettings,
      extractEntityId = getPOSId,
      extractShardId = getShopId,
      allocationStrategy = ClusterSharding(system).defaultShardAllocationStrategy(cpsSettings),
      handOffStopMessage = PassivatePOS

    system.actorOf(ClusterMonitor.props, "cps-cluster-monitor")




package sdp.cluster.monitor

import akka.actor._
import akka.cluster.ClusterEvent._
import akka.cluster._
import sdp.logging.LogSupport

object ClusterMonitor {
  def props = Props(new ClusterMonitor)

class ClusterMonitor extends Actor with LogSupport {
  val cluster = Cluster(context.system)
  override def preStart(): Unit = {
    cluster.subscribe(self,initialStateMode = InitialStateAsEvents
      ,classOf[MemberEvent],classOf[UnreachableMember])  //訂閱集群狀態轉換信息

  override def postStop(): Unit = {
    cluster.unsubscribe(self)    //取消訂閱

  override def receive: Receive = {
    case MemberJoined(member) =>
      log.info(s"Member is Joining: {${member.address}}")
    case MemberUp(member) =>
      log.info(s"Member is Up: {${member.address}}")
    case MemberLeft(member) =>
      log.info(s"Member is Leaving: {${member.address}}")
    case MemberExited(member) =>
      log.info(s"Member is Exiting: {${member.address}}")
    case MemberRemoved(member, previousStatus) =>
        s"Member is Removed: {${member.address}} after {${previousStatus}")
    case UnreachableMember(member) =>
      log.info(s"Member detected as unreachable: {${member.address}}")
      cluster.down(member.address)      //手工驅除,不用auto-down
    case _: MemberEvent => // ignore




    case class POSMessage(id: Long, cmd: POSCommand) {
      def shopId = id.toString.head.toString

      def posId = id.toString

    val getPOSId: ShardRegion.ExtractEntityId = {
      case posCommand: POSMessage => (posCommand.posId, posCommand.cmd)
    val getShopId: ShardRegion.ExtractShardId = {
      case posCommand: POSMessage => posCommand.shopId


   //no shard deployed on this node  2558, use proxy
    val posHandler = ClusterSharding(system).startProxy(
      typeName = shardName,
      role = Some("poswriter"),
      extractEntityId = getPOSId,
      extractShardId = getShopId

    //val posHandler = ClusterSharding(system).shardRegion(shardName)

    system.actorOf(POSClient.props(posHandler), "pos-client")


object POSClient {
  def props(pos: ActorRef) = Props(new POSClient(pos))
class POSClient(posHandler: ActorRef)  extends Actor with LogSupport {

  override def receive: Receive = {
    case msg @ POSMessage(_,_) => posHandler ! msg
    case resp: POSResponse  =>
      log.info(s"response from server: $resp")


    val posref = system.actorOf(POSClient.props(posHandler), "pos-client")
    posref ! POSMessage(1021, LogSales(SALESTYPE.plu, "", apple.code, 1, 0))
    posref ! POSMessage(2021, LogSales(SALESTYPE.plu, "", pineapple.code, 2, 0))
    posref ! POSMessage(3021, LogSales(SALESTYPE.plu, "", banana.code, 1, 0))
    posref ! POSMessage(4021, LogSales(SALESTYPE.plu, "", grape.code, 3, 0))
    posref ! POSMessage(4021,Subtotal)




akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off

akka {
  loglevel = INFO
  actor {
    provider = "cluster"

  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = ""
      port = 0

  cluster {
    seed-nodes = [
      "akka.tcp://[email protected]:2551"]
    log-info = off
    sharding {
      role = "poswriter"
      passivate-idle-entity-after = 5 m

  persistence {
    journal.plugin = "cassandra-journal"
    snapshot-store.plugin = "cassandra-snapshot-store"


cassandra-journal {
  contact-points = [""]

cassandra-snapshot-store {
  contact-points = [""]


package cloud.pos.server

import akka.actor._
import akka.cluster.sharding._
import akka.cluster.sharding.ClusterSharding
import com.typesafe.config.ConfigFactory
import sdp.cluster.monitor._
import sdp.logging._

object POSRouter extends LogSupport {
  def main(args: Array[String]) {
    import WriterActor._
    import Commands._

    val argsPat = "(.*):(.*)".r
    val (host, port) = args(0) match {
      case argsPat(h, p) => (h, p)
      case _ => ("localhost", "2551")

    val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=\"" + port + "\"")
      .withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.hostname=\"" + host + "\""))
      //roles can be deployed on this node
      .withFallback(ConfigFactory.parseString("akka.cluster.roles = [poswriter]"))

    log.info(s"******* hostname = $host,  port = $port *******")

    val shardName = "POSShard"

    case class POSMessage(id: Long, cmd: POSCommand) {
      def shopId = id.toString.head.toString

      def posId = id.toString

    val getPOSId: ShardRegion.ExtractEntityId = {
      case posCommand: POSMessage => (posCommand.posId, posCommand.cmd)
    val getShopId: ShardRegion.ExtractShardId = {
      case posCommand: POSMessage => posCommand.shopId

    val system = ActorSystem("cloud-pos-server", config)
    val role = "poswriter"    //role of this shard
    val cpsSettings = ClusterShardingSettings(system).withRole(role) //.withPassivateIdleAfter(10 minutes)

      typeName = shardName,
      entityProps = writerProps,
      settings = cpsSettings,
      extractEntityId = getPOSId,
      extractShardId = getShopId,
      allocationStrategy = ClusterSharding(system).defaultShardAllocationStrategy(cpsSettings),
      handOffStopMessage = PassivatePOS

    system.actorOf(ClusterMonitor.props, "cps-cluster-monitor")




name := "cloud-pos-client"

version := "0.1"

scalaVersion := "2.12.8"

libraryDependencies := Seq(
  "com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.19",
  "com.typesafe.akka" %% "akka-persistence" % "2.5.19",
  "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.93",
  "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.93" % Test,
  "ch.qos.logback"  %  "logback-classic"   % "1.2.3"


akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off

akka {
  loglevel = INFO
  actor {
    provider = "cluster"

  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = ""
      port = 2558

  cluster {
    seed-nodes = [
      "akka.tcp://[email protected]:2551"]
    log-info = off



<?xml version="1.0" encoding="UTF-8"?>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
                %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n

    <root level="debug">
        <appender-ref ref="STDOUT" />


package cloud.pos.client
import akka.actor._
import akka.cluster.sharding.ClusterSharding
import sdp.cluster.monitor._
import sdp.logging._
import Commands._
import States._
import Items._
import akka.cluster.sharding._

object POSClientDemo extends LogSupport {
  def main(args: Array[String]) {

    val system = ActorSystem("cloud-pos-server")

    val shardName = "POSShard"

    val getPOSId: ShardRegion.ExtractEntityId = {
      case posCommand: POSMessage => (posCommand.posId, posCommand.cmd)
    val getShopId: ShardRegion.ExtractShardId = {
      case posCommand: POSMessage => posCommand.shopId

    //no shard deployed on this node  2558, use proxy
    val posHandler = ClusterSharding(system).startProxy(
      typeName = shardName,
      role = Some("poswriter"),
      extractEntityId = getPOSId,
      extractShardId = getShopId

    //val posHandler = ClusterSharding(system).shardRegion(shardName)

    system.actorOf(ClusterMonitor.props, "cps-cluster-monitor")

    val posref = system.actorOf(POSClient.props(posHandler), "pos-client")

    posref ! POSMessage(1021, LogSales(SALESTYPE.plu, "", apple.code, 1, 0))
    posref ! POSMessage(2021, LogSales(SALESTYPE.plu, "", pineapple.code, 2, 0))
    posref ! POSMessage(3021, LogSales(SALESTYPE.plu, "", banana.code, 1, 0))
    posref ! POSMessage(4021, LogSales(SALESTYPE.plu, "", grape.code, 3, 0))
    posref ! POSMessage(4021,Subtotal)





package cloud.pos.client

object Commands {

  sealed trait POSCommand {}

  case class LogOn(opr: String, passwd: String) extends POSCommand
  case object LogOff extends POSCommand
  case class SuperOn(su: String, passwd: String) extends POSCommand
  case object SuperOff extends POSCommand
  case class MemberOn(cardnum: String, passwd: String) extends POSCommand
  case object MemberOff extends POSCommand   //remove member status for the voucher
  case object RefundOn extends POSCommand
  case object RefundOff extends POSCommand
  case object VoidOn extends POSCommand
  case object VoidOff extends POSCommand
  case object VoidAll extends POSCommand
  case object Suspend extends POSCommand

  case class VoucherNum(vnum: Int) extends POSCommand

  case class LogSales(salesType: Int, dpt: String, code: String, qty: Int, price: Int) extends POSCommand
  case object Subtotal extends POSCommand
  case class Discount(code: String, percent: Int) extends POSCommand

  case class OfflinePay(acct: String, num: String, amount: Int) extends POSCommand          //settlement   結算支付
  //read only command, no event process
  case class VCBalance(acct: String, num: String, passwd: String) extends POSCommand
  case class VCPay(acct: String, num: String, passwd: String, amount: Int) extends POSCommand
  case class AliPay(acct: String, num: String, amount: Int) extends POSCommand
  case class WxPay(acct: String, num: String, amount: Int) extends POSCommand

  // read only command, no update event
  case class Plu(itemCode: String) extends POSCommand  //read only

  case class POSMessage(id: Long, cmd: POSCommand) {
    def shopId = id.toString.head.toString
    def posId = id.toString



package cloud.pos.client

object States {

  object TXNTYPE {
    val sales: Int = 0
    val refund: Int = 1
    val void: Int = 2
    val voided: Int = 3
    val voidall: Int = 4
    val subtotal: Int = 5
    val logon: Int = 6
    val supon: Int = 7       // super user on/off
    val suspend: Int = 8


  object SALESTYPE {
    val plu: Int = 0
    val dpt: Int = 1
    val cat: Int = 2
    val brd: Int = 3
    val ra:  Int = 4
    val sub: Int = 5
    val ttl: Int = 6
    val dsc: Int = 7
    val crd: Int = 8

  case class TxnItem(
                      txndate: String = ""
                      ,txntime: String = ""
                      ,opr: String = ""//工號
                      ,num: Int = 0 //銷售單號
                      ,seq: Int = 1 //交易序號
                      ,txntype: Int = TXNTYPE.sales//交易類型
                      ,salestype: Int = SALESTYPE.plu //銷售類型
                      ,qty: Int =  1 //交易數量
                      ,price: Int = 0 //單價(分)
                      ,amount: Int = 0 //碼洋(分)
                      ,dscamt: Int = 0 //折扣:負值  net實洋 = amount + dscamt
                      ,member: String = "" //會員卡號
                      ,code: String = "" //編號(商品、賬號...)
                      ,desc: String = "" //項目名稱
                      ,dpt: String = ""
                      ,department: String = ""
                      ,cat: String = ""
                      ,category: String = ""
                      ,brd: String = ""
                      ,brand: String = ""

  case class VchStatus( //操作狀態鎖留給前端維護
                        qty: Int = 1,
                        refund: Boolean = false,
                        void: Boolean = false)

  case class VchStates(
                        opr: String = "",      //收款員
                        jseq: BigInt = 0,      //begin journal sequence for read-side replay
                        num: Int = 0,          //當前單號
                        seq: Int = 1,          //當前序號
                        void: Boolean = false, //取消模式
                        refd: Boolean = false, //退款模式
                        due: Boolean = true,   //當前餘額
                        su: String = "",
                        mbr: String = ""



package cloud.pos.client

import akka.actor._
import sdp.logging._
import Responses._
import Commands._

object POSClient {
  def props(pos: ActorRef) = Props(new POSClient(pos))
class POSClient(posHandler: ActorRef)  extends Actor with LogSupport {

  override def receive: Receive = {
    case msg @ POSMessage(_,_) => posHandler ! msg
    case resp: POSResponse  =>
      log.info(s"response from server: $resp")


package cloud.pos.client

import States._
object Responses {

  object STATUS {
    val OK: Int = 0
    val FAIL: Int = -1

  case class POSResponse (sts: Int, msg: String, voucher: VchStates, txnItems: List[TxnItem])


package cloud.pos.client

import java.time.LocalDate
import java.time.format.DateTimeFormatter

case class Item(
                 brd: String
                 ,dpt: String
                 ,cat: String
                 ,code: String
                 ,name: String
                 ,price: Int

object Items {
  val apple = Item("01","02","01","001", "green apple", 820)
  val grape = Item("01","02","01","002", "red grape", 1050)
  val orage = Item("01","02","01","003", "sunkist orage", 350)
  val banana = Item("01","02","01","004", "demon banana", 300)
  val pineapple = Item("01","02","01","005", "hainan pineapple", 1300)
  val peach = Item("01","02","01","006", "xinjiang peach", 2390)

  val tblItems = List(apple, grape, orage, banana, pineapple, peach)

  sealed trait QueryItemsResult {}

  case class QueryItemsOK(items: List[Item]) extends QueryItemsResult

  case class QueryItemsFail(msg: String) extends QueryItemsResult


object Codes {
  case class User(code: String, name: String, passwd: String)
  case class Department(code: String, name: String)
  case class Category(code: String, name: String)
  case class Brand(code: String, name: String)
  case class Ra(code: String, name: String)
  case class Account(code: String, name: String)
  case class Disc(code: String, best: Boolean, aggr: Boolean, group: Boolean)

  val ras = List(Ra("01","Delivery"),Ra("02","Cooking"))
  val dpts = List(Department("01","Fruit"),Department("02","Grocery"))
  val cats = List(Category("0101","Fresh Fruit"),Category("0201","Dry Grocery"))
  val brds = List(Brand("01","Sunkist"),Brand("02","Demon"))
  val accts = List(Account("001","Cash"),Account("002","Value Card"), Account("003", "Visa")

  val users = List(User("1001","Tiger", "123"),User("1002","John", "123"),User("1003","Maria", "123"))

  def getDpt(code: String) = dpts.find(d => d.code == code)
  def getCat(code: String) = cats.find(d => d.code == code)
  def getBrd(code: String) = brds.find(b => b.code == code)
  def getAcct(code: String) = accts.find(a => a.code == code)
  def getRa(code: String) = ras.find(a => a.code == code)

object DAO {
  import Items._
  import Codes._

  def getItem(code: String): QueryItemsResult = {
    val optItem = tblItems.find(it => it.code == code)
    optItem match {
      case Some(item) => QueryItemsOK(List(item))
      case None => QueryItemsFail("Invalid item code!")

  def validateDpt(code: String) = dpts.find(d => d.code == code)
  def validateCat(code: String) = cats.find(d => d.code == code)
  def validateBrd(code: String) = brds.find(b => b.code == code)
  def validateRa(code: String) = ras.find(ac => ac.code == code)
  def validateAcct(code: String) = accts.find(ac => ac.code == code)

  def validateUser(userid: String, passwd: String) = users.find(u => (u.code == userid && u.passwd == passwd))

  def lastSecOfDateStr(ldate: LocalDate): String = {
    ldate.format(DateTimeFormatter.ofPattern( "yyyy-MM-dd"))+" 23:59:59"



package sdp.logging

import org.slf4j.Logger

  * Logger which just wraps org.slf4j.Logger internally.
  * @param logger logger
class Log(logger: Logger) {

  // use var consciously to enable squeezing later
  var isDebugEnabled: Boolean = logger.isDebugEnabled
  var isInfoEnabled: Boolean = logger.isInfoEnabled
  var isWarnEnabled: Boolean = logger.isWarnEnabled
  var isErrorEnabled: Boolean = logger.isErrorEnabled

  def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = {
    level match {
      case 'debug | 'DEBUG => debug(msg)
      case 'info | 'INFO => info(msg)
      case 'warn | 'WARN => warn(msg)
      case 'error | 'ERROR => error(msg)
      case _ => // nothing to do

  def debug(msg: => String): Unit = {
    if (isDebugEnabled && logger.isDebugEnabled) {

  def debug(msg: => String, e: Throwable): Unit = {
    if (isDebugEnabled && logger.isDebugEnabled) {
      logger.debug(msg, e)

  def info(msg: => String): Unit = {
    if (isInfoEnabled && logger.isInfoEnabled) {

  def info(msg: => String, e: Throwable): Unit = {
    if (isInfoEnabled && logger.isInfoEnabled) {
      logger.info(msg, e)

  def warn(msg: => String): Unit = {
    if (isWarnEnabled && logger.isWarnEnabled) {

  def warn(msg: => String, e: Throwable): Unit = {
    if (isWarnEnabled && logger.isWarnEnabled) {
      logger.warn(msg, e)

  def error(msg: => String): Unit = {
    if (isErrorEnabled && logger.isErrorEnabled) {

  def error(msg: => String, e: Throwable): Unit = {
    if (isErrorEnabled && logger.isErrorEnabled) {
      logger.error(msg, e)



package sdp.logging

import org.slf4j.LoggerFactory

trait LogSupport {

    * Logger
  protected val log = new Log(LoggerFactory.getLogger(this.getClass))



package sdp.cluster.monitor

import akka.actor._
import akka.cluster.ClusterEvent._
import akka.clust


