Akka-CQRS(7)- CQRS Reader Actor 示範

来源:https://www.cnblogs.com/tiger-xc/archive/2019/05/21/10897748.html
-Advertisement-
Play Games

我們在這篇通過一個具體CQRS-Reader-Actor的例子來示範akka-persistence的query端編程和應用。在前面的博客里我們設計了一個CQRS模式POS機程式的操作動作錄入過程,並示範瞭如何實現CQRS的寫端編程。現在我們可以根據這個例子來示範如何通過CQRS的讀端reader- ...


   我們在這篇通過一個具體CQRS-Reader-Actor的例子來示範akka-persistence的query端編程和應用。在前面的博客里我們設計了一個CQRS模式POS機程式的操作動作錄入過程,並示範瞭如何實現CQRS的寫端編程。現在我們可以根據這個例子來示範如何通過CQRS的讀端reader-actor讀取由writer-actor所寫入的操作事件並將二進位格式的事件恢覆成資料庫表的行數據。

首先看看reader是如何從cassandra資料庫里按順序讀出寫入事件的:cassandra-plugin提供了currentEventsByPersistenceId函數,使用方法如下:

  // obtain read journal by plugin id
  val readJournal =
    PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

  // issue query to journal
  val source: Source[EventEnvelope, NotUsed] =
    readJournal.currentEventsByPersistenceId("2022", 0, Long.MaxValue)

這個函數返回一個akka-stream的Source[EventEnvelope,_]類型,一個靜態Source。還有另外一個eventsByPersistenceId函數可以返回實時動態的akka-stream Source。我們可以runFold一個靜態的Source對流元素進行彙總形成一個集合: 

 

  // materialize stream, consuming events
  val futureActions: Future[List[Any]]  = source.runFold(List[Any]()) { (lstAny, evl) => evl.event :: lstAny }

 

我們可以通過模式匹配pattern-matching把List元素分辨出來:

  implicit val system = ActorSystem("reader")
  implicit val ec = system.dispatcher
  implicit val mat = ActorMaterializer()

  futureActions.onComplete {
      case Success(acts) => acts.reverse.foreach {act => act match {
        case LogOned(txn) => println(s"LogOn: $txn" )
        case SalesLogged(txn) => println(s"LogSales: $txn")
        case _ => println("unkown action !!!!!")
      }}
      case Failure(exception) => println(exception.getMessage)
  }

試著運行一下得到下麵的輸出: 

LogOn: TxnItem(20190509,16:12:23,1001,0,1,6,8,1,0,0,0,,1001,,,,,,,)
LogSales: TxnItem(20190509,16:12:34,1001,0,2,0,0,0,1300,0,0,,005,hainan pineapple,02,Grocery,01,,01,Sunkist)
LogSales: TxnItem(20190509,16:12:35,1001,0,3,0,0,0,300,0,0,,004,demon banana,02,Grocery,01,,01,Sunkist)
LogSales: TxnItem(20190509,16:12:36,1001,0,4,0,0,0,1050,0,0,,002,red grape,02,Grocery,01,,01,Sunkist)
unkown action !!!!!
unkown action !!!!!

如此已經可以將寫操作中存入的事件恢覆成為Action類結構了。

好了,現在我們稍微認真點做個詳細的reader示範。回到我們的POS例子:如果我們調用以下寫端指令:

        posref ! POSMessage(1022, LogOn("1001", "123"))
        scala.io.StdIn.readLine()

        posref ! POSMessage(1022, LogSales("0", "0", apple.code, 1,820))
        scala.io.StdIn.readLine()

        posref ! POSMessage(1022, LogSales("0", "0", pineapple.code, 2, 1300))
        scala.io.StdIn.readLine()


        posref ! POSMessage(1022, Subtotal(0))
        scala.io.StdIn.readLine()

        posref ! POSMessage(1022, Discount(DISCTYPE.duplicated,false,"001",5))
        scala.io.StdIn.readLine()

        posref ! POSMessage(1022, LogSales("0", "0", banana.code, 1, 300))
        scala.io.StdIn.readLine()

        posref ! POSMessage(1022, LogSales("0", "0", grape.code, 3, 1050))
        scala.io.StdIn.readLine()

        posref ! POSMessage(1022, Subtotal(1))
        scala.io.StdIn.readLine()

        posref ! POSMessage(1022, Discount(DISCTYPE.duplicated,true,"001",10))
        scala.io.StdIn.readLine()

        posref ! POSMessage(1022, LogSales("0", "0", orage.code, 10, 350))
        scala.io.StdIn.readLine()

        posref ! POSMessage(1022, Discount(DISCTYPE.duplicated,false,"001",10))
        scala.io.StdIn.readLine()

        posref ! POSMessage(1022, Subtotal(0))
        scala.io.StdIn.readLine()

        posref ! POSMessage(1022, Suspend)
        scala.io.StdIn.readLine()

