Spark RPC框架源碼分析(一)簡述

来源:https://www.cnblogs.com/listenfwind/archive/2019/02/26/10434983.html
-Advertisement-
Play Games

Spark RPC 框架對 Spark 來說是至關重要的,它在 Spark 中擔任中樞的作用。 ...


一. Spark rpc框架概述

Spark是最近幾年已經算是最為成功的大數據計算框架,那麼這次我們就來介紹它內部的一個小點,Spark RPC框架。

在介紹之前,我們需要先說明什麼是RPC,引用百度百科:

RPC(Remote Procedure Call)—遠程過程調用,它是一種通過網路從遠程電腦程式上請求服務,而不需要瞭解底層網路技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,為通信程式之間攜帶信息數據。

Spark RPC可以說是Spark分散式集群的基礎,若是將Spark類比為一個人的話,Spark RPC無疑就是它的血液部分。而在Spark1.6之前,它的RPC部分還是用akka實現的,但之後底層就換成了netty來實現。為什麼要這樣做呢?因為啊,這樣將Spark和Akka耦合在了一起,如果你系統本身就有使用到Akka,然後又想使用Spark的話,那兩個Akka框架版本不一致可怎麼辦呀,這無疑是很讓人頭痛的。Spark團隊正是考慮到了這一點,所以將Akka替換成了netty。

這次我們就來看看Spark是如何讓它的血液流動起來的吧。有一位大神將Spark RPC中的RPC部分剝離出來,弄成一個新的可運行的 RPC 項目,這個項目本身就可以當作一個簡易的Akka來使用,地址在這Spark RPC

雖然名字不一樣,但這個項目的類和內容基本和Spark Core中RPC部分的代碼和結構基本是一樣的,這樣我們就可以通過這個來學習Spark RPC框架。

PS:所用spark版本:spark2.1.0

二.Spark RPC中的 Hello world

我們程式員學東西最喜歡從一個Hello world開始,那麼接下來我們就來演示如何下載並運行最簡單的Hello World例子吧。

首先,我使用的編譯器是IDEA,通過idea將github上的代碼clone下來。
可以看到項目目錄下有兩個模塊,

  • kraps-rpc
  • kraps-rpc-example

kraps-rpc存放的是Spark RPC的源代碼,而我們要做的即是運行 kraps-rpc-example中的示例代碼。

啟動PRC的話首先需要啟動Server端,開啟監聽服務,然後才能通過Client進行訪問。這裡在HelloworldServer.scala中都已經幫我們寫好,不過在main方法中需要修改一下內容,就是將host改為本機地址。

  def main(args: Array[String]): Unit = {
//    val host = args(0)
    val host = "localhost"
    val config = RpcEnvServerConfig(new RpcConf(), "hello-server", host, 52345)
    val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
    val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv)
    rpcEnv.setupEndpoint("hello-service", helloEndpoint)
    rpcEnv.awaitTermination()
  }

然後我們只需要右鍵該文件然後執行即可。

接下來我們就需要啟動Client端代碼,我們先到HelloworldClient文件中,這裡面提供了同步和非同步兩個方法可以運行。代碼同樣都已經寫好,通過修改註釋即可使用不同的方法運行。同樣是右鍵點擊該文件執行。

  def main(args: Array[String]): Unit = {
    //非同步方法
    //asyncCall()
    //同步方法
    syncCall()
  }

非同步方法中,ask會返回一個Future(註意這裡的Future是scala中的Future,和java的是不一樣的)。並且在Future運行結果出來前,我們可以去做其他事情(非同步的優勢所在)。scala中的Future和Java的Future有些不同,不過這可以先不去管,先當作Java裡面的Future即可。

  def asyncCall() = {
    val rpcConf = new RpcConf()
    val config = RpcEnvClientConfig(rpcConf, "hello-client")
    val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
    val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hello-service")
    val future: Future[String] = endPointRef.ask[String](SayHi("neo"))
    future.onComplete {
      case scala.util.Success(value) => println(s"Got the result = $value")
      case scala.util.Failure(e) => println(s"Got error: $e")
    }
    Await.result(future, Duration.apply("3s"))
    //在future結果運行出來前,會先列印這條語句。
    println("print me at first!")
    Thread.sleep(7)
  }

