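This post uses a concrete CQRS reader-actor example to demonstrate programming the query side of akka-persistence. In earlier posts we designed the action-logging process of a CQRS-style POS program and showed how to implement the CQRS write side. Building on that example, we now show how the read side's reader-actor reads the events written by the writer-actor and restores those binary-format events into rows of a database table.
First, let's see how the reader reads the persisted events from the Cassandra database in order. The cassandra-plugin provides the currentEventsByPersistenceId function, used as follows: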
// obtain read journal by plugin id
val readJournal =
PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
// issue query to journal
val source: Source[EventEnvelope, NotUsed] =
readJournal.currentEventsByPersistenceId("2022", 0, Long.MaxValue)
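This function returns an akka-stream Source[EventEnvelope,_]: a static Source that completes once the events currently stored have been emitted. A companion function, eventsByPersistenceId, returns a live Source that keeps emitting newly persisted events. We can runFold a static Source to aggregate the stream elements into a collection: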
// materialize stream, consuming events
val futureActions: Future[List[Any]] = source.runFold(List[Any]()) { (lstAny, evl) => evl.event :: lstAny }
We can then tell the List elements apart through pattern matching:
implicit val system = ActorSystem("reader")
implicit val ec = system.dispatcher
implicit val mat = ActorMaterializer()
futureActions.onComplete {
case Success(acts) => acts.reverse.foreach {act => act match {
case LogOned(txn) => println(s"LogOn: $txn" )
case SalesLogged(txn) => println(s"LogSales: $txn")
case _ => println("unkown action !!!!!")
}}
case Failure(exception) => println(exception.getMessage)
}
A trial run produces the following output:
LogOn: TxnItem(20190509,16:12:23,1001,0,1,6,8,1,0,0,0,,1001,,,,,,,)
LogSales: TxnItem(20190509,16:12:34,1001,0,2,0,0,0,1300,0,0,,005,hainan pineapple,02,Grocery,01,,01,Sunkist)
LogSales: TxnItem(20190509,16:12:35,1001,0,3,0,0,0,300,0,0,,004,demon banana,02,Grocery,01,,01,Sunkist)
LogSales: TxnItem(20190509,16:12:36,1001,0,4,0,0,0,1050,0,0,,002,red grape,02,Grocery,01,,01,Sunkist)
unkown action !!!!!
unkown action !!!!!
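With this, the events stored by the write operations can already be restored into Action class structures.
Now let's be a bit more rigorous and build a detailed reader demo. Back to our POS example: suppose we issue the following write-side commands: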
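The collect-then-match steps can be tried without a journal. The following is only a minimal sketch using the Scala standard library: the Action hierarchy is a simplified stand-in (single String payloads instead of the real event fields), Suspended is a made-up event, and journal is a hypothetical in-memory list playing the role of the event stream.

```scala
object MatchDemo {
  // Simplified, hypothetical stand-ins for the real event classes.
  sealed trait Action
  case class LogOned(opr: String) extends Action
  case class SalesLogged(code: String) extends Action
  case object Suspended extends Action

  // Hypothetical in-memory stand-in for the journal, in write order.
  val journal: List[Any] = List(LogOned("1001"), SalesLogged("005"), Suspended)

  // Same shape as the runFold: prepending builds the list in reverse order.
  val collected: List[Any] =
    journal.foldLeft(List[Any]()) { (acc, evt) => evt :: acc }

  // Reverse to restore write order, then classify each element.
  val lines: List[String] = collected.reverse.map {
    case LogOned(opr)      => s"LogOn: $opr"
    case SalesLogged(code) => s"LogSales: $code"
    case _                 => "unknown action"
  }

  def main(args: Array[String]): Unit = lines.foreach(println)
}
```

Prepending with :: is constant-time on an immutable List, which is why the fold builds the list backwards and reverses it once at the end.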
posref ! POSMessage(1022, LogOn("1001", "123"))
scala.io.StdIn.readLine()
posref ! POSMessage(1022, LogSales("0", "0", apple.code, 1,820))
scala.io.StdIn.readLine()
posref ! POSMessage(1022, LogSales("0", "0", pineapple.code, 2, 1300))
scala.io.StdIn.readLine()
posref ! POSMessage(1022, Subtotal(0))
scala.io.StdIn.readLine()
posref ! POSMessage(1022, Discount(DISCTYPE.duplicated,false,"001",5))
scala.io.StdIn.readLine()
posref ! POSMessage(1022, LogSales("0", "0", banana.code, 1, 300))
scala.io.StdIn.readLine()
posref ! POSMessage(1022, LogSales("0", "0", grape.code, 3, 1050))
scala.io.StdIn.readLine()
posref ! POSMessage(1022, Subtotal(1))
scala.io.StdIn.readLine()
posref ! POSMessage(1022, Discount(DISCTYPE.duplicated,true,"001",10))
scala.io.StdIn.readLine()
posref ! POSMessage(1022, LogSales("0", "0", orage.code, 10, 350))
scala.io.StdIn.readLine()
posref ! POSMessage(1022, Discount(DISCTYPE.duplicated,false,"001",10))
scala.io.StdIn.readLine()
posref ! POSMessage(1022, Subtotal(0))
scala.io.StdIn.readLine()
posref ! POSMessage(1022, Suspend)
scala.io.StdIn.readLine()
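Earlier posts stressed repeatedly that the CQRS write side should store commands in the database simply and directly, avoiding any complex control logic. As far as possible, the read side should read the commands back, simulate the state transitions that occur during processing, and restore the results those commands were expected to produce; in this demo, those results are the formal transaction data. In fact, the write side already does some state-transition processing, in the updateState function: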
def updateState(evt: Action, lastSeqNr: BigInt = 0)(implicit nodeAddress: NodeAddress, persistenceId: PID, state: VchStates, items: VchItems, curItem: TxnItem): (VchStates, VchItems) = {
val (vs, vi) = updateStateImpl(evt, lastSeqNr)
log.step(s"${nodeAddress.address}-${persistenceId.id} run updateState($evt, $lastSeqNr) with results state[$vs], txns[$vi].")
(vs, vi)
}
def updateStateImpl(evt: Action, lastSeqNr: BigInt = 0)(implicit state: VchStates, items: VchItems, curItem: TxnItem): (VchStates, VchItems) = evt match {
case LogOned(csr) => (state.copy(seq = state.seq + 1, opr = csr, jseq = lastSeqNr), items)
case LogOffed => (state.copy(seq = state.seq + 1, opr = ""), items)
case RefundOned => (state.copy(seq = state.seq + 1, refd = true), items)
case RefundOffed => (state.copy(seq = state.seq + 1, refd = false), items)
case VoidOned => (state.copy(seq = state.seq + 1, void = true), items)
case VoidOffed => (state.copy(seq = state.seq + 1, void = false), items)
case SuperOned(suser) => (state.copy(seq = state.seq + 1, su = suser), items)
case SuperOffed => (state.copy(seq = state.seq + 1, su = ""), items)
case MemberOned(num) => (state.copy(seq = state.seq + 1, mbr = num), items)
case MemberOffed => (state.copy(seq = state.seq + 1, mbr = ""), items)
case SalesLogged(_,_,_,_,_) => (state.copy(
seq = state.seq + 1)
, items.addItem(curItem))
case Subtotaled(_) => (state.copy(
seq = state.seq + 1)
, items.addItem(curItem))
case Discounted(_,_,_,_) => (state.copy(
seq = state.seq + 1)
, items.addItem(curItem))
case PaymentMade(_,_,_) =>
val due = if (items.totalSales > 0) items.totalSales - items.totalPaid else items.totalSales + items.totalPaid
val bal = if (items.totalSales > 0) due - curItem.amount else due + curItem.amount
(state.copy(
seq = state.seq + 1,
due = (if ((curItem.amount.abs + items.totalPaid.abs) >= items.totalSales.abs) false else true)
)
,items.addItem(curItem.copy(
salestype = SALESTYPE.ttl,
price = due,
amount = curItem.amount,
dscamt = bal
)))
case VoucherNumed(_, tnum) =>
val vi = items.copy(txnitems = items.txnitems.map { it => it.copy(num = tnum) })
(state.copy(seq = state.seq + 1, num = tnum), vi)
case EndVoucher(vnum) => (state.nextVoucher.copy(jseq = lastSeqNr + 1), VchItems())
case _ => (state, items)
}
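On the write side, this simulated processing exists to give the client real-time feedback on the result of each command it issues, for example: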
case cmd @ LogSales(acct,sdpt,scode,sqty,sprice) if cmdFilter(cmd,sender()) => {
log.step(s"${nodeAddress.address}-${persistenceId} running LogSales($acct,$sdpt,$scode,$sqty,$sprice) with state[$vchState],txns[$vchItems] ...")
var pqty = sqty
if (vchState.void || vchState.refd) pqty = -sqty
log.step(s"${nodeAddress.address}-${persistenceId} run LogSales($acct,$sdpt,$scode,$sqty,$sprice).")
persistEvent(SalesLogged(acct,sdpt,scode,sqty,sprice)) { evt =>
curTxnItem = buildTxnItem(evt).copy(
qty =pqty,
amount = pqty * sprice
)
val sts = updateState(evt)
vchState = sts._1
vchItems = sts._2
if (vchState.void) {
val (lstTxns, found) = vchItems.txnitems.foldLeft((List[TxnItem](), false)) { case (b, ti) =>
if (b._2)
(ti :: b._1, true)
else if (ti.txntype == TXNTYPE.sales && ti.salestype == SALESTYPE.itm && ti.acct == acct &&
ti.dpt == sdpt && ti.code == scode && ti.qty == sqty && ti.price == sprice)
(ti.copy(txntype = TXNTYPE.voided) :: b._1, true)
else (ti :: b._1, false)
}
vchItems = vchItems.copy(txnitems = lstTxns.reverse)
sender() ! POSResponse(STATUS.OK, s"終端-$persistenceId: 成功沖銷商品銷售。", vchState, List(vchItems.topTxnItem))
} else {
sender() ! POSResponse(STATUS.OK, s"終端-$persistenceId: 商品銷售操作成功。", vchState, List(vchItems.topTxnItem))
}
}
}
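The POSResponse class above returns the current transaction state to the client in real time. And vchItems.topTxnItem is exactly a formal transaction record, that is, the data we need to restore on the read side. So updateState still has to be called on the read side. First, the overall program structure of the reader demo: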
object ReaderDemo extends App {
implicit val system = ActorSystem("reader")
implicit val ec = system.dispatcher
implicit val mat = ActorMaterializer()
val cluster = Cluster(system)
implicit val nodeAddress: NodeAddress = NodeAddress(cluster.selfAddress.toString)
States.setShowSteps(true)
Reader.readActions(0, Long.MaxValue,"1022")
scala.io.StdIn.readLine()
mat.shutdown()
system.terminate()
}
object Reader extends LogSupport {
def readActions(startSeq: Long, endSeq: Long, persistenceId: String)(implicit sys: ActorSystem, ec: ExecutionContextExecutor, mat: ActorMaterializer, nodeAddress: NodeAddress) = {
implicit var vchState = VchStates()
implicit var vchItems = VchItems()
implicit var curTxnItem = TxnItem()
implicit val pid = PID(persistenceId)
// obtain read journal by plugin id
val readJournal =
PersistenceQuery(sys).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
// issue query to journal
val source: Source[EventEnvelope, NotUsed] =
readJournal.currentEventsByPersistenceId(persistenceId, startSeq, endSeq)
// materialize stream, consuming events
val futureActions: Future[List[Any]] = source.runFold(List[Any]()) { (lstAny, evl) => evl.event :: lstAny }
futureActions.onComplete {
case Success(txns) =>
buildVoucher(txns)
//vchItems.txnitems now holds the restored formal transaction records; they can be written to the business database here
println(vchItems.txnitems.reverse)
case Failure(excpt) =>
}
def buildVoucher(actions: List[Any])= {
actions.reverse.foreach { txn =>
txn match {
case EndVoucher(_) =>
vchItems.txnitems.foreach(println)
case ti@_ =>
curTxnItem = buildTxnItem(ti.asInstanceOf[Action])
val sts = updateState(ti.asInstanceOf[Action],0)
vchState = sts._1
vchItems = sts._2
}
}
}
}
}
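The replay loop in buildVoucher can also be read as a fold over the event list. The sketch below is a simplified illustration under stated assumptions, not the demo's actual logic: ItemSold, PercentOff, VoucherEnded and Row are hypothetical names, and the discount rule (apply the percentage to every row) is much cruder than the real updateDisc.

```scala
object ReplayDemo {
  // Hypothetical, simplified events; the real ones carry more fields.
  sealed trait Event
  case class ItemSold(code: String, qty: Int, price: Int) extends Event
  case class PercentOff(percent: Int) extends Event
  case object VoucherEnded extends Event

  // Read-side row; amounts are in cents and the discount is negative.
  case class Row(code: String, qty: Int, amount: Int, dscamt: Int = 0)

  // Pure state transition: one event in, a new list of rows out.
  def step(rows: List[Row], evt: Event): List[Row] = evt match {
    case ItemSold(c, q, p) => Row(c, q, q * p) :: rows
    case PercentOff(pct)   => rows.map(r => r.copy(dscamt = -(r.amount * pct / 100)))
    case VoucherEnded      => rows
  }

  // Events in write order, as a journal query would deliver them.
  val events: List[Event] =
    List(ItemSold("001", 1, 820), ItemSold("005", 2, 1300), PercentOff(5), VoucherEnded)

  // Replaying is a left fold; reverse once to get rows in write order.
  val rows: List[Row] = events.foldLeft(List.empty[Row])(step).reverse

  // Net amount = amount + dscamt, summed over all rows.
  val net: Int = rows.map(r => r.amount + r.dscamt).sum

  def main(args: Array[String]): Unit = {
    rows.foreach(println)
    println(s"net = $net")
  }
}
```

Threading the state through foldLeft avoids the implicit vars used in readActions; whether that is preferable depends on how much of the write-side updateState signature has to be reused unchanged.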
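A side note: one of the main jobs of updateState is to modify the txnitem elements inside the collection vchItems.txnitems. Because we use immutable collections in functional style, every such operation has to traverse the collection, for example with a fold: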
def subTotal: (Int, Int, Int, Int) = txnitems.foldRight((0, 0, 0, 0)) { case (txn, b) =>
if (txn.salestype == SALESTYPE.itm && txn.txntype == TXNTYPE.sales)
b.copy(_1 = b._1 + 1, _2 = b._2 + txn.qty, _3 = b._3 + txn.amount, _4 = b._4 + txn.dscamt)
else b
}
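Which raises a question: what if, in a very large collection, only the first few elements need to be processed? Do we still have to traverse every element? Our example has exactly this requirement: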
def groupTotal(level:Int): (Int, Int, Int, Int) = {
val gts = txnitems.foldLeftWhile((0, 0, 0, 0, 0)) { case (b,txn) =>
if (txn.salestype == SALESTYPE.itm && txn.txntype == TXNTYPE.sales)
((b._1._1 +1,b._1._2 + txn.qty, b._1._3 + txn.amount, b._1._4 + txn.dscamt, b._1._5),false)
else {
if (txn.salestype == SALESTYPE.sub) {
if (((b._1._5) + 1) >= level)
((b._1._1, b._1._2, b._1._3, b._1._4, b._1._5 + 1), true)
else
((b._1._1, b._1._2, b._1._3, b._1._4, b._1._5 + 1), false)
} else b
}
}
(gts._1,gts._2,gts._3,gts._4)
}
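This is a tiered subtotal function: while traversing the items, it stops accumulating when it meets another subtotal item (more precisely, once the number of subtotal items seen reaches the requested level). It calls foldLeftWhile to achieve this; foldLeftWhile is a custom function: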
implicit class FoldLeftWhile[A](trav: Seq[A]) {
def foldLeftWhile[B](z: B)(op: ((B,Boolean), A) => (B, Boolean)): B = {
def go(acc: (B, Boolean), l: Seq[A]): (B, Boolean) = l match {
case h +: t =>
val nacc = op(acc, h)
if (!nacc._2)
go(nacc, t)
else
nacc
case _ => acc
}
go((z, false), trav)._1
}
}
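To see the early exit in action, here is a small self-contained sketch. It repeats the foldLeftWhile definition (with a @tailrec annotation added) and applies it to a plain list of integers; the visited counter and the threshold of 6 are illustrative assumptions, not part of the POS code.

```scala
object FoldLeftWhileDemo {
  implicit class FoldLeftWhile[A](trav: Seq[A]) {
    // Left fold that stops as soon as op returns true in the second slot.
    def foldLeftWhile[B](z: B)(op: ((B, Boolean), A) => (B, Boolean)): B = {
      @annotation.tailrec
      def go(acc: (B, Boolean), l: Seq[A]): (B, Boolean) = l match {
        case h +: t =>
          val nacc = op(acc, h)
          if (!nacc._2) go(nacc, t) else nacc
        case _ => acc
      }
      go((z, false), trav)._1
    }
  }

  // Counts how many elements the fold actually looked at.
  var visited = 0

  // Sum from the left and stop once the running total reaches 6.
  val total: Int = List(1, 2, 3, 4, 5).foldLeftWhile(0) { case ((sum, _), n) =>
    visited += 1
    val next = sum + n
    (next, next >= 6)
  }

  def main(args: Array[String]): Unit =
    println(s"total = $total, visited = $visited")
}
```

Only the first three elements are visited here; a plain foldLeft would have walked all five.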
The source code for this demo is attached below.
build.sbt
name := "akka-cluster-reader"

version := "0.1"

scalaVersion := "2.12.8"

libraryDependencies := Seq(
  "com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.19",
  "com.typesafe.akka" %% "akka-persistence" % "2.5.19",
  "com.typesafe.akka" %% "akka-persistence-query" % "2.5.19",
  "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.93",
  "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.93" % Test,
  "ch.qos.logback" % "logback-classic" % "1.2.3"
)
resources/application.conf
akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off
akka.remote.use-passive-connections=off

akka {
  loglevel = INFO
  actor {
    provider = "cluster"
  }

  remote {
    log-remote-lifecycle-events = on
    netty.tcp {
      hostname = "192.168.11.189"
      port = 2551
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://[email protected]:2551"]
    log-info = off
    sharding {
      role = "shard"
      passivate-idle-entity-after = 10 m
    }
  }

  persistence {
    journal.plugin = "cassandra-journal"
    snapshot-store.plugin = "cassandra-snapshot-store"
  }
}

cassandra-journal {
  contact-points = ["192.168.11.189"]
}

cassandra-snapshot-store {
  contact-points = ["192.168.11.189"]
}
message/Messages.scala
package datatech.cloud.pos

import java.time.LocalDate
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.Locale
import akka.cluster.sharding._

object Messages {

  val dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.CHINA)

  sealed trait Command {}

  case class LogOn(opr: String, passwd: String) extends Command
  case object LogOff extends Command
  case class SuperOn(su: String, passwd: String) extends Command
  case object SuperOff extends Command
  case class MemberOn(cardnum: String, passwd: String) extends Command
  case object MemberOff extends Command //remove member status for the voucher
  case object RefundOn extends Command
  case object RefundOff extends Command
  case object VoidOn extends Command
  case object VoidOff extends Command
  case object VoidAll extends Command
  case object Suspend extends Command

  case class VoucherNum(vnum: Int) extends Command

  case class LogSales(acct: String, dpt: String, code: String, qty: Int, price: Int) extends Command
  case class Subtotal(level: Int) extends Command
  case class Discount(disctype: Int, grouped: Boolean, code: String, percent: Int) extends Command

  case class Payment(acct: String, num: String, amount: Int) extends Command //settlement payment

  // read only command, no update event
  case class Plu(itemCode: String) extends Command //read only
  case object GetTxnItems extends Command

  /* discount type */
  object DISCTYPE {
    val duplicated: Int = 0
    val best: Int = 1
    val least: Int = 2
    val keep: Int = 3
  }

  /* result message returned to client on the wire */
  object TXNTYPE {
    val sales: Int = 0
    val refund: Int = 1
    val void: Int = 2
    val voided: Int = 3
    val voidall: Int = 4
    val subtotal: Int = 5
    val logon: Int = 6
    val supon: Int = 7 // super user on/off
    val suspend: Int = 8
  }

  object SALESTYPE {
    val itm: Int = 2
    val sub: Int = 10
    val ttl: Int = 11
    val dsc: Int = 12
    val crd: Int = 13
  }

  case class TxnItem(
    txndate: String = LocalDate.now.format(DateTimeFormatter.ofPattern("yyyyMMdd"))
    ,txntime: String = LocalDateTime.now.format(dateTimeFormatter).substring(11)
    ,opr: String = "" //operator ID
    ,num: Int = 0 //sales voucher number
    ,seq: Int = 1 //transaction sequence number
    ,txntype: Int = TXNTYPE.sales //transaction type
    ,salestype: Int = SALESTYPE.itm //sales type
    ,qty: Int = 1 //transaction quantity
    ,price: Int = 0 //unit price (cents)
    ,amount: Int = 0 //list amount (cents)
    ,disc: Int = 0 //discount rate (%)
    ,dscamt: Int = 0 //discount amount: negative; net amount = amount + dscamt
    ,member: String = "" //member card number
    ,code: String = "" //code (item, card number...)
    ,acct: String = "" //account
    ,dpt: String = "" //department
  )

  object TxnItem {
    def apply(vs: VchStates): TxnItem = TxnItem().copy(
      opr = vs.opr,
      num = vs.num,
      seq = vs.seq,
      member = vs.mbr
    )
  }

  case class VchStatus( //operation-state locks are left to the front end to maintain
    qty: Int = 1,
    refund: Boolean = false,
    void: Boolean = false)

  case class VchStates(
    opr: String = "", //cashier
    jseq: BigInt = 0, //begin journal sequence for read-side replay
    num: Int = 0, //current voucher number
    seq: Int = 0, //current sequence number
    void: Boolean = false, //void mode
    refd: Boolean = false, //refund mode
    due: Boolean = true, //current balance
    su: String = "",
    mbr: String = ""
  ) {
    def nextVoucher : VchStates = VchStates().copy(
      opr = this.opr,
      jseq = this.jseq + 1,
      num = this.num + 1
    )
  }

  object STATUS {
    val OK: Int = 0
    val FAIL: Int = -1
  }

  case class POSResponse (sts: Int, msg: String, voucher: VchStates, txnItems: List[TxnItem])

  /* message on the wire (exchanged message) */
  val shardName = "POSShard"

  case class POSMessage(id: Long, cmd: Command) {
    def shopId = id.toString.head.toString
    def posId = id.toString
  }

  val getPOSId: ShardRegion.ExtractEntityId = {
    case posCommand: POSMessage => (posCommand.posId,posCommand.cmd)
  }
  val getShopId: ShardRegion.ExtractShardId = {
    case posCommand: POSMessage => posCommand.shopId
  }

  case object PassivatePOS //passivate message
  case object FilteredOut
  case class DebugMode(debug: Boolean)
  case class NodeAddress(address: String)
  case class PID(id: String)

}
States.scala
package datatech.cloud.pos

import java.time.LocalDate
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import Messages._
import sdp.logging._

object Actions {

  implicit class FoldLeftWhile[A](trav: Seq[A]) {
    def foldLeftWhile[B](z: B)(op: ((B,Boolean), A) => (B, Boolean)): B = {
      def go(acc: (B, Boolean), l: Seq[A]): (B, Boolean) = l match {
        case h +: t =>
          val nacc = op(acc, h)
          if (!nacc._2)
            go(nacc, t)
          else
            nacc
        case _ => acc
      }
      go((z, false), trav)._1
    }
  }

  case class ReadActions(startSeq: Int, endSeq: Int, persistenceId: String)

  sealed trait Action {}
  case class LogOned(opr: String) extends Action
  case object LogOffed extends Action
  case class SuperOned(su: String) extends Action
  case object SuperOffed extends Action
  case class MemberOned(cardnum: String) extends Action
  case object MemberOffed extends Action //remove member status for the voucher
  case object RefundOned extends Action
  case object RefundOffed extends Action
  case object VoidOned extends Action
  case object VoidOffed extends Action

  case class SalesLogged(acct: String, dpt: String, code: String, qty: Int, price: Int) extends Action
  case class Subtotaled(level: Int) extends Action
  case class Discounted(disctype: Int, grouped: Boolean, code: String, percent: Int) extends Action

  case class NewVoucher(vnum: Int) extends Action //new voucher, reminder for read-side to set new vnum
  case class EndVoucher(vnum: Int) extends Action //end-of-voucher marker
  case object VoidVoucher extends Action

  case object SuspVoucher extends Action

  case class VoucherNumed(fnum: Int, tnum: Int) extends Action

  case class PaymentMade(acct: String, num: String, amount: Int) extends Action //settlement payment

}

object States extends LogSupport {
  import Actions._

  def setShowSteps(b: Boolean) = log.stepOn = b

  def buildTxnItem(evt: Action)(implicit vs: VchStates, vi: VchItems): TxnItem = evt match {
    case LogOned(op) => TxnItem(vs).copy(
      txntype = TXNTYPE.logon,
      salestype = SALESTYPE.crd,
      opr = op,
      code = op
    )
    case LogOffed => TxnItem(vs).copy(
      txntype = TXNTYPE.logon,
      salestype = SALESTYPE.crd
    )
    case SuperOned(su) => TxnItem(vs).copy(
      txntype = TXNTYPE.supon,
      salestype = SALESTYPE.crd,
      code = su
    )
    case SuperOffed => TxnItem(vs).copy(
      txntype = TXNTYPE.supon,
      salestype = SALESTYPE.crd
    )
    case MemberOned(cardnum) => TxnItem(vs).copy(
      txntype = TXNTYPE.sales,
      salestype = SALESTYPE.crd,
      member = cardnum
    )
    case MemberOffed => TxnItem(vs).copy(
      txntype = TXNTYPE.sales,
      salestype = SALESTYPE.crd
    )
    case RefundOned => TxnItem(vs).copy(
      txntype = TXNTYPE.refund
    )
    case RefundOffed => TxnItem(vs).copy(
      txntype = TXNTYPE.refund
    )
    case VoidOned => TxnItem(vs).copy(
      txntype = TXNTYPE.void
    )
    case VoidOffed => TxnItem(vs).copy(
      txntype = TXNTYPE.void
    )
    case VoidVoucher => TxnItem(vs).copy(
      txntype = TXNTYPE.voidall,
      code = vs.num.toString,
      acct = vs.num.toString
    )
    case SuspVoucher => TxnItem(vs).copy(
      txntype = TXNTYPE.suspend,
      code = vs.num.toString,
      acct = vs.num.toString
    )
    case Subtotaled(level) =>
      TxnItem(vs).copy(
        txntype = TXNTYPE.sales,
        salestype = SALESTYPE.sub
      )
    case Discounted(dt,gp,code,pct) => TxnItem(vs).copy(
      txntype = TXNTYPE.sales,
      salestype = SALESTYPE.dsc,
      acct = code,
      disc = pct
    )
    case PaymentMade(act,num,amt) => TxnItem(vs).copy(
      txntype = TXNTYPE.sales,
      salestype = SALESTYPE.ttl,
      acct = act,
      code = num,
      amount = amt
    )
    case SalesLogged(sacct,sdpt,scode,sqty,sprice) => TxnItem(vs).copy(
      txntype = TXNTYPE.sales,
      salestype = SALESTYPE.itm,
      acct = sacct,
      dpt = sdpt,
      code = scode,
      qty = sqty,
      price = sprice,
      amount = sprice * sqty,
      dscamt = 0
    )
    case _ => TxnItem(vs)
  }

  case class VchItems(txnitems: List[TxnItem] = Nil) {

    def noSales: Boolean = (txnitems.find(txn => txn.salestype == SALESTYPE.itm)).isEmpty

    def subTotal: (Int, Int, Int, Int) = txnitems.foldRight((0, 0, 0, 0)) { case (txn, b) =>
      if (txn.salestype == SALESTYPE.itm && txn.txntype == TXNTYPE.sales)
        b.copy(_1 = b._1 + 1, _2 = b._2 + txn.qty, _3 = b._3 + txn.amount, _4 = b._4 + txn.dscamt)
      else b
    }

    def groupTotal(level:Int): (Int, Int, Int, Int) = {
      val gts = txnitems.foldLeftWhile((0, 0, 0, 0, 0)) { case (b,txn) =>
        if (txn.salestype == SALESTYPE.itm && txn.txntype == TXNTYPE.sales)
          ((b._1._1 +1,b._1._2 + txn.qty, b._1._3 + txn.amount, b._1._4 + txn.dscamt, b._1._5),false)
        else {
          if (txn.salestype == SALESTYPE.sub) {
            if (((b._1._5) + 1) >= level)
              ((b._1._1, b._1._2, b._1._3, b._1._4, b._1._5 + 1), true)
            else
              ((b._1._1, b._1._2, b._1._3, b._1._4, b._1._5 + 1), false)
          } else b
        }
      }
      (gts._1,gts._2,gts._3,gts._4)
    }

    def updateDisc(dt: Int, grouped: Boolean, disc: Int): (List[TxnItem],(Int,Int,Int,Int)) = {
      //(salestype,(cnt,qty,amt,dsc),hassub,list)
      val accu = txnitems.foldLeft((-1, (0,0,0,0), false, List[TxnItem]())) { case (b, txn) =>
        var discAmt = 0
        if ((b._1) < 0) {
          if (txn.salestype == SALESTYPE.itm && txn.txntype == TXNTYPE.sales) {
            if (txn.dscamt == 0)
              ((txn.salestype, (
                (b._2._1) + 1,
                (b._2._2) + txn.qty,
                (b._2._3) + txn.amount,
                (b._2._4) - (txn.amount * disc / 100)
              ), false, txn.copy(
                dscamt = - (txn.amount * disc / 100)) :: (b._4)))
            else {
              dt match {
                case DISCTYPE.duplicated =>
                  if (txn.dscamt != 0) {
                    ((txn.salestype, (
                      (b._2._1) + 1,
                      (b._2._2) + txn.qty,
                      (b._2._3) + txn.amount,
                      (b._2._4) - (txn.amount + txn.dscamt) * disc / 100
                    ), false, txn.copy(
                      dscamt = -(txn.amount + txn.dscamt) * disc / 100) :: (b._4)
                    ))
                  } else {
                    ((txn.salestype, (
                      (b._2._1) + 1,
                      (b._2._2) + txn.qty,
                      (b._2._3) + txn.amount,
                      (b._2._4) - txn.amount * disc / 100
                    ), false, txn.copy(
                      dscamt = -txn.amount * disc / 100) :: (b._4)
                    ))
                  }
                case DISCTYPE.keep => ((txn.salestype, (
                  (b._2._1) + 1,
                  (b._2._2) + txn.qty,
                  (b._2._3) + txn.amount,
                  (b._2._4) + txn.dscamt), false, txn :: (b._4)))
                case DISCTYPE.best =>
                  discAmt = -(txn.amount * disc / 100)
                  if (discAmt < txn.dscamt)
                    ((txn.salestype, (
                      (b._2._1) + 1,
                      (b._2._2) + txn.qty,
                      (b._2._3) + txn.amount,
                      (b._2._4) + discAmt), false, txn.copy(
                      dscamt = discAmt
                    ) :: (b._4)))
                  else
                    ((txn.salestype, (
                      (b._2._1) + 1,
                      (b._2._2) + txn.qty,
                      (b._2._3) + txn.amount,
                      (b._2._4) + txn.dscamt), false, txn :: (b._4)))
              }
            }
          }
          else ((b._1,b._2,b._3,txn :: (b._4)))
        } else {
          if ((b._3))
            (((b._1), (b._2), true, txn :: (b._4)))
          else {
            if (txn.salestype == SALESTYPE.sub) {
              if (grouped)
                (((b._1), (b._2), true, txn :: (b._4)))
              else
                (((b._1), (b._2), false, txn :: (b._4)))
            } else {
              if (txn.salestype == SALESTYPE.itm && txn.txntype == TXNTYPE.sales) {
                dt match {
                  case DISCTYPE.duplicated =>
                    if (txn.dscamt != 0) {
                      ((txn.salestype, (
                        (b._2._1) + 1,
                        (b._2._2) + txn.qty,
                        (b._2._3) + txn.amount,
                        (b._2._4) - (txn.amount + txn.dscamt) * disc / 100), false, txn.copy(
                        dscamt = -(txn.amount + txn.dscamt) * disc / 100) :: (b._4)
                      ))
                    } else {
                      ((txn.salestype, (
                        (b._2._1) + 1,
                        (b._2._2) + txn.qty,
                        (b._2._3) + txn.amount,
                        (b._2._4) - txn.amount * disc / 100), false, txn.copy(
                        dscamt = -(txn.amount * disc / 100)) :: (b._4)
                      ))
                    }
                  case DISCTYPE.keep => ((txn.salestype, (
                    (b._2._1) + 1,
                    (b._2._2) + txn.qty,
                    (b._2._3) + txn.amount,
                    (b._2._4) + txn.dscamt), false, txn :: (b._4)))
                  case DISCTYPE.best =>
                    discAmt = -(txn.amount * disc / 100)
                    if (discAmt < txn.dscamt)
                      ((txn.salestype, (
                        (b._2._1) + 1,
                        (b._2._2) + txn.qty,
                        (b._2._3) + txn.amount,
                        (b._2._4) + discAmt), false, txn.copy(
                        dscamt = discAmt
                      ) :: (b._4)))
                    else
                      ((txn.salestype, (
                        (b._2._1) + 1,
                        (b._2._2) + txn.qty,
                        (b._2._3) + txn.amount,
                        (b._2._4) + txn.dscamt), false, txn :: (b._4)))
                }
              }
              else ((b._1, b._2, b._3, txn :: (b._4)))
            }
          }
        }
      }
      (accu._4.reverse,accu._2)
    }

    def totalSales: Int = txnitems.foldRight(0) { case (txn, b) =>
      if (txn.salestype == SALESTYPE.itm)
        (txn.amount + txn.dscamt) + b
      else b

      /*
      val amt: Int = txn.salestype match {
        case (SALESTYPE.plu | SALESTYPE.cat | SALESTYPE.brd | SALESTYPE.ra) => txn.amount + txn.dscamt
        case _ => 0
      }
      amt + b */
    }

    def totalPaid: Int = txnitems.foldRight(0) { case (txn, b) =>
      if (txn.txntype == TXNTYPE.sales && txn.salestype == SALESTYPE.ttl)
        txn.amount + b
      else b
    }

    def addItem(item: TxnItem): VchItems = VchItems((item :: txnitems)) //.reverse)

  }

  def LastSecOfDate(ldate: LocalDate): LocalDateTime = {
    val dtStr = ldate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")) + " 23:59:59"
    LocalDateTime.parse(dtStr, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
  }

  def dateStr(dt: LocalDate): String = dt.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))

  def updateState(evt: Action, lastSeqNr: BigInt = 0)(implicit nodeAddress: NodeAddress, persistenceId: PID, state: VchStates, items: VchItems, curItem: TxnItem): (VchStates, VchItems) = {
    val (vs, vi) = updateStateImpl(evt, lastSeqNr)
    log.step(s"${nodeAddress.address}-${persistenceId.id} run updateState($evt, $lastSeqNr) with results state[$vs], txns[$vi].")
    (vs, vi)
  }

  def updateStateImpl(evt: Action, lastSeqNr: BigInt = 0)(implicit state: VchStates, items: VchItems, curItem: TxnItem): (VchStates, VchItems) = evt match {
    case LogOned(csr) => (state.copy(seq = state.seq + 1, opr = csr, jseq = lastSeqNr), items)
    case LogOffed => (state.copy(seq = state.seq + 1, opr = ""), items)
    case RefundOned => (state.copy(seq = state.seq + 1, refd = true), items)
    case RefundOffed => (state.copy(seq = state.seq + 1, refd = false), items)
    case VoidOned => (state.copy(seq = state.seq + 1, void = true), items)
    case VoidOffed => (state.copy(seq = state.seq + 1, void = false), items)
    case SuperOned(suser) => (state.copy(seq = state.seq + 1, su = suser), items)
    case SuperOffed => (state.copy(seq = state.seq + 1, su = ""), items)
    case MemberOned(num) => (state.copy(seq = state.seq + 1, mbr = num), items)
    case MemberOffed => (state.copy(seq = state.seq + 1, mbr = ""), items)

    case SalesLogged(_,_,_,_,_) => (state.copy(
      seq = state.seq + 1)
      , items.addItem(curItem))

    case Subtotaled(level) =>
      var subs = (0,0,0,0)
      if (level == 0)
        subs = items.subTotal
      else
        subs = items.groupTotal(level)
      val (cnt, tqty, tamt, tdsc) = subs

      val subttlItem =
        TxnItem(state).copy(
          txntype = TXNTYPE.sales,
          salestype = SALESTYPE.sub,
          qty = tqty,
          amount = tamt,
          dscamt = tdsc,
          price = cnt
        )
      (state.copy(
        seq = state.seq + 1)
        , items.addItem(subttlItem))

    case Discounted(dt,gp,code,pct) =>
      val (lstItems, accum) = items.updateDisc(dt,gp,pct)
      val discItem = TxnItem(state).copy(
        txntype = TXNTYPE.sales,
        salestype = SALESTYPE.dsc,
        acct = code,
        disc = pct,
        price = accum._1,
        qty = accum._2,
        amount = accum._3,
        dscamt = accum._4
      )
      (state.copy(
        seq = state.seq + 1)
        , items.copy(txnitems = lstItems).addItem(discItem))

    case PaymentMade(_,_,_) =>
      val due = if (items.totalSales > 0) items.totalSales - items.totalPaid else items.totalSales + items.totalPaid
      val bal = if (items.totalSales > 0) due - curItem.amount else due + curItem.amount
      (state.copy(
        seq = state.seq + 1,
        due = (if ((curItem.amount.abs + items.totalPaid.abs) >= items.totalSales.abs) false else true)
      )
        ,items.addItem(curItem.copy(
        salestype = SALESTYPE.ttl,
        price = due,
        amount = curItem.amount,
        dscamt = bal
      )))

    case VoucherNumed(_, tnum) =>
      val vi = items.copy(txnitems = items.txnitems.map { it => it.copy(num = tnum) })
      (state.copy(seq = state.seq + 1, num = tnum), vi)

    case EndVoucher(vnum) => (state.nextVoucher.copy(jseq = lastSeqNr + 1), VchItems())

    case _ => (state, items)
  }

}
Reader.scala
package datatech.cloud.pos

import akka.actor._
import akka.stream.scaladsl._
import scala.util._
import akka._
import akka.persistence.query._
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import scala.concurrent._
import akka.stream._
import sdp.logging._
import Actions._
import States._
import Messages._
import akka.cluster._

class Reader extends Actor with LogSupport {
  import Reader._

  val cluster = Cluster(context.system)
  implicit val nodeAddress: NodeAddress = NodeAddress(cluster.selfAddress.toString)

  override def receive: Receive = {
    case ReadActions(si, ei, pid) => readActions(si,ei, pid)(context.system, context.dispatcher, ActorMaterializer(),nodeAddress)
  }
}

object Reader extends LogSupport {

  def readActions(startSeq: Long, endSeq: Long, persistenceId: String)(implicit sys: ActorSystem, ec: ExecutionContextExecutor, mat: ActorMaterializer, nodeAddress: NodeAddress) = {
    implicit var vchState = VchStates()
    implicit var vchItems = VchItems()
    implicit var curTxnItem = TxnItem()
    implicit val pid = PID(persistenceId)

    // obtain read journal by plugin id
    val readJournal =
      PersistenceQuery(sys).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

    // issue query to journal
    val source: Source[EventEnvelope, NotUsed] =
      readJournal.currentEventsByPersistenceId(persistenceId, startSeq, endSeq)

    // materialize stream, consuming events
    val futureActions: Future[List[Any]] = source.runFold(List[Any]()) { (lstAny, evl) => evl.event :: lstAny }

    futureActions.onComplete {
      case Success(txns) => buildVoucher(txns)
      case Failure(excpt) =>
    }

    def buildVoucher(actions: List[Any])= {
      actions.reverse.foreach { txn =>
        txn match {
          case EndVoucher(_) =>
            vchItems.txnitems.foreach(println)
          case ti@_ =>
            curTxnItem = buildTxnItem(ti.asInstanceOf[Action])
            val sts = updateState(ti.asInstanceOf[Action],0)
            vchState = sts._1
            vchItems = sts._2
        }
      }
    }
  }

}
ReaderDemo.scala
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.cluster._
import scala.concurrent._
import scala.util._
import akka._
import akka.persistence.query._
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import datatech.cloud.pos._
import Actions._
import Messages._
import Reader._
import sdp.logging.LogSupport

object ReaderDemo extends App {
  implicit val system = ActorSystem("reader")
  implicit val ec = system.dispatcher
  implicit val mat = ActorMaterializer()

  val cluster = Cluster(system)
  implicit val nodeAddress: NodeAddress = NodeAddress(cluster.selfAddress.toString)

  States.setShowSteps(true)
  Reader.readActions(0, Long.MaxValue,"1022")

  scala.io.StdIn.readLine()

  mat.shutdown()
  system.terminate()
}