前面的博文里一再強調:CQRS的寫端應簡單直接地將指令存寫入資料庫,避免任何複雜的控製程序,儘量在讀端讀出指令、模擬處理過程中的狀態轉變處理,還原指令預期應產生的結果,在這個示範里就是正式的交易數據了。實際上我們在寫端已經做了些狀態轉變處理過程:就在updataState函數里: 

 def updateState(evt: Action, lastSeqNr: BigInt = 0)(implicit nodeAddress: NodeAddress, persistenceId: PID, state: VchStates, items: VchItems, curItem: TxnItem): (VchStates, VchItems) = {
    val (vs, vi) = updateStateImpl(evt, lastSeqNr)
    log.step(s"${nodeAddress.address}-${persistenceId.id} run updateState($evt, $lastSeqNr) with results state[$vs], txns[$vi].")
    (vs, vi)
  }

  def updateStateImpl(evt: Action, lastSeqNr: BigInt = 0)(implicit state: VchStates, items: VchItems, curItem: TxnItem): (VchStates, VchItems) = evt match {
    case LogOned(csr) => (state.copy(seq = state.seq + 1, opr = csr, jseq = lastSeqNr), items)
    case LogOffed => (state.copy(seq = state.seq + 1, opr = ""), items)
    case RefundOned => (state.copy(seq = state.seq + 1, refd = true), items)
    case RefundOffed => (state.copy(seq = state.seq + 1, refd = false), items)
    case VoidOned => (state.copy(seq = state.seq + 1, void = true), items)
    case VoidOffed => (state.copy(seq = state.seq + 1, void = false), items)
    case SuperOned(suser) => (state.copy(seq = state.seq + 1, su = suser), items)
    case SuperOffed => (state.copy(seq = state.seq + 1, su = ""), items)
    case MemberOned(num) => (state.copy(seq = state.seq + 1, mbr = num), items)
    case MemberOffed => (state.copy(seq = state.seq + 1, mbr = ""), items)


    case SalesLogged(_,_,_,_,_) => (state.copy(
      seq = state.seq + 1)
      , items.addItem(curItem))

    case Subtotaled(_) => (state.copy(
      seq = state.seq + 1)
      , items.addItem(curItem))

    case Discounted(_,_,_,_) => (state.copy(
      seq = state.seq + 1)
      , items.addItem(curItem))


    case PaymentMade(_,_,_) =>
      val due = if (items.totalSales > 0) items.totalSales - items.totalPaid else items.totalSales + items.totalPaid
      val bal = if (items.totalSales > 0) due - curItem.amount else due + curItem.amount
      (state.copy(
        seq = state.seq + 1,
        due = (if ((curItem.amount.abs + items.totalPaid.abs) >= items.totalSales.abs) false else true)
      )
        ,items.addItem(curItem.copy(
        salestype = SALESTYPE.ttl,
        price = due,
        amount = curItem.amount,
        dscamt = bal
      )))

    case VoucherNumed(_, tnum) =>
      val vi = items.copy(txnitems = items.txnitems.map { it => it.copy(num = tnum) })
      (state.copy(seq = state.seq + 1, num = tnum), vi)

    case EndVoucher(vnum) => (state.nextVoucher.copy(jseq = lastSeqNr + 1), VchItems())

    case _ => (state, items)
  }

在寫端的這個模擬處理結果是為了實時向客戶端反饋它發出指令後產生的結果,如: 

  case cmd @ LogSales(acct,sdpt,scode,sqty,sprice) if cmdFilter(cmd,sender()) => {
      log.step(s"${nodeAddress.address}-${persistenceId} running LogSales($acct,$sdpt,$scode,$sqty,$sprice) with state[$vchState],txns[$vchItems] ...")
      var pqty = sqty
      if (vchState.void || vchState.refd) pqty = -sqty
      log.step(s"${nodeAddress.address}-${persistenceId} run LogSales($acct,$sdpt,$scode,$sqty,$sprice).")
      persistEvent(SalesLogged(acct,sdpt,scode,sqty,sprice)) { evt =>
        curTxnItem = buildTxnItem(evt).copy(
          qty =pqty,
          amount = pqty * sprice
        )
        val sts = updateState(evt)
        vchState = sts._1
        vchItems = sts._2
        if (vchState.void) {
          val (lstTxns, found) = vchItems.txnitems.foldLeft((List[TxnItem](), false)) { case (b, ti) =>
            if (b._2)
              (ti :: b._1, true)
            else if (ti.txntype == TXNTYPE.sales && ti.salestype == SALESTYPE.itm && ti.acct == acct &&
              ti.dpt == sdpt && ti.code == scode && ti.qty == sqty && ti.price == sprice)
              (ti.copy(txntype = TXNTYPE.voided) :: b._1, true)
            else (ti :: b._1, false)

          }
          vchItems = vchItems.copy(txnitems = lstTxns.reverse)
          sender() ! POSResponse(STATUS.OK, s"終端-$persistenceId: 成功沖銷商品銷售。", vchState, List(vchItems.topTxnItem))
        } else {
          sender() ! POSResponse(STATUS.OK, s"終端-$persistenceId: 商品銷售操作成功。", vchState, List(vchItems.topTxnItem))
        }

      }
    }

上面這個POSResponse類實時把當前交易狀態返回給客戶端。而這個vchItems.topTxnItem正是一條正式的交易記錄,也就是我們需要在讀端還原的數據。所以,這個updateState在讀端還是需要調用的。先看看整體reader示範的程式結構: 

 

object ReaderDemo extends App {


  implicit val system = ActorSystem("reader")
  implicit val ec = system.dispatcher
  implicit val mat = ActorMaterializer()

  val cluster = Cluster(system)
  implicit val nodeAddress: NodeAddress = NodeAddress(cluster.selfAddress.toString)