而同步方法是直接將結果返回,並且會阻塞,這個時間內你無法做其他事情,只能等待,直到結果返回

  def syncCall() = {
    val rpcConf = new RpcConf()
    val config = RpcEnvClientConfig(rpcConf, "hello-client")
    val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
    val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hello-service")
    val result = endPointRef.askWithRetry[String](SayBye("neo"))
    println(result)

  }

很簡單是吧,運行過例子後,我們就可以來瞭解一些Spark RPC運行過程中至關重要的兩個編程模型,以及在這其中使用到的一些主要的類。

三.Spark RPC中的兩個編程模型以及各個類

Spark RPC是使用了Actor模型和Reactor模型的混合模式,我們結合兩種模型分別說明Spark RPC中各個類的作用:

首先我們先來看Spark RPC的類圖。

Spark RPC 類圖

是不是感覺很亂?沒事,我們來逐步剖析各個類。

為了更加清楚了說明各個類的關係,我們要先知道兩個模型,分別是Actor模型和Reactor模型,我們將從這兩個模型的角度來拆解各個類的關係。

Actor模型

其實之前也有寫過一篇介紹Actor模型的文章,感興趣的同學可以點擊這裡查看Actor模型淺析

其實Actor主要就是這副圖的內容:
Actor併發編程模型
在Spark RPC中有幾個類分別與Actor模型中的各個角色對應,對應如下,左邊的是Spark RPC中的類,右邊的是Actor模型中的角色:

RpcEndpoint => Actor

RpcEndpointRef => ActorRef

RpcEnv => ActorSystem

我們逐個來看:

RpcEnv --RPC Environment

RPC Environment 是 RpcEndpoint 的運行環境。它管理 RpcEndpoint 的整個生命周期:

  1. 通過名字或 URI 註冊 RpcEndpoint。
  2. 對到底的消息進行路由,決定分發給哪個 RpcEndpoint。
  3. 停止 RpcEndpoint。

RPC Environment在akka已經被移除的2.0後面版本中,RPC Environment的實現類是NettyRpcEnv。通常是由NettyRpcEnvFactory.create創建。

RpcEndpoint

RpcEndpoint能通過callbacks接收消息。通常需要我們自己寫一個類繼承RpcEndpoint。編寫自己的接收信息和返回信息規則。

RpcEndpoint的生命周期被RPC Environment管理。其生命周期包括,onStart,receive和onStop。

它是作為服務端,比如上面例子中的HelloworldServer就是一個RpcEndpoint。

RpcEndpointRef

RpcEndpointRef是RpcEndpoint在RPC Environment中的一個引用。

它包含一個地址(即Spark URL)和名字。RpcEndpointRef作為客戶端向服務端發送請求並接收返回信息,通常可以選擇使用同步或非同步的方式進行發送。

Reactor模型

Spark RPC採用Actor模型和Reactor模型混合的結構,上面已經介紹了Actor,那麼現在我們就來介紹Reactor模型,同樣,我們可以從一張圖來看Reactor的架構。

Reactor模型

使用Reactor模型,由底層netty創建的EventLoop做I/O多路復用,這裡使用Multiple Reactors這種形式,如上圖所示,從netty的角度而言,Main Reactor和Sub Reactor對應BossGroup和WorkerGroup的概念,前者負責監聽TCP連接、建立和斷開,後者負責真正的I/O讀寫。

而圖中的ThreadPool就是的Dispatcher中的線程池,它來解耦開來耗時的業務邏輯和I/O操作,這樣就可以更scalabe,只需要少數的線程就可以處理成千上萬的連接,這種思想是標準的分治策略,offload非I/O操作到另外的線程池。

Dispatcher

Dispatcher的主要作用是保存註冊的RpcEndpoint、分發相應的Message到RpcEndPoint中進行處理。Dispatcher即是上圖中ThreadPool的角色。它同時也維繫一個threadpool,用來處理每次接受到的InboxMessage。而這裡處理InboxMessage是通過inbox實現的。

