Akka (8): Distributed Computing: Remoting, Remote Lookup

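  Akka is a message-driven programming model. It runs computations across JVMs by using a messaging system that can cross JVM boundaries to drive Actors living in ActorSystems on different JVMs; the prerequisite is that Akka's addressing scheme can locate Actors across JVMs. At its best, Akka's messaging achieves what is called Actor location transparency: Akka code need not care which JVM an Actor actually runs on, so distributed Actor programming looks no different from ordinary Actor programming. Akka Remoting is a peer-to-peer, cross-JVM message channel that lets an Actor in an ActorSystem on one JVM connect to an Actor in an ActorSystem on another JVM. The two ActorSystems only need a TCP network connection between them for Akka Remoting to work. Akka Remoting does not yet deliver full location transparency, because the user still has to state the target Actor's concrete address, either in code or in a configuration file.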




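The main use of Akka Remoting is deploying tasks to remote machines for execution. The initiating side (the local JVM) mostly distributes work, somewhat like an Akka Router. The following example demonstrates this: we simulate a calculator that performs successive additions, subtractions, multiplications and divisions while keeping a running total. We deploy this calculator on a remote machine, then talk to it from the local machine to send it operations and fetch the result. The calculator is just a simple Actor: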

import akka.actor._

object Calculator {
  sealed trait MathOps
  case class Num(dnum: Double) extends MathOps
  case class Add(dnum: Double) extends MathOps
  case class Sub(dnum: Double) extends MathOps
  case class Mul(dnum: Double) extends MathOps
  case class Div(dnum: Double) extends MathOps

  sealed trait CalcOps
  case object Clear extends CalcOps
  case object GetResult extends CalcOps
}

class Calculator extends Actor {
  import Calculator._
  var result: Double = 0.0   // internal state: the running total

  override def receive: Receive = {
    case Num(d) => result = d
    case Add(d) => result += d
    case Sub(d) => result -= d
    case Mul(d) => result *= d
    case Div(d) => result = result / d

    case Clear => result = 0.0
    case GetResult =>
      sender() ! s"Result of calculation is: $result"
  }
}


Next we deploy this Calculator Actor on a remote machine. First, the layout of the demo project: remoteLookup/build.sbt

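lazy val commonSettings = Seq(
  name := "RemoteLookupDemo",
  version := "1.0",
  scalaVersion := "2.11.8",
  libraryDependencies ++= Seq(
    "com.typesafe.akka" %% "akka-actor" % "2.5.2",
    "com.typesafe.akka" %% "akka-remote" % "2.5.2"
  )
)

lazy val local = (project in file("."))
  .settings(commonSettings)
  .settings(name := "localSystem")
  .dependsOn(messages)   // local needs the shared message types

lazy val messages = (project in file("messages"))
  .settings(commonSettings)
  .settings(name := "commands")

lazy val remote = (project in file("remote"))
  .settings(commonSettings)
  .settings(name := "remoteSystem")
  .dependsOn(messages)   // remote needs the shared message types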


package remoteLookup.messages

object Messages {
  sealed trait MathOps
  case class Num(dnum: Double) extends MathOps
  case class Add(dnum: Double) extends MathOps
  case class Sub(dnum: Double) extends MathOps
  case class Mul(dnum: Double) extends MathOps
  case class Div(dnum: Double) extends MathOps

  sealed trait CalcOps
  case object Clear extends CalcOps
  case object GetResult extends CalcOps
}

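As you can see, this file pulls out the messages that the Calculator above supports. The Calculator Actor is deployed on a separate JVM (remote), while operation messages are sent to it from the local JVM, so Messages must be shared by both local and remote. We meet this requirement with dependsOn(messages). Calculator itself is now defined in the remote project: remote/Calculator.scala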

package remoteLookup.remote

import akka.actor._
import remoteLookup.messages.Messages._

object CalcProps {
  def props = Props(new Calculator)
}

class Calculator extends Actor with ActorLogging {

  var result: Double = 0.0   // internal state: the running total

  override def receive: Receive = {
    case Num(d) => result = d
    case Add(d) => result += d
    case Sub(d) => result -= d
    case Mul(d) => result *= d
    case Div(d) =>
      // Integer division throws ArithmeticException on a zero divisor,
      // whereas Double division would silently yield Infinity or NaN
      val _ = result.toInt / d.toInt
      result /= d
    case Clear => result = 0.0
    case GetResult =>
      sender() ! s"Result of calculation is: $result"
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Restarting calculator: ${reason.getMessage}")
    super.preRestart(reason, message)
  }
}


package remoteLookup.remote
import akka.actor._
import akka.pattern._
import remoteLookup.messages.Messages._

import scala.concurrent.duration._
import scala.language.postfixOps

class SupervisorActor extends Actor {
  // Resume the child on ArithmeticException so it keeps its running total
  def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
    case _: ArithmeticException => SupervisorStrategy.Resume
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){
      decider.orElse(SupervisorStrategy.defaultDecider)
    }

  val calcActor = context.actorOf(CalcProps.props,"calculator")

  // Pass every message on to the child, preserving the original sender
  override def receive: Receive = {
    case msg@ _ => calcActor.forward(msg)
  }
}

object CalculatorRunner extends App {

  val remoteSystem = ActorSystem("remoteSystem")
  val calcActor = remoteSystem.actorOf(Props[SupervisorActor],"supervisorActor")

  import remoteSystem.dispatcher

  calcActor ! Clear
  calcActor ! Num(13.0)
  calcActor ! Mul(1.5)

  implicit val timeout = akka.util.Timeout(1 second)

  ((calcActor ? GetResult).mapTo[String]) foreach println

  calcActor ! Div(0.0)   // triggers ArithmeticException in the child
  calcActor ! Div(1.5)
  calcActor ! Add(100.0)
  ((calcActor ? GetResult).mapTo[String]) foreach println
}




Result of calculation is: 19.5

Result of calculation is: 113.0
[WARN] [06/20/2017 19:28:10.720] [remoteSystem-akka.actor.default-dispatcher-4] [akka://remoteSystem/user/parentActor/calculator] / by zero
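
The second result follows from the supervision directive: Resume keeps the child's state, so the failed Div(0.0) leaves the running total untouched. As a rough check, the sketch below replays the same message sequence without Akka, as a plain fold. The names `ResumeReplay`, `Op`, `step` and `replay` are hypothetical and exist only for this illustration, and swallowing the exception is an assumption standing in for the supervisor's Resume.

```scala
import scala.util.Try

object ResumeReplay {
  sealed trait Op
  final case class Num(d: Double) extends Op
  final case class Add(d: Double) extends Op
  final case class Sub(d: Double) extends Op
  final case class Mul(d: Double) extends Op
  final case class Div(d: Double) extends Op
  case object Clear extends Op

  // Apply one message to the running total. Div mirrors the actor's integer
  // probe, which throws ArithmeticException when d.toInt == 0.
  def step(result: Double, op: Op): Double = op match {
    case Num(d) => d
    case Add(d) => result + d
    case Sub(d) => result - d
    case Mul(d) => result * d
    case Div(d) =>
      val probe = result.toInt / d.toInt // may throw
      result / d
    case Clear  => 0.0
  }

  // Stand-in for Resume: a failing message is dropped and the state is kept.
  def replay(ops: Seq[Op]): Double =
    ops.foldLeft(0.0) { (acc, op) => Try(step(acc, op)).getOrElse(acc) }

  def main(args: Array[String]): Unit = {
    val first  = replay(Seq(Clear, Num(13.0), Mul(1.5)))
    val second = replay(Seq(Clear, Num(13.0), Mul(1.5), Div(0.0), Div(1.5), Add(100.0)))
    println(s"Result of calculation is: $first")
    println(s"Result of calculation is: $second")
  }
}
```

Had the directive been Restart instead, the child would be re-created with result = 0.0, and the same sequence would be expected to end at 100.0 rather than 113.0.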



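akka {
  actor {
    provider = remote
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      # host part of the published address; the paths in this article use 127.0.0.1
      hostname = ""
      # fixed port, so that other systems can address this one
      port = 2552
    }
    log-sent-messages = on
    log-received-messages = on
  }
}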

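The configuration above means that every Actor exposed to the outside has an address with the prefix: akka.tcp://remoteSystem@127.0.0.1:2552/user/???

So the full address path of Calculator should be: akka.tcp://remoteSystem@127.0.0.1:2552/user/supervisorActor/calculator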
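
To make the parts of such a path explicit, the sketch below splits it with `java.net.URI` from the JDK rather than with Akka's own path parser. `ActorPathParts`, `PathParts` and `parse` are hypothetical names for this illustration, and the address assumes the remote system is called remoteSystem and listens on 127.0.0.1:2552.

```scala
import java.net.URI

object ActorPathParts {
  final case class PathParts(protocol: String, system: String, host: String,
                             port: Int, elements: List[String])

  // Treat the actor path as an ordinary URI:
  // scheme://userInfo@host:port/path maps onto
  // protocol://systemName@host:port/guardian/parent/child
  def parse(path: String): PathParts = {
    val uri = new URI(path)
    PathParts(
      protocol = uri.getScheme,
      system   = uri.getUserInfo,
      host     = uri.getHost,
      port     = uri.getPort,
      elements = uri.getPath.split('/').filter(_.nonEmpty).toList
    )
  }

  def main(args: Array[String]): Unit = {
    val parts = parse("akka.tcp://remoteSystem@127.0.0.1:2552/user/supervisorActor/calculator")
    println(parts)
  }
}
```

Read this way, the prefix up to the port comes from the remote side's ActorSystem name and Remoting configuration, while /user/supervisorActor/calculator reflects the supervision hierarchy created in code.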

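Akka Remoting offers two ways to look up a remote Actor: the actorSelection.resolveOne method and the Identify message handshake. Either way, the local side also needs a Remoting configuration: local/src/main/resources/application.conf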

akka {
  actor {
    provider = remote
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = ""
      # 0 lets Akka pick a free port for the local system
      port = 0
    }
  }
}


  /**
   * Resolve the [[ActorRef]] matching this selection.
   * The result is returned as a Future that is completed with the [[ActorRef]]
   * if such an actor exists. It is completed with failure [[ActorNotFound]] if
   * no such actor exists or the identification didn't complete within the
   * supplied `timeout`.
   *
   * Under the hood it talks to the actor to verify its existence and acquire its
   * [[ActorRef]].
   */
  def resolveOne()(implicit timeout: Timeout): Future[ActorRef] = {
    implicit val ec = ExecutionContexts.sameThreadExecutionContext
    val p = Promise[ActorRef]()
    this.ask(Identify(None)) onComplete {
      case Success(ActorIdentity(_, Some(ref))) ⇒ p.success(ref)
      case _                                    ⇒ p.failure(ActorNotFound(this))
    }
    p.future
  }


import akka.actor._
import akka.util.Timeout
import scala.concurrent.duration._
import scala.language.postfixOps
import akka.pattern._
import remoteLookup.messages.Messages._

object LocalSelectionDemo extends App {

  val localSystem = ActorSystem("localSystem")
  import localSystem.dispatcher

  val path = "akka.tcp://remoteSystem@127.0.0.1:2552/user/supervisorActor/calculator"

  implicit val timeout = Timeout(5 seconds)

  // The body runs only if the lookup succeeds and yields an ActorRef
  for (calcActor : ActorRef <- localSystem.actorSelection(path).resolveOne()) {

    calcActor ! Clear
    calcActor ! Num(13.0)
    calcActor ! Mul(1.5)
    ((calcActor ? GetResult).mapTo[String]) foreach println

    calcActor ! Div(0.0)
    calcActor ! Div(1.5)
    calcActor ! Add(100.0)
    ((calcActor ? GetResult).mapTo[String]) foreach println
  }
}
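
resolveOne returns a Future that fails with ActorNotFound when nothing answers in time, and the demo above silently ignores that case because a for-comprehension over a Future only runs its body on success. A minimal sketch of handling both outcomes follows. To stay self-contained it looks up actors inside a single local ActorSystem (for the remote case only the path string would change), it blocks with Await purely for demonstration, and `ResolveOneSketch`, `Echo`, `describe` and the actor names are hypothetical.

```scala
import akka.actor._
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object ResolveOneSketch {
  // Hypothetical target actor: replies with whatever it receives
  class Echo extends Actor {
    def receive: Receive = { case msg => sender() ! msg }
  }

  // Resolve a path and turn the outcome into a readable string
  def describe(system: ActorSystem, path: String): String = {
    val attempt = system.actorSelection(path).resolveOne(2.seconds)
    // Blocking is for the demo only; real code would stay asynchronous
    Await.ready(attempt, 3.seconds).value.get match {
      case Success(ref)              => s"found ${ref.path.name}"
      case Failure(_: ActorNotFound) => "not found"
      case Failure(other)            => s"lookup failed: ${other.getMessage}"
    }
  }

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("resolveOneSketch")
    system.actorOf(Props(new Echo), "echo")
    println(describe(system, "/user/echo"))
    println(describe(system, "/user/missing"))
    Await.result(system.terminate(), 5.seconds)
  }
}
```

In non-blocking code the same distinction can be made with onComplete or recover on the returned Future.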





[INFO] [06/20/2017 21:24:37.955] [main] [akka.remote.Remoting] Starting remoting
[INFO] [06/20/2017 21:24:38.091] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://remoteSystem@127.0.0.1:2552]
[INFO] [06/20/2017 21:24:38.092] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://remoteSystem@127.0.0.1:2552]


Run local with sbt run:

Result of calculation is: 19.5
Result of calculation is: 113.0



[INFO] [06/20/2017 21:24:37.955] [main] [akka.remote.Remoting] Starting remoting
[INFO] [06/20/2017 21:24:38.091] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://remoteSystem@127.0.0.1:2552]
[INFO] [06/20/2017 21:24:38.092] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://remoteSystem@127.0.0.1:2552]
[WARN] [06/20/2017 21:27:06.330] [remoteSystem-akka.actor.default-dispatcher-4] [akka://remoteSystem/user/supervisorActor/calculator] / by zero
[ERROR] [06/20/2017 21:27:34.176] [remoteSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://remoteSystem@127.0.0.1:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FlocalSystem%40127.0.0.1%3A60601-0/endpointWriter] AssociationError [akka.tcp://remoteSystem@127.0.0.1:2552] <- [akka.tcp://localSystem@127.0.0.1:60601]: Error [Shut down address: akka.tcp://localSystem@127.0.0.1:60601] [
akka.remote.ShutDownAssociation: Shut down address: akka.tcp://localSystem@127.0.0.1:60601
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down.


/**
 * A message all Actors will understand, that when processed will reply with
 * [[akka.actor.ActorIdentity]] containing the `ActorRef`. The `messageId`
 * is returned in the `ActorIdentity` message as `correlationId`.
 */
final case class Identify(messageId: Any) extends AutoReceivedMessage with NotInfluenceReceiveTimeout

/**
 * Reply to [[akka.actor.Identify]]. Contains
 * `Some(ref)` with the `ActorRef` of the actor replying to the request or
 * `None` if no actor matched the request.
 * The `correlationId` is taken from the `messageId` in
 * the `Identify` message.
 */
final case class ActorIdentity(correlationId: Any, ref: Option[ActorRef]) {
  if (ref.isDefined && ref.get == null) {
    throw new IllegalArgumentException("ActorIdentity created with ref = Some(null) is not allowed, " +
      "this could happen when serializing with Scala 2.12 and deserializing with Scala 2.11 which is not supported.")
  }

  /**
   * Java API: `ActorRef` of the actor replying to the request or
   * null if no actor matched the request.
   */
  @deprecated("Use getActorRef instead", "2.5.0")
  def getRef: ActorRef = ref.orNull

  /**
   * Java API: `ActorRef` of the actor replying to the request or
   * not defined if no actor matched the request.
   */
  def getActorRef: Optional[ActorRef] = {
    import scala.compat.java8.OptionConverters._
    ref.asJava
  }
}

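In terms of the example above, we send Identify(path) to the Calculator's address on the remote machine, and Calculator replies with an ActorIdentity message whose fields are correlationId = path and ref = Some(Calculator's ActorRef). Here is a usage demo: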

import akka.actor._
import akka.pattern._
import akka.util.Timeout
import remoteLookup.messages.Messages._

import scala.concurrent.duration._
import scala.language.postfixOps

object LocalIdentifyDemo extends App {

  class RemoteCalc extends Actor with ActorLogging {

    val path = "akka.tcp://remoteSystem@127.0.0.1:2552/user/supervisorActor/calculator"

    context.actorSelection(path) ! Identify(path)  // send request for ActorRef

    import context.dispatcher
    implicit val timeout = Timeout(5 seconds)

    override def receive: Receive = {
      // The reply carries our path back as correlationId
      case ActorIdentity(p,someRef) if p.equals(path) =>
        someRef foreach { calcActor =>

          calcActor ! Clear
          calcActor ! Num(13.0)
          calcActor ! Mul(1.5)
          ((calcActor ? GetResult).mapTo[String]) foreach println

          calcActor ! Div(0.0)
          calcActor ! Div(1.5)
          calcActor ! Add(100.0)
          ((calcActor ? GetResult).mapTo[String]) foreach println
        }
    }
  }

  val localSystem = ActorSystem("localSystem")
  val localActor = localSystem.actorOf(Props[RemoteCalc],"localActor")
}



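The Identify handshake is an Actor-to-Actor communication pattern, so we need to build a RemoteCalc Actor and embed the program inside it. When receive gets the ActorIdentity confirmation, it extracts the ActorRef and runs the calculation.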
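
The demo only reacts when the reply carries Some(ref). As a hedged sketch of the remaining cases, the actor below also handles ActorIdentity with None (no actor at the path) and uses a receive timeout for the case where no reply arrives at all, for instance when the remote system is down. It runs against a single local ActorSystem to stay self-contained; `IdentifySketch`, `Lookup`, `lookup`, the Promise-based reporting and the actor names are assumptions made for illustration, not part of the original demo.

```scala
import akka.actor._
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object IdentifySketch {
  // Sends Identify on start and reports the outcome through the promise
  class Lookup(path: String, outcome: Promise[String]) extends Actor {
    context.setReceiveTimeout(3.seconds)           // guard against a silent target
    context.actorSelection(path) ! Identify(path)  // correlationId = path

    def receive: Receive = {
      case ActorIdentity(`path`, Some(ref)) =>
        context.setReceiveTimeout(Duration.Undefined)
        outcome.trySuccess(s"identified ${ref.path.name}")
      case ActorIdentity(`path`, None) =>
        outcome.trySuccess("no actor at that path")
        context.stop(self)
      case ReceiveTimeout =>
        outcome.trySuccess("no reply within timeout")
        context.stop(self)
    }
  }

  def lookup(system: ActorSystem, path: String): String = {
    val outcome = Promise[String]()
    system.actorOf(Props(new Lookup(path, outcome)))
    Await.result(outcome.future, 5.seconds)        // blocking for the demo only
  }

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("identifySketch")
    system.actorOf(Props.empty, "target")          // any actor answers Identify
    println(lookup(system, "/user/target"))
    println(lookup(system, "/user/missing"))
    Await.result(system.terminate(), 5.seconds)
  }
}
```

Because Identify is handled automatically by every Actor, the target needs no code of its own to take part in the handshake.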