  States.setShowSteps(true)

  Reader.readActions(0, Long.MaxValue,"1022")
  
  scala.io.StdIn.readLine()

  mat.shutdown()
  system.terminate()
}

object Reader extends LogSupport {

  def readActions(startSeq: Long, endSeq: Long, persistenceId: String)(implicit sys: ActorSystem, ec: ExecutionContextExecutor, mat: ActorMaterializer, nodeAddress: NodeAddress) = {
    implicit var vchState = VchStates()
    implicit var vchItems = VchItems()
    implicit var curTxnItem = TxnItem()
    implicit val pid = PID(persistenceId)
    // obtain read journal by plugin id
    val readJournal =
      PersistenceQuery(sys).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

    // issue query to journal
    val source: Source[EventEnvelope, NotUsed] =
      readJournal.currentEventsByPersistenceId(persistenceId, startSeq, endSeq)

    // materialize stream, consuming events
    val futureActions: Future[List[Any]] = source.runFold(List[Any]()) { (lstAny, evl) => evl.event :: lstAny }

    futureActions.onComplete {
      case Success(txns) => 
        buildVoucher(txns)
//vchItems.txnitems里就是恢復的正式交易記錄。可以在這裡把它們寫入到業務資料庫
        println(vchItems.txnitems.reverse)
      case Failure(excpt) =>
    }


    def buildVoucher(actions: List[Any])= {

      actions.reverse.foreach { txn =>
        txn match {
          case EndVoucher(_) =>
            vchItems.txnitems.foreach(println)
          case ti@_ =>
            curTxnItem = buildTxnItem(ti.asInstanceOf[Action])
            val sts = updateState(ti.asInstanceOf[Action],0)
            vchState = sts._1
            vchItems = sts._2
        }
      }
    }
  }

}

 

順便提一下:這個updateState函數的其中一個主要功能是對集合vchItems.txnitems內的txnitem元素進行更改處理,但我們使用的是函數式編程模式的不可變集合(immutable collections),每次都需要對集合進行遍歷,如fold: 

    def subTotal: (Int, Int, Int, Int) = txnitems.foldRight((0, 0, 0, 0)) { case (txn, b) =>
      if (txn.salestype == SALESTYPE.itm && txn.txntype == TXNTYPE.sales)
        b.copy(_1 = b._1 + 1, _2 = b._2 + txn.qty, _3 = b._3 + txn.amount, _4 = b._4 + txn.dscamt)
      else b
    }

突然想到如果在一個很大的集合中只需要更新前面幾條數據怎麼辦?難道還是需要traverse所有元素嗎?在我們的例子里還真有這麼樣的需求: 

    def groupTotal(level:Int): (Int, Int, Int, Int) = {
      val gts = txnitems.foldLeftWhile((0, 0, 0, 0, 0)) { case (b,txn) =>
        if (txn.salestype == SALESTYPE.itm && txn.txntype == TXNTYPE.sales)
          ((b._1._1 +1,b._1._2 + txn.qty, b._1._3 + txn.amount, b._1._4 + txn.dscamt, b._1._5),false)
        else {
          if (txn.salestype == SALESTYPE.sub) {
            if (((b._1._5) + 1) >= level)
              ((b._1._1, b._1._2, b._1._3, b._1._4, b._1._5 + 1), true)
            else
              ((b._1._1, b._1._2, b._1._3, b._1._4, b._1._5 + 1), false)
          } else b
        }
      }
      (gts._1,gts._2,gts._3,gts._4)
    }

這是一個分層小記函數:在遍歷指令集時遇到另一條小記指令就終止小記運算,調用了foldLeftWhile函數來實現這樣的場景。foldLeftWhile是一個定製的函數: 

  implicit class FoldLeftWhile[A](trav: Seq[A]) {
    def foldLeftWhile[B](z: B)(op: ((B,Boolean), A) => (B, Boolean)): B = {
      def go(acc: (B, Boolean), l: Seq[A]): (B, Boolean) = l match {
        case h +: t =>
          val nacc = op(acc, h)
          if (!nacc._2)
            go(nacc, t)
          else
            nacc
        case _ => acc
      }
      go((z, false), trav)._1
    }
  }

下麵附上本次示範中的源代碼

build.sbt

name := "akka-cluster-reader"

version := "0.1"

scalaVersion := "2.12.8"

libraryDependencies := Seq(
  "com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.19",
  "com.typesafe.akka" %% "akka-persistence" % "2.5.19",
  "com.typesafe.akka" %% "akka-persistence-query" % "2.5.19",
  "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.93",
  "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.93" % Test,
  "ch.qos.logback"  %  "logback-classic"   % "1.2.3"
)

 resources/application.conf

akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off
akka.remote.use-passive-connections=off