Inbox

Inbox其實屬於Actor模型,是Actor中的信箱,不過它和Dispatcher聯繫緊密所以放這邊。

InboxMessage有多個實現它的類,比如OneWayMessage,RpcMessage,等等。Dispatcher會將接收到的InboxMessage分發到對應RpcEndpoint的Inbox中,然後Inbox便會處理這個InboxMessage。

OK,這次就先介紹到這裡,下次我們從代碼的角度來看Spark RPC的運行機制

如果覺得對你有幫助,不妨關註一波吧~~

參考資料:https://zhuanlan.zhihu.com/p/28893155


推薦閱讀:
從分治演算法到 MapReduce
Actor併發編程模型淺析
大數據存儲的進化史 --從 RAID 到 Hadoop Hdfs
一個故事告訴你什麼才是好的程式員


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、什麼是OCTO 定義: OCTO是美團的分散式服務通信框架及服務治理系統,屬於公司級基礎設施,目前尚未開源。 目標: 為公司所有業務提供統一的服務通信框架,使業務具備良好的服務運營能力,輕鬆實現服務註冊、服務自動發現、負載均衡、容錯、灰度發佈、調用數據可視化等,持續提升服務高可用性、服務運維效率 ...
  • LieBrother原文 : "行為型模式:責任鏈模式" 十一大行為型模式之四:責任鏈模式。 簡介 姓名 :責任鏈模式 英文名 :Chain of Responsibility Pattern 價值觀 :責任歸我 個人介紹 : Avoid coupling the sender of a reque ...
  • 定義:裝飾模式是在不必改變原類文件和使用繼承的情況下,動態的擴展一個對象的功能。它是通過創建一個包裝對象,也就是裝飾來包裹真實的對象。 裝飾器模式是為已有功能添加更多功能的一種方式,就增加功能來說,裝飾器模式比通過生成子類更為靈活。該模式通過將裝飾的功能放在單獨的類中,並讓這些類包含了需要進行裝飾的 ...
  • 方法一:繼承 Thread 類,覆蓋方法 run(),我們在創建的 Thread 類的子類中重寫 run() ,加入線程所要執行的代碼即可。 下麵是一個例子: 這種方法簡單明瞭,符合大家的習慣,但是,它也有一個很大的缺點,那就是如果我們的類已經從一個類繼承(如小程式必須繼承自 Applet 類),則 ...
  • 一、格式化拼接、format 1.字元串拼接 name = "Monica", age = 16 print("姓名"+name+“年齡”+age+".") 2.占位符 %s:string,%d:整數,%f:浮點 info1 = ‘’‘姓名:%s 年齡:%s’‘’%(name,age) print( ...
  • 題意 "題目鏈接" Sol 題解好神仙啊qwq。 一般看到這種考慮最大值的貢獻的題目不難想到單調數據結構 對於本題而言,我們可以預處理出每個位置左邊第一個比他大的位置$l_i$以及右邊第一個比他大的位置$r_i$ 那麼$(l_i, r_i)$會產生$p1$的貢獻 $[l_i + 1, i 1]$和$ ...
  • 這一篇博客,是關於反反爬蟲的,我會分享一些我遇到的反爬蟲的措施,並且會分享我自己的解決辦法。如果能對你有什麼幫助的話,麻煩點一下推薦啦。 一、UserAgent UserAgent中文名為用戶代理,它使得伺服器能夠識別客戶使用的操作系統及版本、CPU 類型、瀏覽器及版本等信息。對於一些網站來說,它會 ...
  • 第89節: 中的反射技術 反射技術是動態的獲取指定的類,和動態的調用類中的內容(沒有類前就可以創建對象,將對象的動作完成,這就是動態的獲取指定的類)。 配置文件把具體實現的類名稱定義到配置文件中。 反射技術的作用可以提高程式的擴展性。 類 用於創建並返回此對象的一個副本 用於指示其他某個對象是否與這 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...