任何類型的實例作為消息在兩端獨立系統的機器之間進行傳遞時必須經過序列化/反序列化serialize/deserialize處理過程。假設以下場景:在一個網路里有兩台連接的伺服器,它們分別部署了獨立的akka系統。如果我們需要在這兩台伺服器的akka系統之間進行消息交換的話,所有消息都必須經過序列化/ ...
任何類型的實例作為消息在兩端獨立系統的機器之間進行傳遞時必須經過序列化/反序列化serialize/deserialize處理過程。假設以下場景:在一個網路里有兩台連接的伺服器,它們分別部署了獨立的akka系統。如果我們需要在這兩台伺服器的akka系統之間進行消息交換的話,所有消息都必須經過序列化/反序列化處理。akka系統對於用戶自定義消息類型的預設序列化處理是以java-object serialization 方式進行的。我們上次提過:由於java-object-serialization會把一個java-object的類型信息、實例值、它所包含的其它類型描述信息等都寫入序列化的結果里,所以會占據較大空間,傳輸數據的效率相對就低了。protobuf是binary格式的,基本只包括實例值,所以數據傳輸效率較高。下麵我們就介紹如何在akka系統中使用protobuf序列化。在akka中使用自定義序列化方法包括下麵的這些步驟:
1、在.proto文件中對消息類型進行IDL定義
2、用ScalaPB編譯IDL文件並產生scala源代碼。這些源代碼中包括了涉及的消息類型及它們的操作方法
3、在akka程式模塊中import產生的classes,然後直接調用這些類型和方法
4、按akka要求編寫序列化方法
5、在akka的.conf文件里actor.serializers段落中定義akka的預設serializer
下麵的build.sbt文件里描述了程式結構:
lazy val commonSettings = Seq(
name := "AkkaProtobufDemo",
version := "1.0",
scalaVersion := "2.12.6",
)
lazy val local = (project in file("."))
.settings(commonSettings)
.settings(
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-remote" % "2.5.11",
"com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf"
),
name := "akka-protobuf-demo"
)
lazy val remote = (project in file("remote"))
.settings(commonSettings)
.settings(
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-remote" % "2.5.11"
),
name := "remote-system"
).dependsOn(local)
PB.targets in Compile := Seq(
scalapb.gen() -> (sourceManaged in Compile).value
)
local和remote是兩個分開的項目。我們會在這兩個項目里分別部署akka系統。註意依賴項中的scalapb.runtime。PB.targets指明瞭產生源代碼的路徑。我們還需要在project/scalapb.sbt中指定scalaPB插件:
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"
我們首先在.proto文件里定義消息:
syntax = "proto3";
// Brought in from scalapb-runtime
import "scalapb/scalapb.proto";
import "google/protobuf/wrappers.proto";
package learn.proto;
message Added {
int32 nbr1 = 1;
int32 nbr2 = 2;
}
message Subtracted {
int32 nbr1 = 1;
int32 nbr2 = 2;
}
message AddedResult {
int32 nbr1 = 1;
int32 nbr2 = 2;
int32 result = 3;
}
message SubtractedResult {
int32 nbr1 = 1;
int32 nbr2 = 2;
int32 result = 3;
}
現在我們先在remote項目里定義一個Calculator actor:
package akka.protobuf.calculator
import akka.actor._
import com.typesafe.config.ConfigFactory
import learn.proto.messages._
class Calculator extends Actor with ActorLogging {
override def receive: Receive = {
case Added(a,b) =>
log.info("Calculating %d + %d".format(a, b))
sender() ! AddedResult(a,b,a+b)
case Subtracted(a,b) =>
log.info("Calculating %d - %d".format(a, b))
sender() ! SubtractedResult(a,b,a-b)
}
}
object Calculator {
def props = Props(new Calculator)
}
object CalculatorStarter extends App {
val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2552")
.withFallback(ConfigFactory.load())
val calcSystem = ActorSystem("calcSystem",config)
calcSystem.actorOf(Calculator.props,"calculator")
println("press any key to end program ...")
scala.io.StdIn.readLine()
calcSystem.terminate()
}
運行CalculatorStarter產生一個calculator actor: akka.tcp://[email protected]:2552/user/calculator
下麵我們在local項目里從埠2551上部署另一個akka系統,然後調用埠2552上部署akka系統的calculator actor:
package akka.protobuf.calcservice
import akka.actor._
import learn.proto.messages._
import scala.concurrent.duration._
class CalcRunner(path: String) extends Actor with ActorLogging {
sendIdentifyRequest()
def sendIdentifyRequest(): Unit = {
context.actorSelection(path) ! Identify(path)
import context.dispatcher
context.system.scheduler.scheduleOnce(3.seconds, self, ReceiveTimeout)
}
def receive = identifying
def identifying : Receive = {
case ActorIdentity(calcPath,Some(calcRef)) if (path.equals(calcPath)) =>
log.info("Remote calculator started!")
context.watch(calcRef)
context.become(calculating(calcRef))
case ActorIdentity(_,None) =>
log.info("Remote calculator not found!")
case ReceiveTimeout =>
sendIdentifyRequest()
case s @ _ =>
log.info(s"Remote calculator not ready. [$s]")
}
def calculating(calculator: ActorRef) : Receive = {
case (op : Added) => calculator ! op
case (op : Subtracted) => calculator ! op
case AddedResult(a,b,r) =>
log.info(s"$a + $b = $r")
case SubtractedResult(a,b,r) =>
log.info(s"$a - $b = $r")
case Terminated(calculator) =>
log.info("Remote calculator terminated, restarting ...")
sendIdentifyRequest()
context.become(identifying)
case ReceiveTimeout => //nothing
}
}
object CalcRunner {
def props(path: String) = Props(new CalcRunner(path))
}
這個CalcRunner是一個actor,在程式里首先通過向remote項目中的calculator-actor傳送Identify消息以取得具體的ActorRef。然後用這個ActorRef與calculator-actor進行交互。這其中Identify是akka預定消息類型,其它消息都是ScalaPB從.proto文件中產生的。下麵是local項目的運算程式:
package akka.protobuf.demo
import akka.actor._
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import akka.protobuf.calcservice._
import scala.concurrent.duration._
import scala.util._
import learn.proto.messages._
object Main extends App {
val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2551")
.withFallback(ConfigFactory.load())
val calcSystem = ActorSystem("calcSystem",config)
val calcPath = "akka.tcp://[email protected]:2552/user/calculator"
val calculator = calcSystem.actorOf(CalcRunner.props(calcPath),"calcRunner")
println("Calculator started ...")
import calcSystem.dispatcher
calcSystem.scheduler.schedule(1.second, 1.second) {
if (Random.nextInt(100) % 2 == 0)
calculator ! Added(Random.nextInt(100), Random.nextInt(100))
else
calculator ! Subtracted(Random.nextInt(100), Random.nextInt(100))
}
scala.io.StdIn.readLine()
}
配置文件application.conf:
akka {
actor {
provider = remote
}
remote {
netty.tcp {
hostname = "127.0.0.1"
}
}
}
先運行remote然後local。註意下麵出現的提示:
[akka.serialization.Serialization(akka://calcSystem)] Using the default Java serializer for class [learn.proto.messages.Added] which is not recommended because of performance implications. Use another serializer
下麵是protobuf類型的序列化方法:
package akka.protobuf.serializer
import akka.serialization.SerializerWithStringManifest
import learn.proto.messages._
class ProtobufSerializer extends SerializerWithStringManifest{
def identifier: Int = 101110116
override def manifest(o: AnyRef): String = o.getClass.getName
final val AddedManifest = classOf[Added].getName
final val SubtractedManifest = classOf[Subtracted].getName
final val AddedResultManifest = classOf[AddedResult].getName
final val SubtractedResultManifest = classOf[SubtractedResult].getName
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
println("inside fromBinary"+manifest)
manifest match {
case AddedManifest => Added.parseFrom(bytes)
case SubtractedManifest => Subtracted.parseFrom(bytes)
case AddedResultManifest => AddedResult.parseFrom(bytes)
case SubtractedResultManifest => SubtractedResult.parseFrom(bytes)
}
}
override def toBinary(o: AnyRef): Array[Byte] = {
println("inside toBinary ")
o match {
case a: Added => a.toByteArray
case s :Subtracted => s.toByteArray
case aR: AddedResult => aR.toByteArray
case sR: SubtractedResult => sR.toByteArray
}
}
}
然後我們需要在application.conf中告訴akka系統使用這些方法:
actor {
serializers {
proto = "akka.protobuf.serializer.ProtobufSerializer"
}
serialization-bindings {
"java.io.Serializable" = none
"com.google.protobuf.Message" = proto
"learn.proto.messages.Added" = proto
"learn.proto.messages.AddedResult" = proto
"learn.proto.messages.Subtracted" = proto
"learn.proto.messages.SubtractedResult" = proto
}
}
現在再重新運行:
[INFO] [04/30/2018 18:41:02.348] [calcSystem-akka.actor.default-dispatcher-2] [akka.tcp://[email protected]:2551/user/calcRunner] Remote calculator started!
inside toBinary
inside fromBinarylearn.proto.messages.AddedResult
[INFO] [04/30/2018 18:41:03.234] [calcSystem-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2551/user/calcRunner] 18 + 38 = 56
inside toBinary
inside fromBinarylearn.proto.messages.AddedResult
[INFO] [04/30/2018 18:41:04.197] [calcSystem-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2551/user/calcRunner] 22 + 74 = 96
系統使用了自定義的ProtobufferSerializer。
下麵是本次示範的完整源代碼:
project/scalapb.sbt
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"
build.sbt
lazy val commonSettings = Seq(
name := "AkkaProtobufDemo",
version := "1.0",
scalaVersion := "2.12.6",
)
lazy val local = (project in file("."))
.settings(commonSettings)
.settings(
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-remote" % "2.5.11",
"com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf"
),
name := "akka-protobuf-demo"
)
lazy val remote = (project in file("remote"))
.settings(commonSettings)
.settings(
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-remote" % "2.5.11"
),
name := "remote-system"
).dependsOn(local)
PB.targets in Compile := Seq(
scalapb.gen() -> (sourceManaged in Compile).value
)
resources/application.conf
akka {
actor {
provider = remote
}
remote {
netty.tcp {
hostname = "127.0.0.1"
}
}
actor {
serializers {
proto = "akka.protobuf.serializer.ProtobufSerializer"
}
serialization-bindings {
"java.io.Serializable" = none
"com.google.protobuf.Message" = proto
"learn.proto.messages.Added" = proto
"learn.proto.messages.AddedResult" = proto
"learn.proto.messages.Subtracted" = proto
"learn.proto.messages.SubtractedResult" = proto
}
}
}
main/protobuf/messages.proto
syntax = "proto3";
// Brought in from scalapb-runtime
import "scalapb/scalapb.proto";
import "google/protobuf/wrappers.proto";
package learn.proto;
message Added {
int32 nbr1 = 1;
int32 nbr2 = 2;
}
message Subtracted {
int32 nbr1 = 1;
int32 nbr2 = 2;
}
message AddedResult {
int32 nbr1 = 1;
int32 nbr2 = 2;
int32 result = 3;
}
message SubtractedResult {
int32 nbr1 = 1;
int32 nbr2 = 2;
int32 result = 3;
}
remote/Calculator.scala
package akka.protobuf.calculator
import akka.actor._
import com.typesafe.config.ConfigFactory
import learn.proto.messages._
class Calculator extends Actor with ActorLogging {
override def receive: Receive = {
case Added(a,b) =>
log.info("Calculating %d + %d".format(a, b))
sender() ! AddedResult(a,b,a+b)
case Subtracted(a,b) =>
log.info("Calculating %d - %d".format(a, b))
sender() ! SubtractedResult(a,b,a-b)
}
}
object Calculator {
def props = Props(new Calculator)
}
object CalculatorStarter extends App {
val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2552")
.withFallback(ConfigFactory.load())
val calcSystem = ActorSystem("calcSystem",config)
calcSystem.actorOf(Calculator.props,"calculator")
println("press any key to end program ...")
scala.io.StdIn.readLine()
calcSystem.terminate()
}
CalcService.scala
package akka.protobuf.calcservice
import akka.actor._
import learn.proto.messages._
import scala.concurrent.duration._
class CalcRunner(path: String) extends Actor with ActorLogging {
sendIdentifyRequest()
def sendIdentifyRequest(): Unit = {
context.actorSelection(path) ! Identify(path)
import context.dispatcher
context.system.scheduler.scheduleOnce(3.seconds, self, ReceiveTimeout)
}
def receive = identifying
def identifying : Receive = {
case ActorIdentity(calcPath,Some(calcRef)) if (path.equals(calcPath)) =>
log.info("Remote calculator started!")
context.watch(calcRef)
context.become(calculating(calcRef))
case ActorIdentity(_,None) =>
log.info("Remote calculator not found!")
case ReceiveTimeout =>
sendIdentifyRequest()
case s @ _ =>
log.info(s"Remote calculator not ready. [$s]")
}
def calculating(calculator: ActorRef) : Receive = {
case (op : Added) => calculator ! op
case (op : Subtracted) => calculator ! op
case AddedResult(a,b,r) =>
log.info(s"$a + $b = $r")
case SubtractedResult(a,b,r) =>
log.info(s"$a - $b = $r")
case Terminated(calculator) =>
log.info("Remote calculator terminated, restarting ...")
sendIdentifyRequest()
context.become(identifying)
case ReceiveTimeout => //nothing
}
}
object CalcRunner {
def props(path: String) = Props(new CalcRunner(path))
}
Main.scala
package akka.protobuf.demo
import akka.actor._
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import akka.protobuf.calcservice._
import scala.concurrent.duration._
import scala.util._
import learn.proto.messages._
object Main extends App {
val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2551")
.withFallback(ConfigFactory.load())
val calcSystem = ActorSystem("calcSystem",config)
val calcPath = "akka.tcp://[email protected]:2552/user/calculator"
val calculator = calcSystem.actorOf(CalcRunner.props(calcPath),"calcRunner")
println("Calculator started ...")
import calcSystem.dispatcher
calcSystem.scheduler.schedule(1.second, 1.second) {
if (Random.nextInt(100) % 2 == 0)
calculator ! Added(Random.nextInt(100), Random.nextInt(100))
else
calculator ! Subtracted(Random.nextInt(100), Random.nextInt(100))
}
scala.io.StdIn.readLine()
}
ProtobufferSerializer.scala
package akka.protobuf.serializer
import akka.serialization.SerializerWithStringManifest
import learn.proto.messages._
class ProtobufSerializer extends SerializerWithStringManifest{
def identifier: Int = 101110116
override def manifest(o: AnyRef): String = o.getClass.getName
final val AddedManifest = classOf[Added].getName
final val SubtractedManifest = classOf[Subtracted].getName
final val AddedResultManifest = classOf[AddedResult].getName
final val SubtractedResultManifest = classOf[SubtractedResult].getName
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
println("inside fromBinary"+manifest)
manifest match {
case AddedManifest => Added.parseFrom(bytes)
case SubtractedManifest => Subtracted.parseFrom(bytes)
case AddedResultManifest => AddedResult.parseFrom(bytes)
case SubtractedResultManifest => SubtractedResult.parseFrom(bytes)
}
}
override def toBinary(o: AnyRef): Array[Byte] = {
println("inside toBinary ")
o match {
case a: Added => a.toByteArray
case s :Subtracted => s.toByteArray
case aR: AddedResult => aR.toByteArray
case sR: SubtractedResult => sR.toByteArray
}
}
}