akka {
  loglevel = INFO
  actor {
    provider = "cluster"
  }

  remote {
    log-remote-lifecycle-events = on
    netty.tcp {
      hostname = "192.168.11.189"
      port = 2551
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://[email protected]:2551"]

    log-info = off
    sharding {
      role = "shard"
      passivate-idle-entity-after = 10 m
    }
  }

  persistence {
    journal.plugin = "cassandra-journal"
    snapshot-store.plugin = "cassandra-snapshot-store"
  }

}

cassandra-journal {
  contact-points = ["192.168.11.189"]
}

cassandra-snapshot-store {
  contact-points = ["192.168.11.189"]
}

 message/Messages.scala

package datatech.cloud.pos

import java.time.LocalDate
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.Locale
import akka.cluster.sharding._

object Messages {

  val  dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.CHINA)

  sealed trait Command {}

  case class LogOn(opr: String, passwd: String) extends Command
  case object LogOff extends Command
  case class SuperOn(su: String, passwd: String) extends Command
  case object SuperOff extends Command
  case class MemberOn(cardnum: String, passwd: String) extends Command
  case object MemberOff extends Command   //remove member status for the voucher
  case object RefundOn extends Command
  case object RefundOff extends Command
  case object VoidOn extends Command
  case object VoidOff extends Command
  case object VoidAll extends Command
  case object Suspend extends Command

  case class VoucherNum(vnum: Int) extends Command



  case class LogSales(acct: String, dpt: String, code: String, qty: Int, price: Int) extends Command
  case class Subtotal(level: Int) extends Command
  case class Discount(disctype: Int, grouped: Boolean, code: String, percent: Int) extends Command

  case class Payment(acct: String, num: String, amount: Int) extends Command          //settlement   結算支付

  // read only command, no update event
  case class Plu(itemCode: String) extends Command  //read only
  case object GetTxnItems extends Command


  /* discount type */
  object DISCTYPE  {
    val duplicated: Int = 0
    val best: Int = 1
    val least: Int = 2
    val keep: Int = 3
  }

  /* result message returned to client on the wire */
  object TXNTYPE {
    val sales: Int = 0
    val refund: Int = 1
    val void: Int = 2
    val voided: Int = 3
    val voidall: Int = 4
    val subtotal: Int = 5
    val logon: Int = 6
    val supon: Int = 7       // super user on/off
    val suspend: Int = 8

  }

  object SALESTYPE {
    val itm: Int = 2
    val sub: Int = 10
    val ttl: Int = 11
    val dsc: Int = 12
    val crd: Int = 13
  }

  case class TxnItem(
                      txndate: String = LocalDate.now.format(DateTimeFormatter.ofPattern("yyyyMMdd"))
                      ,txntime: String = LocalDateTime.now.format(dateTimeFormatter).substring(11)
                      ,opr: String = ""//工號
                      ,num: Int = 0 //銷售單號
                      ,seq: Int = 1 //交易序號
                      ,txntype: Int = TXNTYPE.sales//交易類型
                      ,salestype: Int = SALESTYPE.itm //銷售類型
                      ,qty: Int =  1 //交易數量
                      ,price: Int = 0 //單價(分)
                      ,amount: Int = 0 //碼洋(分)
                      ,disc: Int = 0   //折扣率 (%)
                      ,dscamt: Int = 0 //折扣額:負值  net實洋 = amount + dscamt
                      ,member: String = "" //會員卡號
                      ,code: String = "" //編號(商品、卡號...)
                      ,acct: String = "" //賬號
                      ,dpt: String = "" //部類
                    )
  object TxnItem {
    def apply(vs: VchStates): TxnItem = TxnItem().copy(
      opr = vs.opr,
      num = vs.num,
      seq = vs.seq,
      member = vs.mbr
    )
  }

  case class VchStatus( //操作狀態鎖留給前端維護
                        qty: Int = 1,
                        refund: Boolean = false,
                        void: Boolean = false)

  case class VchStates(
                        opr: String = "",      //收款員
                        jseq: BigInt = 0,      //begin journal sequence for read-side replay
                        num: Int = 0,          //當前單號
                        seq: Int = 0,          //當前序號
                        void: Boolean = false, //取消模式
                        refd: Boolean = false, //退款模式
                        due: Boolean = true,   //當前餘額
                        su: String = "",
                        mbr: String = ""
                      ) {

    def nextVoucher : VchStates = VchStates().copy(
      opr = this.opr,
      jseq = this.jseq + 1,
      num = this.num + 1
    )
  }


  object STATUS {
    val OK: Int = 0
    val FAIL: Int = -1
  }

  case class POSResponse (sts: Int, msg: String, voucher: VchStates, txnItems: List[TxnItem])

  /* message on the wire (exchanged message) */
  val shardName = "POSShard"

  case class POSMessage(id: Long, cmd: Command) {
    def shopId = id.toString.head.toString
    def posId = id.toString
  }

  val getPOSId: ShardRegion.ExtractEntityId =  {
    case posCommand: POSMessage => (posCommand.posId,posCommand.cmd)
  }
  val getShopId: ShardRegion.ExtractShardId = {
    case posCommand: POSMessage => posCommand.shopId
  }


  case object PassivatePOS    //passivate message
  case object FilteredOut
  case class DebugMode(debug: Boolean)
  case class NodeAddress(address: String)
  case class PID(id: String)

}

 States.scala

package datatech.cloud.pos
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter


import Messages._
import sdp.logging._

object Actions {


  implicit class FoldLeftWhile[A](trav: Seq[A]) {
    def foldLeftWhile[B](z: B)(op: ((B,Boolean), A) => (B, Boolean)): B = {
      def go(acc: (B, Boolean), l: Seq[A]): (B, Boolean) = l match {
        case h +: t =>
          val nacc = op(acc, h)
          if (!nacc._2)
            go(nacc, t)
          else
            nacc
        case _ => acc
      }
      go((z, false), trav)._1
    }
  }


