ScalaPB(1): using protobuf in akka

来源:https://www.cnblogs.com/tiger-xc/archive/2018/04/30/8974317.html
-Advertisement-
Play Games

任何類型的實例作為消息在兩端獨立系統的機器之間進行傳遞時必須經過序列化/反序列化serialize/deserialize處理過程。假設以下場景:在一個網路里有兩台連接的伺服器,它們分別部署了獨立的akka系統。如果我們需要在這兩台伺服器的akka系統之間進行消息交換的話,所有消息都必須經過序列化/ ...


    任何類型的實例作為消息在兩端獨立系統的機器之間進行傳遞時必須經過序列化/反序列化serialize/deserialize處理過程。假設以下場景:在一個網路里有兩台連接的伺服器,它們分別部署了獨立的akka系統。如果我們需要在這兩台伺服器的akka系統之間進行消息交換的話,所有消息都必須經過序列化/反序列化處理。akka系統對於用戶自定義消息類型的預設序列化處理是以java-object serialization 方式進行的。我們上次提過:由於java-object-serialization會把一個java-object的類型信息、實例值、它所包含的其它類型描述信息等都寫入序列化的結果里,所以會占據較大空間,傳輸數據的效率相對就低了。protobuf是binary格式的,基本只包括實例值,所以數據傳輸效率較高。下麵我們就介紹如何在akka系統中使用protobuf序列化。在akka中使用自定義序列化方法包括下麵的這些步驟:

1、在.proto文件中對消息類型進行IDL定義

2、用ScalaPB編譯IDL文件並產生scala源代碼。這些源代碼中包括了涉及的消息類型及它們的操作方法

3、在akka程式模塊中import產生的classes,然後直接調用這些類型和方法

4、按akka要求編寫序列化方法

5、在akka的.conf文件里actor.serializers段落中定義akka的預設serializer

下麵的build.sbt文件里描述了程式結構:

lazy val commonSettings = Seq(
  name := "AkkaProtobufDemo",
  version := "1.0",
  scalaVersion := "2.12.6",
)

lazy val local = (project in file("."))
  .settings(commonSettings)
  .settings(
    libraryDependencies ++= Seq(
      "com.typesafe.akka"      %% "akka-remote" % "2.5.11",
      "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf"
    ),
    name := "akka-protobuf-demo"
  )

lazy val remote = (project in file("remote"))
  .settings(commonSettings)
  .settings(
    libraryDependencies ++= Seq(
      "com.typesafe.akka"      %% "akka-remote" % "2.5.11"
    ),
    name := "remote-system"
  ).dependsOn(local)

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value
)

local和remote是兩個分開的項目。我們會在這兩個項目里分別部署akka系統。註意依賴項中的scalapb.runtime。PB.targets指明瞭產生源代碼的路徑。我們還需要在project/scalapb.sbt中指定scalaPB插件: 

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"

我們首先在.proto文件里定義消息:

syntax = "proto3";

// Brought in from scalapb-runtime
import "scalapb/scalapb.proto";
import "google/protobuf/wrappers.proto";

package learn.proto;

message Added {

    int32 nbr1 = 1;
    int32 nbr2 = 2;
}

message Subtracted {
    int32 nbr1 = 1;
    int32 nbr2 = 2;
}

message AddedResult {
    int32 nbr1 = 1;
    int32 nbr2 = 2;
    int32 result = 3;
}

message SubtractedResult {
    int32 nbr1 = 1;
    int32 nbr2 = 2;
    int32 result = 3;
}

現在我們先在remote項目里定義一個Calculator actor:

package akka.protobuf.calculator
import akka.actor._
import com.typesafe.config.ConfigFactory
import learn.proto.messages._

class Calculator extends Actor with ActorLogging {


