Akka是一種消息驅動運算模式,它實現跨JVM程式運算的方式是通過能跨JVM的消息系統來調動分佈在不同JVM上ActorSystem中的Actor進行運算,前題是Akka的地址系統可以支持跨JVM定位。Akka的消息系統最高境界可以實現所謂的Actor位置透明化,這樣在Akka編程中就無須關註Act ...
Akka是一種消息驅動運算模式,它實現跨JVM程式運算的方式是通過能跨JVM的消息系統來調動分佈在不同JVM上ActorSystem中的Actor進行運算,前題是Akka的地址系統可以支持跨JVM定位。Akka的消息系統最高境界可以實現所謂的Actor位置透明化,這樣在Akka編程中就無須關註Actor具體在哪個JVM上運行,分散式Actor編程從方式上跟普通Actor編程就不會有什麼區別了。Akka的Remoting是一種點對點的跨JVM消息通道,讓一個JVM上ActorSystem中的某個Actor可以連接另一個JVM上ActorSystem中的另一個Actor。兩個JVM上的ActorSystem之間只需具備TCP網路連接功能就可以實現Akka Remoting了。Akka-Remoting還沒有實現完全的位置透明化,因為用戶還必須在代碼里或者配置文件里指明目標Actor的具體地址。
Akka-Remoting提供了兩種Actor之間的溝通方法:
1、遠程查找:通過路徑Path查找在遠程機上已經創建存在的Actor,獲取ActorRef後進行溝通
2、遠程創建:在遠程機上直接創建Actor作為溝通對象
Akka-Remoting的主要應用應該是把一些任務部署到遠程機上去運算。發起方(Local JVM)在這裡面的主要作用是任務分配,有點像Akka-Router。我們可以用下麵的例子來示範:模擬一個計算器,可以進行連續的加減乘除,保留累計結果。我們會把這個計算器部署到遠程機上,然後從本機與之溝通分配運算任務及獲取運算結果。這個計算器就是個簡單的Actor:
import akka.actor._
object Calculator {
sealed trait MathOps
case class Num(dnum: Double) extends MathOps
case class Add(dnum: Double) extends MathOps
case class Sub(dnum: Double) extends MathOps
case class Mul(dnum: Double) extends MathOps
case class Div(dnum: Double) extends MathOps
sealed trait CalcOps
case object Clear extends CalcOps
case object GetResult extends CalcOps
}
class Calcultor extends Actor {
import Calculator._
var result: Double = 0.0 //internal state
override def receive: Receive = {
case Num(d) => result = d
case Add(d) => result += d
case Sub(d) => result -= d
case Mul(d) => result *= d
case Div(d) => result = result / d
case Clear => result = 0.0
case GetResult =>
sender() ! s"Result of calculation is: $result"
}
}
就是一個簡單的Actor實現,跟Remoting沒什麼關係。
下麵我們會在一個遠程機上部署這個Calculator Actor。 先看看這個示範的項目結構:remoteLookup/build.sbt
lazy val commonSettings = seq (
name := "RemoteLookupDemo",
version := "1.0",
scalaVersion := "2.11.8",
libraryDependencies := Seq(
"com.typesafe.akka" %% "akka-actor" % "2.5.2",
"com.typesafe.akka" %% "akka-remote" % "2.5.2"
)
)
lazy val local = (project in file("."))
.settings(commonSettings)
.settings(
name := "localSystem"
).aggregate(messages,remote).dependsOn(messages)
lazy val messages = (project in file("messages"))
.settings(commonSettings)
.settings(
name := "commands"
)
lazy val remote = (project in file("remote"))
.settings(commonSettings)
.settings(
name := "remoteSystem"
).aggregate(messages).dependsOn(messages)
在這裡我們分了三個項目:local是主項目,messages和remote是分項目(subprojects)。messages里只有OpsMessages.scala一個源文件:
package remoteLookup.messages
object Messages {
sealed trait MathOps
case class Num(dnum: Double) extends MathOps
case class Add(dnum: Double) extends MathOps
case class Sub(dnum: Double) extends MathOps
case class Mul(dnum: Double) extends MathOps
case class Div(dnum: Double) extends MathOps
sealed trait CalcOps
case object Clear extends CalcOps
case object GetResult extends CalcOps
}
我們看到:這個文件是把上面的Calculator支持的消息拆了出來。這是因為Calculator Actor會在另一個JVM remote上部署,而我們會從local JVM里向Calculator發送操作消息,所以Messages必須是local和remote共用的。這個要求我們通過dependOn(messages)實現了。現在Calculator是在remote項目里定義的:remote/Calculator.scala
package remoteLookup.remote
import akka.actor._
import remoteLookup.messages.Messages._
object CalcProps {
def props = Props(new Calcultor)
}
class Calcultor extends Actor with ActorLogging {
var result: Double = 0.0 //internal state
override def receive: Receive = {
case Num(d) => result = d
case Add(d) => result += d
case Sub(d) => result -= d
case Mul(d) => result *= d
case Div(d) =>
val _ = result.toInt / d.toInt //yield ArithmeticException
result /= d
case Clear => result = 0.0
case GetResult =>
sender() ! s"Result of calculation is: $result"
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
log.info(s"Restarting calculator: ${reason.getMessage}")
super.preRestart(reason, message)
}
}
由於ArithmeticException預設的處理策略SupervisorStrategy是Restart,一旦輸入Div(0.0)時會重啟將result清零。我們可以在remote上加一個Supervisor來把異常處理策略改為Resume。
下麵我們先在remote項目本地對Calculator的功能進行測試:remote/CalculatorRunner.scala
package remoteLookup.remote
import akka.actor._
import akka.pattern._
import remoteLookup.messages.Messages._
import scala.concurrent.duration._
class SupervisorActor extends Actor {
def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
case _: ArithmeticException => SupervisorStrategy.Resume
}
override def supervisorStrategy: SupervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){
decider.orElse(SupervisorStrategy.defaultDecider)
}
val calcActor = context.actorOf(CalcProps.props,"calculator")
override def receive: Receive = {
case msg@ _ => calcActor.forward(msg)
}
}
object CalculatorRunner extends App {
val remoteSystem = ActorSystem("remoteSystem")
val calcActor = remoteSystem.actorOf(Props[SupervisorActor],"supervisorActor")
import remoteSystem.dispatcher
calcActor ! Clear
calcActor ! Num(13.0)
calcActor ! Mul(1.5)
implicit val timeout = akka.util.Timeout(1 second)
((calcActor ? GetResult).mapTo[String]) foreach println
scala.io.StdIn.readLine()
calcActor ! Div(0.0)
calcActor ! Div(1.5)
calcActor ! Add(100.0)
((calcActor ? GetResult).mapTo[String]) foreach println
scala.io.StdIn.readLine()
remoteSystem.terminate()
}
測試運算得出以下結果:
Result of calculation is: 19.5 Result of calculation is: 113.0 [WARN] [06/20/2017 19:28:10.720] [remoteSystem-akka.actor.default-dispatcher-4] [akka://remoteSystem/user/parentActor/calculator] / by zero
supervisorActor實現了它應有的功能。
下麵進行遠程查找示範:首先,remote需要把Calculator向外發佈。這可以通過配置文件設置實現:remote/src/main/resources/application.conf
akka {
actor {
provider = remote
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2552
}
log-sent-messages = on
log-received-messages = on
}
}
上面這段的意思是:所有向外公開Actor的地址首碼為:akka.tcp://[email protected]:2552/user/???
那麼Calculator的完整地址path應該就是:akka.tcp://[email protected]:2552/user/supervisorActor/calculator
Akka-Remoting提供了兩種遠程查找方式:actorSelection.resolveOne方法和Identify消息確認。無論如何,local都需要進行Remoting配置: local/src/main/resources/application.conf
akka {
actor {
provider = remote
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
}
port=0的意思是由系統自動選擇任何可用的埠。現在我們完成了Remoting設置,也得到了在遠程機上Calculator的具體地址,應該足夠進行遠程Actor溝通了。我們先用actorSelection.resolveOne示範。resolveOne源代碼如下:
/**
* Resolve the [[ActorRef]] matching this selection.
* The result is returned as a Future that is completed with the [[ActorRef]]
* if such an actor exists. It is completed with failure [[ActorNotFound]] if
* no such actor exists or the identification didn't complete within the
* supplied `timeout`.
*
* Under the hood it talks to the actor to verify its existence and acquire its
* [[ActorRef]].
*/
def resolveOne()(implicit timeout: Timeout): Future[ActorRef] = {
implicit val ec = ExecutionContexts.sameThreadExecutionContext
val p = Promise[ActorRef]()
this.ask(Identify(None)) onComplete {
case Success(ActorIdentity(_, Some(ref))) ⇒ p.success(ref)
case _ ⇒ p.failure(ActorNotFound(this))
}
p.future
}
resolveOne返回Future[ActorRef],我們可以用Future的函數組件(combinator)來操作:localAccessDemo.scala
import akka.actor._
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern._
import remoteLookup.messages.Messages._
object LocalSelectionDemo extends App {
val localSystem = ActorSystem("localSystem")
import localSystem.dispatcher
val path = "akka.tcp://[email protected]:2552/user/supervisorActor/calculator"
implicit val timeout = Timeout(5 seconds)
for (calcActor : ActorRef <- localSystem.actorSelection(path).resolveOne()) {
calcActor ! Clear
calcActor ! Num(13.0)
calcActor ! Mul(1.5)
((calcActor ? GetResult).mapTo[String]) foreach println
calcActor ! Div(0.0)
calcActor ! Div(1.5)
calcActor ! Add(100.0)
((calcActor ? GetResult).mapTo[String]) foreach println
}
scala.io.StdIn.readLine()
localSystem.terminate()
}
因為resolveOne返回的是個Future[x],我們可以用for來對嵌在Future內的x進行操作。現在remoteSystem只需要構建Calculator待用就行了:remote/CalculatorRunner.scala
package remoteLookup.remote
import akka.actor._
import akka.pattern._
import remoteLookup.messages.Messages._
import scala.concurrent.duration._
class SupervisorActor extends Actor {
def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
case _: ArithmeticException => SupervisorStrategy.Resume
}
override def supervisorStrategy: SupervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){
decider.orElse(SupervisorStrategy.defaultDecider)
}
val calcActor = context.actorOf(CalcProps.props,"calculator")
override def receive: Receive = {
case msg@ _ => calcActor.forward(msg)
}
}
object CalculatorRunner extends App {
val remoteSystem = ActorSystem("remoteSystem")
val calcActor = remoteSystem.actorOf(Props[SupervisorActor],"supervisorActor")
/*
import remoteSystem.dispatcher
calcActor ! Clear
calcActor ! Num(13.0)
calcActor ! Mul(1.5)
implicit val timeout = akka.util.Timeout(1 second)
((calcActor ? GetResult).mapTo[String]) foreach println
scala.io.StdIn.readLine()
calcActor ! Div(0.0)
calcActor ! Div(1.5)
calcActor ! Add(100.0)
((calcActor ? GetResult).mapTo[String]) foreach println
*/
scala.io.StdIn.readLine()
remoteSystem.terminate()
}
註意:註銷的操作轉移到了localSelectionDemo里。
先運行remote項目:
INFO] [06/20/2017 21:24:37.955] [main] [akka.remote.Remoting] Starting remoting [INFO] [06/20/2017 21:24:38.091] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:2552] [INFO] [06/20/2017 21:24:38.092] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:2552]
remoteSystem開始監視配置的公開地址。
用sbt run 運行local:
Result of calculation is: 19.5 Result of calculation is: 113.0
結果正確。supervisorActor的SupervisorStrategy起到了應有的作用。
remote項目輸出顯示也能證明:
[INFO] [06/20/2017 21:24:37.955] [main] [akka.remote.Remoting] Starting remoting [INFO] [06/20/2017 21:24:38.091] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:2552] [INFO] [06/20/2017 21:24:38.092] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:2552] [WARN] [06/20/2017 21:27:06.330] [remoteSystem-akka.actor.default-dispatcher-4] [akka://remoteSystem/user/supervisorActor/calculator] / by zero [ERROR] [06/20/2017 21:27:34.176] [remoteSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://[email protected]:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FlocalSystem%40127.0.0.1%3A60601-0/endpointWriter] AssociationError [akka.tcp://[email protected]:2552] <- [akka.tcp://[email protected]:60601]: Error [Shut down address: akka.tcp://[email protected]:60601] [ akka.remote.ShutDownAssociation: Shut down address: akka.tcp://[email protected]:60601 Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down. ]
下麵我們試著用Identify消息確認方式來複演上述例子。Akka是如下這樣定義有關Identify消息確認的:
/**
* A message all Actors will understand, that when processed will reply with
* [[akka.actor.ActorIdentity]] containing the `ActorRef`. The `messageId`
* is returned in the `ActorIdentity` message as `correlationId`.
*/
@SerialVersionUID(1L)
final case class Identify(messageId: Any) extends AutoReceivedMessage with NotInfluenceReceiveTimeout
/**
* Reply to [[akka.actor.Identify]]. Contains
* `Some(ref)` with the `ActorRef` of the actor replying to the request or
* `None` if no actor matched the request.
* The `correlationId` is taken from the `messageId` in
* the `Identify` message.
*/
@SerialVersionUID(1L)
final case class ActorIdentity(correlationId: Any, ref: Option[ActorRef]) {
if (ref.isDefined && ref.get == null) {
throw new IllegalArgumentException("ActorIdentity created with ref = Some(null) is not allowed, " +
"this could happen when serializing with Scala 2.12 and deserializing with Scala 2.11 which is not supported.")
}
/**
* Java API: `ActorRef` of the actor replying to the request or
* null if no actor matched the request.
*/
@deprecated("Use getActorRef instead", "2.5.0")
def getRef: ActorRef = ref.orNull
/**
* Java API: `ActorRef` of the actor replying to the request or
* not defined if no actor matched the request.
*/
def getActorRef: Optional[ActorRef] = {
import scala.compat.java8.OptionConverters._
ref.asJava
}
}
如果拿上面的例子,我們就會向遠程機上的Calculator地址發送Identify(path),而Calculator返回ActorIdentity消息,參數包括correlationId = path, ref = Calculator的ActorRef。 下麵是使用示範代碼:
object LocalIdentifyDemo extends App {
class RemoteCalc extends Actor with ActorLogging {
val path = "akka.tcp://[email protected]:2552/user/supervisorActor/calculator"
context.actorSelection(path) ! Identify(path) //send req for ActorRef
import context.dispatcher
implicit val timeout = Timeout(5 seconds)
override def receive: Receive = {
case ActorIdentity(p,someRef) if p.equals(path) =>
someRef foreach { calcActor =>
calcActor ! Clear
calcActor ! Num(13.0)
calcActor ! Mul(1.5)
((calcActor ? GetResult).mapTo[String]) foreach println
calcActor ! Div(0.0)
calcActor ! Div(1.5)
calcActor ! Add(100.0)
((calcActor ? GetResult).mapTo[String]) foreach println
}
}
}
val localSystem = ActorSystem("localSystem")
val localActor = localSystem.actorOf(Props[RemoteCalc],"localActor")
scala.io.StdIn.readLine()
localSystem.terminate()
}
Identify消息確認機制是一種Actor溝通模式,所以我們需要構建一個RemoteCalc Actor,把程式包嵌在這個Actor裡面。當receive收到確認消息ActorIdentity後獲取ActorRef運算程式。
查看運算結果,正確。
下麵是這次示範的完整源代碼:
build.sbt
lazy val commonSettings = seq (
name := "RemoteLookupDemo",
version := "1.0",
scalaVersion := "2.11.8",
libraryDependencies := Seq(
"com.typesafe.akka" %% "akka-actor" % "2.5.2",
"com.typesafe.akka" %% "akka-remote" % "2.5.2"
)
)
lazy val local = (project in file("."))
.settings(commonSettings)
.settings(
name := "remoteLookupDemo"
).aggregate(messages,remote).dependsOn(messages)
lazy val messages = (project in file("messages"))
.settings(commonSettings)
.settings(
name := "commands"
)
lazy val remote = (project in file("remote"))
.settings(commonSettings)
.settings(
name := "remoteSystem"
).aggregate(messages).dependsOn(messages)
messages/OpsMessages.scala
package remoteLookup.messages
object Messages {
sealed trait MathOps
case class Num(dnum: Double) extends MathOps
case class Add(dnum: Double) extends MathOps
case class Sub(dnum: Double) extends MathOps
case class Mul(dnum: Double) extends MathOps
case class Div(dnum: Double) extends MathOps
sealed trait CalcOps
case object Clear extends CalcOps
case object GetResult extends CalcOps
}
remote/src/main/resources/application.conf
akka {
actor {
provider = remote
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2552
}
log-sent-messages = on
log-received-messages = on
}
}
remote/Calculator.scala
package remoteLookup.remote
import akka.actor._
import remoteLookup.messages.Messages._
object CalcProps {
def props = Props(new Calcultor)
}
class Calcultor extends Actor with ActorLogging {
var result: Double = 0.0 //internal state
override def receive: Receive = {
case Num(d) => result = d
case Add(d) => result += d
case Sub(d) => result -= d
case Mul(d) => result *= d
case Div(d) =>
val _ = result.toInt / d.toInt //yield ArithmeticException
result /= d
case Clear => result = 0.0
case GetResult =>
sender() ! s"Result of calculation is: $result"
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
log.info(s"Restarting calculator: ${reason.getMessage}")
super.preRestart(reason, message)
}
}
remote/CalculatorRunner.scala
package remoteLookup.remote
import akka.actor._
import akka.pattern._
import remoteLookup.messages.Messages._
import scala.concurrent.duration._
class SupervisorActor extends Actor {
def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
case _: ArithmeticException => SupervisorStrategy.Resume
}
override def supervisorStrategy: SupervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){
decider.orElse(SupervisorStrategy.defaultDecider)
}
val calcActor = context.actorOf(CalcProps.props,"calculator")
override def receive: Receive = {
case msg@ _ => calcActor.forward(msg)
}
}
object CalculatorRunner extends App {
val remoteSystem = ActorSystem("remoteSystem")
val calcActor = remoteSystem.actorOf(Props[SupervisorActor],"supervisorActor")
/*
import remoteSystem.dispatcher
calcActor ! Clear
calcActor ! Num(13.0)
calcActor ! Mul(1.5)
implicit val timeout = akka.util.Timeout(1 second)
((calcActor ? GetResult).mapTo[String]) foreach println
scala.io.StdIn.readLine()
calcActor ! Div(0.0)
calcActor ! Div(1.5)
calcActor ! Add(100.0)
((calcActor ? GetResult).mapTo[String]) foreach println
*/
scala.io.StdIn.readLine()
remoteSystem.terminate()
}
local/src/main/resources/application.conf
akka {
actor {
provider = remote
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
}
local/localAccessDemo.scala
import akka.actor._
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern._
import remoteLookup.messages.Messages._
object LocalSelectionDemo extends App {
val localSystem = ActorSystem("localSystem")
import localSystem.dispatcher
val path = "akka.tcp://[email protected]:2552/user/supervisorActor/calculator"
implicit val timeout = Timeout(5 seconds)
for (calcActor : ActorRef <- localSystem.actorSelection(path).resolveOne()) {
calcActor ! Clear
calcActor ! Num(13.0)
calcActor ! Mul(1.5)
((calcActor ? GetResult).mapTo[String]) foreach println
calcActor ! Div(0.0)
calcActor ! Div(1.5)
calcActor ! Add(100.0)
((calcActor ? GetResult).mapTo[String]) foreach println
}
scala.io.StdIn.readLine()
localSystem.terminate()
}
object LocalIdentifyDemo extends App {
class RemoteCalc extends Actor with ActorLogging {
val path = "akka.tcp://[email protected]:2552/user/supervisorActor/calculator"
context.actorSelection(path) ! Identify(path) //semd req for ActorRef
import context.dispatcher
implicit val timeout = Timeout(5 seconds)
override def receive: Receive = {
case ActorIdentity(p,someRef) if p.equals(path) =>
someRef foreach { calcActor =>
calcActor ! Clear
calcActor ! Num(13.0)
calcActor ! Mul(1.5)
((calcActor ? GetResult).mapTo[String]) foreach println
calcActor ! Div(0.0)
calcActor ! Div(1.5)
calcActor ! Add(100.0)
((calcActor ? GetResult).mapTo[String]) foreach println
}
}
}
val localSystem = ActorSystem("localSystem")
val localActor = localSystem.actorOf(Props[RemoteCalc],"localActor")
scala.io.StdIn.readLine()
localSystem.terminate()
}