  case class ReadActions(startSeq: Int, endSeq: Int, persistenceId: String)

  sealed trait Action {}
  case class LogOned(opr: String) extends Action
  case object LogOffed extends Action
  case class SuperOned(su: String) extends Action
  case object SuperOffed extends Action
  case class MemberOned(cardnum: String) extends Action
  case object MemberOffed extends Action   //remove member status for the voucher
  case object RefundOned extends Action
  case object RefundOffed extends Action
  case object VoidOned extends Action
  case object VoidOffed extends Action


  case class SalesLogged(acct: String, dpt: String, code: String, qty: Int, price: Int) extends Action
  case class Subtotaled(level: Int) extends Action
  case class Discounted(disctype: Int, grouped: Boolean, code: String, percent: Int) extends Action

  case class NewVoucher(vnum: Int) extends Action //新單, reminder for read-side to set new vnum
  case class EndVoucher(vnum: Int) extends Action //單據終結標示
  case object VoidVoucher extends Action


  case object SuspVoucher extends Action

  case class VoucherNumed(fnum: Int, tnum: Int) extends Action

  case class PaymentMade(acct: String, num: String, amount: Int) extends Action          //settlement   結算支付

}


object States extends LogSupport {
  import Actions._

  def setShowSteps(b: Boolean) = log.stepOn = b

  def buildTxnItem(evt: Action)(implicit vs: VchStates, vi: VchItems): TxnItem = evt match {
    case LogOned(op) => TxnItem(vs).copy(
      txntype = TXNTYPE.logon,
      salestype = SALESTYPE.crd,
      opr = op,
      code = op
    )
    case LogOffed => TxnItem(vs).copy(
      txntype = TXNTYPE.logon,
      salestype = SALESTYPE.crd
    )
    case SuperOned(su) => TxnItem(vs).copy(
      txntype = TXNTYPE.supon,
      salestype = SALESTYPE.crd,
      code = su
    )
    case SuperOffed => TxnItem(vs).copy(
      txntype = TXNTYPE.supon,
      salestype = SALESTYPE.crd
    )
    case MemberOned(cardnum) => TxnItem(vs).copy(
      txntype = TXNTYPE.sales,
      salestype = SALESTYPE.crd,
      member = cardnum
    )
    case MemberOffed => TxnItem(vs).copy(
      txntype = TXNTYPE.sales,
      salestype = SALESTYPE.crd
    )
    case RefundOned => TxnItem(vs).copy(
      txntype = TXNTYPE.refund
    )
    case RefundOffed => TxnItem(vs).copy(
      txntype = TXNTYPE.refund
    )
    case VoidOned => TxnItem(vs).copy(
      txntype = TXNTYPE.void
    )
    case VoidOffed => TxnItem(vs).copy(
      txntype = TXNTYPE.void
    )
    case VoidVoucher => TxnItem(vs).copy(
      txntype = TXNTYPE.voidall,
      code = vs.num.toString,
      acct = vs.num.toString
    )
    case SuspVoucher => TxnItem(vs).copy(
      txntype = TXNTYPE.suspend,
      code = vs.num.toString,
      acct = vs.num.toString
    )
    case Subtotaled(level) =>
      TxnItem(vs).copy(
        txntype = TXNTYPE.sales,
        salestype = SALESTYPE.sub
      )
    case Discounted(dt,gp,code,pct) => TxnItem(vs).copy(
      txntype = TXNTYPE.sales,
      salestype = SALESTYPE.dsc,
      acct = code,
      disc = pct
    )
    case PaymentMade(act,num,amt) => TxnItem(vs).copy(
      txntype = TXNTYPE.sales,
      salestype = SALESTYPE.ttl,
      acct = act,
      code = num,
      amount = amt
    )

    case SalesLogged(sacct,sdpt,scode,sqty,sprice)  => TxnItem(vs).copy(
      txntype = TXNTYPE.sales,
      salestype = SALESTYPE.itm,
      acct = sacct,
      dpt = sdpt,
      code = scode,
      qty = sqty,
      price = sprice,
      amount = sprice * sqty,
      dscamt = 0
    )
    case _ => TxnItem(vs)
  }