  override def receive: Receive = {
    case Added(a,b) =>
      log.info("Calculating %d + %d".format(a, b))
      sender() ! AddedResult(a,b,a+b)
    case Subtracted(a,b) =>
      log.info("Calculating %d - %d".format(a, b))
      sender() ! SubtractedResult(a,b,a-b)
  }

}

object Calculator {
  def props = Props(new Calculator)
}

object CalculatorStarter extends App {

  val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2552")
    .withFallback(ConfigFactory.load())

  val calcSystem = ActorSystem("calcSystem",config)

  calcSystem.actorOf(Calculator.props,"calculator")

  println("press any key to end program ...")

  scala.io.StdIn.readLine()

  calcSystem.terminate()

}

運行CalculatorStarter產生一個calculator actor:  akka.tcp://[email protected]:2552/user/calculator

下麵我們在local項目里從埠2551上部署另一個akka系統,然後調用埠2552上部署akka系統的calculator actor:

package akka.protobuf.calcservice
import akka.actor._
import learn.proto.messages._
import scala.concurrent.duration._

class CalcRunner(path: String) extends Actor with ActorLogging {
  sendIdentifyRequest()

  def sendIdentifyRequest(): Unit = {
    context.actorSelection(path) ! Identify(path)
    import context.dispatcher
    context.system.scheduler.scheduleOnce(3.seconds, self, ReceiveTimeout)
  }

  def receive = identifying

  def identifying : Receive = {
    case ActorIdentity(calcPath,Some(calcRef)) if (path.equals(calcPath)) =>
      log.info("Remote calculator started!")
      context.watch(calcRef)
      context.become(calculating(calcRef))
    case ActorIdentity(_,None) =>
      log.info("Remote calculator not found!")
    case ReceiveTimeout =>
      sendIdentifyRequest()
    case s @ _ =>
      log.info(s"Remote calculator not ready. [$s]")
  }

  def calculating(calculator: ActorRef) : Receive = {
    case (op : Added) => calculator ! op
    case (op : Subtracted) => calculator ! op

    case AddedResult(a,b,r)  =>
      log.info(s"$a + $b = $r")
    case SubtractedResult(a,b,r) =>
      log.info(s"$a - $b = $r")

    case Terminated(calculator) =>
      log.info("Remote calculator terminated, restarting ...")
      sendIdentifyRequest()
      context.become(identifying)

    case ReceiveTimeout => //nothing
  }

}

object CalcRunner {
  def props(path: String) = Props(new CalcRunner(path))
}

這個CalcRunner是一個actor,在程式里首先通過向remote項目中的calculator-actor傳送Identify消息以取得具體的ActorRef。然後用這個ActorRef與calculator-actor進行交互。這其中Identify是akka預定消息類型,其它消息都是ScalaPB從.proto文件中產生的。下麵是local項目的運算程式:

 

package akka.protobuf.demo
import akka.actor._
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import akka.protobuf.calcservice._

import scala.concurrent.duration._
import scala.util._
import learn.proto.messages._

object Main extends App {

  val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2551")
    .withFallback(ConfigFactory.load())

  val calcSystem = ActorSystem("calcSystem",config)

  val calcPath = "akka.tcp://[email protected]:2552/user/calculator"

  val calculator = calcSystem.actorOf(CalcRunner.props(calcPath),"calcRunner")

  println("Calculator started ...")

  import calcSystem.dispatcher

  calcSystem.scheduler.schedule(1.second, 1.second) {
    if (Random.nextInt(100) % 2 == 0)
      calculator ! Added(Random.nextInt(100), Random.nextInt(100))
    else
      calculator ! Subtracted(Random.nextInt(100), Random.nextInt(100))
  }


  scala.io.StdIn.readLine()

}

 

配置文件application.conf:

 

akka {

  actor {
    provider = remote
  }

  remote {
    netty.tcp {
      hostname = "127.0.0.1"
    }
  }

}

 

先運行remote然後local。註意下麵出現的提示:

 