  case class VchItems(txnitems: List[TxnItem] = Nil) {

    def noSales: Boolean = (txnitems.find(txn => txn.salestype == SALESTYPE.itm)).isEmpty

    def subTotal: (Int, Int, Int, Int) = txnitems.foldRight((0, 0, 0, 0)) { case (txn, b) =>
      if (txn.salestype == SALESTYPE.itm && txn.txntype == TXNTYPE.sales)
        b.copy(_1 = b._1 + 1, _2 = b._2 + txn.qty, _3 = b._3 + txn.amount, _4 = b._4 + txn.dscamt)
      else b
    }

    def groupTotal(level:Int): (Int, Int, Int, Int) = {
      val gts = txnitems.foldLeftWhile((0, 0, 0, 0, 0)) { case (b,txn) =>
        if (txn.salestype == SALESTYPE.itm && txn.txntype == TXNTYPE.sales)
          ((b._1._1 +1,b._1._2 + txn.qty, b._1._3 + txn.amount, b._1._4 + txn.dscamt, b._1._5),false)
        else {
          if (txn.salestype == SALESTYPE.sub) {
            if (((b._1._5) + 1) >= level)
              ((b._1._1, b._1._2, b._1._3, b._1._4, b._1._5 + 1), true)
            else
              ((b._1._1, b._1._2, b._1._3, b._1._4, b._1._5 + 1), false)
          } else b
        }
      }
      (gts._1,gts._2,gts._3,gts._4)
    }

    def updateDisc(dt: Int, grouped: Boolean, disc: Int): (List[TxnItem],(Int,Int,Int,Int)) = {
      //(salestype,(cnt,qty,amt,dsc),hassub,list)
      val accu = txnitems.foldLeft((-1, (0,0,0,0), false, List[TxnItem]())) { case (b, txn) =>
        var discAmt = 0
        if ((b._1) < 0) {
          if (txn.salestype == SALESTYPE.itm && txn.txntype == TXNTYPE.sales) {
            if (txn.dscamt == 0)
              ((txn.salestype, (
                (b._2._1) + 1,
                (b._2._2) + txn.qty,
                (b._2._3) + txn.amount,
                (b._2._4) - (txn.amount * disc / 100)
                ), false, txn.copy(
                dscamt = - (txn.amount * disc / 100)) :: (b._4)))
            else {
              dt match {
                case DISCTYPE.duplicated =>
                  if (txn.dscamt != 0) {
                    ((txn.salestype, (
                      (b._2._1) + 1,
                      (b._2._2) + txn.qty,
                      (b._2._3) + txn.amount,
                      (b._2._4) - (txn.amount + txn.dscamt) * disc / 100
                      ), false, txn.copy(
                      dscamt = -(txn.amount + txn.dscamt) * disc / 100) :: (b._4)
                    ))
                  } else {
                    ((txn.salestype, (
                      (b._2._1) + 1,
                      (b._2._2) + txn.qty,
                      (b._2._3) + txn.amount,
                      (b._2._4) - txn.amount * disc / 100
                      ), false, txn.copy(
                      dscamt = -txn.amount * disc / 100) :: (b._4)
                    ))
                  }
                case DISCTYPE.keep => ((txn.salestype, (
                  (b._2._1) + 1,
                  (b._2._2) + txn.qty,
                  (b._2._3) + txn.amount,
                  (b._2._4) + txn.dscamt), false, txn :: (b._4)))
                case DISCTYPE.best =>
                  discAmt = -(txn.amount * disc / 100)
                  if (discAmt < txn.dscamt)
                    ((txn.salestype, (
                      (b._2._1) + 1,
                      (b._2._2) + txn.qty,
                      (b._2._3) + txn.amount,
                      (b._2._4) + discAmt), false, txn.copy(
                      dscamt = discAmt
                    ) :: (b._4)))
                  else
                    ((txn.salestype, (
                      (b._2._1) + 1,
                      (b._2._2) + txn.qty,
                      (b._2._3) + txn.amount,
                      (b._2._4) + txn.dscamt), false, txn :: (b._4)))
              }
            }

          } else ((b._1,b._2,b._3,txn :: (b._4)))
        } else {
          if ((b._3))
            (((b._1), (b._2), true, txn :: (b._4)))
          else {
            if (txn.salestype == SALESTYPE.sub) {
              if (grouped)
                (((b._1), (b._2), true, txn :: (b._4)))
              else
                (((b._1), (b._2), false, txn :: (b._4)))
            } else {
              if (txn.salestype == SALESTYPE.itm && txn.txntype == TXNTYPE.sales) {
                dt match {
                  case DISCTYPE.duplicated =>
                    if (txn.dscamt != 0) {
                      ((txn.salestype, (
                        (b._2._1) + 1,
                        (b._2._2) + txn.qty,
                        (b._2._3) + txn.amount,
                        (b._2._4) - (txn.amount + txn.dscamt) * disc / 100), false, txn.copy(
                        dscamt = -(txn.amount + txn.dscamt) * disc / 100) :: (b._4)
                      ))
                    } else {
                      ((txn.salestype, (
                        (b._2._1) + 1,
                        (b._2._2) + txn.qty,
                        (b._2._3) + txn.amount,
                        (b._2._4) - txn.amount * disc / 100), false, txn.copy(
                        dscamt = -(txn.amount * disc / 100)) :: (b._4)
                      ))
                    }
                  case DISCTYPE.keep => ((txn.salestype, (
                    (b._2._1) + 1,
                    (b._2._2) + txn.qty,
                    (b._2._3) + txn.amount,
                    (b._2._4) + txn.dscamt), false, txn :: (b._4)))
                  case DISCTYPE.best =>
                    discAmt = -(txn.amount * disc / 100)
                    if (discAmt < txn.dscamt)
                      ((txn.salestype, (
                        (b._2._1) + 1,
                        (b._2._2) + txn.qty,
                        (b._2._3) + txn.amount,
                        (b._2._4) + discAmt), false, txn.copy(
                        dscamt = discAmt
                      ) :: (b._4)))
                    else
                      ((txn.salestype, (
                        (b._2._1) + 1,
                        (b._2._2) + txn.qty,
                        (b._2._3) + txn.amount,
                        (b._2._4) + txn.dscamt), false, txn :: (b._4)))
                }
              }
              else ((b._1, b._2, b._3, txn :: (b._4)))
            }
          }
        }

      }
      (accu._4.reverse,accu._2)
    }

    def totalSales: Int = txnitems.foldRight(0) { case (txn, b) =>
      if (txn.salestype == SALESTYPE.itm)
        (txn.amount + txn.dscamt) + b
      else b

      /*
            val amt: Int = txn.salestype match {
              case (SALESTYPE.plu | SALESTYPE.cat | SALESTYPE.brd | SALESTYPE.ra) => txn.amount + txn.dscamt
              case _ => 0
            }
            amt + b */
    }

    def totalPaid: Int = txnitems.foldRight(0) { case (txn, b) =>
      if (txn.txntype == TXNTYPE.sales && txn.salestype == SALESTYPE.ttl)
        txn.amount + b
      else b
    }

    def addItem(item: TxnItem): VchItems = VchItems((item :: txnitems)) //.reverse)

  }

  def LastSecOfDate(ldate: LocalDate): LocalDateTime = {
    val dtStr = ldate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")) + " 23:59:59"
    LocalDateTime.parse(dtStr, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
  }

  def dateStr(dt: LocalDate): String = dt.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))

  def updateState(evt: Action, lastSeqNr: BigInt = 0)(implicit nodeAddress: NodeAddress, persistenceId: PID, state: VchStates, items: VchItems, curItem: TxnItem): (VchStates, VchItems) = {
    val (vs, vi) = updateStateImpl(evt, lastSeqNr)
    log.step(s"${nodeAddress.address}-${persistenceId.id} run updateState($evt, $lastSeqNr) with results state[$vs], txns[$vi].")
    (vs, vi)
  }

  def updateStateImpl(evt: Action, lastSeqNr: BigInt = 0)(implicit state: VchStates, items: VchItems, curItem: TxnItem): (VchStates, VchItems) = evt match {
    case LogOned(csr) => (state.copy(seq = state.seq + 1, opr = csr, jseq = lastSeqNr), items)
    case LogOffed => (state.copy(seq = state.seq + 1, opr = ""), items)
    case RefundOned => (state.copy(seq = state.seq + 1, refd = true), items)
    case RefundOffed => (state.copy(seq = state.seq + 1, refd = false), items)
    case VoidOned => (state.copy(seq = state.seq + 1, void = true), items)
    case VoidOffed => (state.copy(seq = state.seq + 1, void = false), items)
    case SuperOned(suser) => (state.copy(seq = state.seq + 1, su = suser), items)
    case SuperOffed => (state.copy(seq = state.seq + 1, su = ""), items)
    case MemberOned(num) => (state.copy(seq = state.seq + 1, mbr = num), items)
    case MemberOffed => (state.copy(seq = state.seq + 1, mbr = ""), items)


    case SalesLogged(_,_,_,_,_) => (state.copy(
      seq = state.seq + 1)
      , items.addItem(curItem))

    case Subtotaled(level) =>
      var subs = (0,0,0,0)
      if (level == 0)
        subs = items.subTotal
      else
        subs = items.groupTotal(level)
      val (cnt, tqty, tamt, tdsc) = subs

      val subttlItem =
        TxnItem(state).copy(
          txntype = TXNTYPE.sales,
          salestype = SALESTYPE.sub,
          qty = tqty,
          amount = tamt,
          dscamt = tdsc,
          price = cnt
        )
      (state.copy(
      seq = state.seq + 1)
      , items.addItem(subttlItem))

    case Discounted(dt,gp,code,pct) =>
      val (lstItems, accum) = items.updateDisc(dt,gp,pct)
      val discItem = TxnItem(state).copy(
        txntype = TXNTYPE.sales,
        salestype = SALESTYPE.dsc,
        acct = code,
        disc = pct,
        price = accum._1,
        qty = accum._2,
        amount = accum._3,
        dscamt = accum._4
      )
      (state.copy(
      seq = state.seq + 1)
      , items.copy(txnitems = lstItems).addItem(discItem))


    case PaymentMade(_,_,_) =>
      val due = if (items.totalSales > 0) items.totalSales - items.totalPaid else items.totalSales + items.totalPaid
      val bal = if (items.totalSales > 0) due - curItem.amount else due + curItem.amount
      (state.copy(
        seq = state.seq + 1,
        due = (if ((curItem.amount.abs + items.totalPaid.abs) >= items.totalSales.abs) false else true)
      )
        ,items.addItem(curItem.copy(
        salestype = SALESTYPE.ttl,
        price = due,
        amount = curItem.amount,
        dscamt = bal
      )))

    case VoucherNumed(_, tnum) =>
      val vi = items.copy(txnitems = items.txnitems.map { it => it.copy(num = tnum) })
      (state.copy(seq = state.seq + 1, num = tnum), vi)

    case EndVoucher(vnum) => (state.nextVoucher.copy(jseq = lastSeqNr + 1), VchItems())

    case _ => (state, items)
  }


}

 Reader.scala

package datatech.cloud.pos
import akka.actor._
import akka.stream.scaladsl._

import scala.util._
import akka._
import akka.persistence.query._
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal

import scala.concurrent._
import akka.stream._
import sdp.logging._
import Actions._
import States._
import Messages._
import akka.cluster._

class Reader extends Actor with LogSupport {
import Reader._
  val cluster = Cluster(context.system)
  implicit val nodeAddress: NodeAddress = NodeAddress(cluster.selfAddress.toString)
  override def receive: Receive = {
    case ReadActions(si, ei, pid) => readActions(si,ei, pid)(context.system, context.dispatcher, ActorMaterializer(),nodeAddress)
  }
}