[akka.serialization.Serialization(akka://calcSystem)] Using the default Java serializer for class [learn.proto.messages.Added] which is not recommended because of performance implications. Use another serializer 

下麵是protobuf類型的序列化方法:

package akka.protobuf.serializer

import akka.serialization.SerializerWithStringManifest
import learn.proto.messages._


class ProtobufSerializer extends SerializerWithStringManifest{

  def identifier: Int = 101110116

  override def manifest(o: AnyRef): String = o.getClass.getName
  final val AddedManifest = classOf[Added].getName
  final val SubtractedManifest = classOf[Subtracted].getName
  final val AddedResultManifest = classOf[AddedResult].getName
  final val SubtractedResultManifest = classOf[SubtractedResult].getName


  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {

    println("inside fromBinary"+manifest)

    manifest match {
      case AddedManifest => Added.parseFrom(bytes)
      case SubtractedManifest => Subtracted.parseFrom(bytes)
      case AddedResultManifest => AddedResult.parseFrom(bytes)
      case SubtractedResultManifest => SubtractedResult.parseFrom(bytes)
    }
  }

  override def toBinary(o: AnyRef): Array[Byte] = {

    println("inside toBinary ")
    o match {
      case a: Added => a.toByteArray
      case s :Subtracted => s.toByteArray
      case aR: AddedResult => aR.toByteArray
      case sR: SubtractedResult => sR.toByteArray
    }
  }
}

然後我們需要在application.conf中告訴akka系統使用這些方法:

  actor {

    serializers {

      proto = "akka.protobuf.serializer.ProtobufSerializer"
    }

    serialization-bindings {

      "java.io.Serializable" = none
      "com.google.protobuf.Message" = proto
      "learn.proto.messages.Added" = proto
      "learn.proto.messages.AddedResult" = proto
      "learn.proto.messages.Subtracted" = proto
      "learn.proto.messages.SubtractedResult" = proto

    }
  }

現在再重新運行:

[INFO] [04/30/2018 18:41:02.348] [calcSystem-akka.actor.default-dispatcher-2] [akka.tcp://[email protected]:2551/user/calcRunner] Remote calculator started!
inside toBinary 
inside fromBinarylearn.proto.messages.AddedResult
[INFO] [04/30/2018 18:41:03.234] [calcSystem-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2551/user/calcRunner] 18 + 38 = 56
inside toBinary 
inside fromBinarylearn.proto.messages.AddedResult
[INFO] [04/30/2018 18:41:04.197] [calcSystem-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2551/user/calcRunner] 22 + 74 = 96

系統使用了自定義的ProtobufferSerializer。

下麵是本次示範的完整源代碼:

project/scalapb.sbt

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"

build.sbt

lazy val commonSettings = Seq(
  name := "AkkaProtobufDemo",
  version := "1.0",
  scalaVersion := "2.12.6",
)

lazy val local = (project in file("."))
  .settings(commonSettings)
  .settings(
    libraryDependencies ++= Seq(
      "com.typesafe.akka"      %% "akka-remote" % "2.5.11",
      "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf"
    ),
    name := "akka-protobuf-demo"
  )

lazy val remote = (project in file("remote"))
  .settings(commonSettings)
  .settings(
    libraryDependencies ++= Seq(
      "com.typesafe.akka"      %% "akka-remote" % "2.5.11"
    ),
    name := "remote-system"
  ).dependsOn(local)

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value
)

resources/application.conf

akka {
  actor {
    provider = remote
  }
  remote {
    netty.tcp {
      hostname = "127.0.0.1"
    }
  }
  actor {
    serializers {
      proto = "akka.protobuf.serializer.ProtobufSerializer"
    }
    serialization-bindings {
      "java.io.Serializable" = none
      "com.google.protobuf.Message" = proto
      "learn.proto.messages.Added" = proto
      "learn.proto.messages.AddedResult" = proto
      "learn.proto.messages.Subtracted" = proto
      "learn.proto.messages.SubtractedResult" = proto

    }
  }
}

main/protobuf/messages.proto

syntax = "proto3";

// Brought in from scalapb-runtime
import "scalapb/scalapb.proto";
import "google/protobuf/wrappers.proto";

package learn.proto;

message Added {

    int32 nbr1 = 1;
    int32 nbr2 = 2;
}

message Subtracted {
    int32 nbr1 = 1;
    int32 nbr2 = 2;
}

message AddedResult {
    int32 nbr1 = 1;
    int32 nbr2 = 2;
    int32 result = 3;
}

message SubtractedResult {
    int32 nbr1 = 1;
    int32 nbr2 = 2;
    int32 result = 3;
}

remote/Calculator.scala

package akka.protobuf.calculator
import akka.actor._
import com.typesafe.config.ConfigFactory
import learn.proto.messages._

class Calculator extends Actor with ActorLogging {


  override def receive: Receive = {
    case Added(a,b) =>
      log.info("Calculating %d + %d".format(a, b))
      sender() ! AddedResult(a,b,a+b)
    case Subtracted(a,b) =>
      log.info("Calculating %d - %d".format(a, b))
      sender() ! SubtractedResult(a,b,a-b)
  }

}

object Calculator {
  def props = Props(new Calculator)
}

object CalculatorStarter extends App {

  val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2552")
    .withFallback(ConfigFactory.load())

  val calcSystem = ActorSystem("calcSystem",config)

  calcSystem.actorOf(Calculator.props,"calculator")

  println("press any key to end program ...")

  scala.io.StdIn.readLine()

  calcSystem.terminate()

}

CalcService.scala

package akka.protobuf.calcservice
import akka.actor._
import learn.proto.messages._
import scala.concurrent.duration._



class CalcRunner(path: String) extends Actor with ActorLogging {
  sendIdentifyRequest()

  def sendIdentifyRequest(): Unit = {
    context.actorSelection(path) ! Identify(path)
    import context.dispatcher
    context.system.scheduler.scheduleOnce(3.seconds, self, ReceiveTimeout)
  }

  def receive = identifying

  def identifying : Receive = {
    case ActorIdentity(calcPath,Some(calcRef)) if (path.equals(calcPath)) =>
      log.info("Remote calculator started!")
      context.watch(calcRef)
      context.become(calculating(calcRef))
    case ActorIdentity(_,None) =>
      log.info("Remote calculator not found!")
    case ReceiveTimeout =>
      sendIdentifyRequest()
    case s @ _ =>
      log.info(s"Remote calculator not ready. [$s]")
  }

  def calculating(calculator: ActorRef) : Receive = {
    case (op : Added) => calculator ! op
    case (op : Subtracted) => calculator ! op

    case AddedResult(a,b,r)  =>
      log.info(s"$a + $b = $r")
    case SubtractedResult(a,b,r) =>
      log.info(s"$a - $b = $r")

    case Terminated(calculator) =>
      log.info("Remote calculator terminated, restarting ...")
      sendIdentifyRequest()
      context.become(identifying)

    case ReceiveTimeout => //nothing
  }

}

object CalcRunner {
  def props(path: String) = Props(new CalcRunner(path))
}

Main.scala

package akka.protobuf.demo
import akka.actor._
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import akka.protobuf.calcservice._

import scala.concurrent.duration._
import scala.util._
import learn.proto.messages._

object Main extends App {

  val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2551")
    .withFallback(ConfigFactory.load())

  val calcSystem = ActorSystem("calcSystem",config)

  val calcPath = "akka.tcp://[email protected]:2552/user/calculator"

  val calculator = calcSystem.actorOf(CalcRunner.props(calcPath),"calcRunner")


  println("Calculator started ...")

  import calcSystem.dispatcher

  calcSystem.scheduler.schedule(1.second, 1.second) {
    if (Random.nextInt(100) % 2 == 0)
      calculator ! Added(Random.nextInt(100), Random.nextInt(100))
    else
      calculator ! Subtracted(Random.nextInt(100), Random.nextInt(100))
  }


  scala.io.StdIn.readLine()

}

ProtobufferSerializer.scala

package akka.protobuf.serializer

import akka.serialization.SerializerWithStringManifest
import learn.proto.messages._


class ProtobufSerializer extends SerializerWithStringManifest{

  def identifier: Int = 101110116

  override def manifest(o: AnyRef): String = o.getClass.getName
  final val AddedManifest = classOf[Added].getName
  final val SubtractedManifest = classOf[Subtracted].getName
  final val AddedResultManifest = classOf[AddedResult].getName
  final val SubtractedResultManifest = classOf[SubtractedResult].getName


  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {

    println("inside fromBinary"+manifest)

    manifest match {
      case AddedManifest => Added.parseFrom(bytes)
      case SubtractedManifest => Subtracted.parseFrom(bytes)
      case AddedResultManifest => AddedResult.parseFrom(bytes)
      case SubtractedResultManifest => SubtractedResult.parseFrom(bytes)
    }
  }

  override def toBinary(o: AnyRef): Array[Byte] = {

    println("inside toBinary ")
    o match {
      case a: Added => a.toByteArray
      case s :Subtracted => s.toByteArray
      case aR: AddedResult => aR.toByteArray
      case sR: SubtractedResult => sR.toByteArray
    }
  }
}

 

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 項目做前後端分離時,我們會經常提供Json數據給前端,如果有一個統一的Json格式返回工具類,那麼將大大提高開發效率和減低溝通成本。 ...
  • K近鄰演算法(KNN)是指一個樣本如果在特征空間中的K個最相鄰的樣本中的大多數屬於某一個類別,則該樣本也屬於這個類別,並具有這個類別上樣本的特性。即每個樣本都可以用它最接近的k個鄰居來代表。KNN演算法適合分類,也適合回歸。KNN演算法廣泛應用在推薦系統、語義搜索、異常檢測。 KNN演算法分類原理圖: 圖中 ...
  • shiro的過濾器也是不多的我們可以自定義的方法,它的繼承體系如下: 另外UserFilter是繼承於AccessControlFilter 1、NameableFilter NameableFilter給Filter起個名字,如果沒有設置預設就是FilterName;還記得之前的如authc嗎?當 ...
  • 文件格式 ①絕對路徑:從盤符開始,相同路徑必定是相同文件 ②相對路徑:不是從盤符開始,從當前文件夾下開始查找文件(相同路徑不易i的那個是相同文件) 訪問模式 說明 r 以只讀方式打開文件。文件的指針將會放在文件的開頭。這是預設模式。 w 打開一個文件只用於寫入。如果該文件已存在則將其覆蓋。如果該文件 ...
  • 一 寫在開頭 1.1 本文內容 C語言是一門古老而又高深莫測的編程語言,她身上總是閃爍著各種“巨坑”(對於我這種沒參透的菜鳥而言)。實踐出真知,親們在看C語言的資料時可千萬別想當然啊。 二 開始裝13 這是某本關於C語言指針的書中的一個小部分,具體書名不說了,內容如下圖所示。 我於是寫了一段代碼進行 ...
  • 網上關於PyQt5的教程很少,特別是界面跳轉這一塊兒,自己研究了半天,下來和大家分享一下 一、首先是主界面 二、跳轉界面Demo1 三、跳轉界面Demo2 ...
  • [TOC] Introducing Python Object Types 對象類型的優勢 1. Built in objects make programs easy to write 2. Built in objects are components of extensions 3. Buil ...
  • 三大框架整合的思路 1、Dao層: Mybatis的配置文件:SqlMapConfig.xml 不需要配置任何內容,需要有文件頭。文件必須存在。 applicationContext-dao.xml: mybatis整合spring,通過由spring創建資料庫連接池,spring管理SqlSess ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...