object Reader extends LogSupport {

  def readActions(startSeq: Long, endSeq: Long, persistenceId: String)(implicit sys: ActorSystem, ec: ExecutionContextExecutor, mat: ActorMaterializer, nodeAddress: NodeAddress) = {
    implicit var vchState = VchStates()
    implicit var vchItems = VchItems()
    implicit var curTxnItem = TxnItem()
    implicit val pid = PID(persistenceId)
    // obtain read journal by plugin id
    val readJournal =
      PersistenceQuery(sys).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

    // issue query to journal
    val source: Source[EventEnvelope, NotUsed] =
      readJournal.currentEventsByPersistenceId(persistenceId, startSeq, endSeq)

    // materialize stream, consuming events
    val futureActions: Future[List[Any]] = source.runFold(List[Any]()) { (lstAny, evl) => evl.event :: lstAny }

    futureActions.onComplete {
      case Success(txns) => buildVoucher(txns)
      case Failure(excpt) =>
    }


    def buildVoucher(actions: List[Any])= {

      actions.reverse.foreach { txn =>
        txn match {
          case EndVoucher(_) =>
            vchItems.txnitems.foreach(println)
          case ti@_ =>
            curTxnItem = buildTxnItem(ti.asInstanceOf[Action])
            val sts = updateState(ti.asInstanceOf[Action],0)
            vchState = sts._1
            vchItems = sts._2
        }
      }
    }
  }

}

 ReaderDemo.scala

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.cluster._

import scala.concurrent._
import scala.util._
import akka._
import akka.persistence.query._
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import datatech.cloud.pos._
import Actions._
import Messages._
import Reader._
import sdp.logging.LogSupport



object ReaderDemo extends App {


  implicit val system = ActorSystem("reader")
  implicit val ec = system.dispatcher
  implicit val mat = ActorMaterializer()

  val cluster = Cluster(system)
  implicit val nodeAddress: NodeAddress = NodeAddress(cluster.selfAddress.toString)

  States.setShowSteps(true)

  Reader.readActions(0, Long.MaxValue,"1022")

  scala.io.StdIn.readLine()

  mat.shutdown()
  system.terminate()
}

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 何為質數: 只能被1 和 自身 整除的數; 方法: 利用js中求模, 看是否有餘數. > 3%2 = 1; 5%2 = 3......... 代碼如下: 以上方法是為判斷一個數是否為質數; 那如何判斷1到任意數之間的所有質數呢, 就比較簡單; 代碼如下: 到此完結. 核心test方法, 遍歷時, 碰 ...
  • 問題描述: 前些天在練習寫一個小例子的時候用到了定時器,發現在setInterval和setTimeout中傳入函數時,函數中的this會指向window對象,如下例: var num = 0;function Obj (){ this.num = 1, this.getNum = function ...
  • 上一篇理解Express的使用之後, 再總結一篇Express中間件的簡單實現原理。 我們知道Express中間件就是一個個的函數, 那麼怎麼讓這些函數有序的執行呢? 那就需要我們調用 函數。其實 函數調用的就是下一個中間件函數。 以下代碼實現了簡單的 註冊中間件, 以及 、`post`方式的中間件 ...
  • Map 對象保存鍵值對。任何值(對象或者原始值) 都可以作為一個鍵或一個值。構造函數 可以接受一個數組作為參數。 Map和Object的區別 一個 的鍵只能是字元串或者 ,但一個 的鍵可以是任意值。 中的鍵值是有序的(FIFO 原則),而添加到對象中的鍵則不是。 的鍵值對個數可以從 size 屬性獲 ...
  • 1.代碼生成器: [正反雙向](單表、主表、明細表、樹形表,快速開發利器)freemaker模版技術 ,0個代碼不用寫,生成完整的一個模塊,帶頁面、建表sql腳本、處理類、service等完整模塊2.多數據源:(支持同時連接無數個資料庫,可以不同的模塊連接不同數的據庫)支持N個數據源3.阿裡資料庫連 ...
  • 博主剛剛接觸web開發的時候,寫了一個介面 /get_article_info/1 獲取id為1的這篇文章的內容,被前輩們看見了,前輩給我說我這個介面設計的不太好啊,不符合RESTFUL規範,當前輩們說出這些話的時候,我很迷惑,我寫的介面不能夠好好工作嗎?能夠正常返回內容啊,對於不存在的文章也能夠在 ...
  • 這篇主要介紹 相關內容,主要是一些基本概念普及。 代理模式 1、什麼是代理模式? ,為其他對象提供一種代理以控制對這個對象的訪問。[DP] 通俗的說就是指客戶端並不直接調用實際的對象,而是通過代理對象,來間接調用實際的對象。 2、實現原理 可以是介面,也可以是抽象類 內部含有對真實對象RealSub ...
  • [TOC] 前言 很久沒有寫博客了,一直給自己找藉口說太忙了,過幾天有空再寫,幾天之後又幾天,時間就這麼快速的消逝。說到底就是自己太懶了,不下點決心真是不行。我決定逼自己一把,從今天開始學習設計模式系列,並寫成博文記錄下來,做不到的話,就罰自己一個月不玩游戲 (作孽啊。。。。) 六大原則 言歸正傳, ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...