高性能分散式執行框架——Ray

来源:http://www.cnblogs.com/fanzhidongyzby/archive/2017/11/26/7901139.html
-Advertisement-
Play Games

Ray是UC Berkeley RISELab新推出的高性能分散式執行框架,它使用了和傳統分散式計算系統不一樣的架構和對分散式計算的抽象方式,具有比Spark更優異的計算性能。 Ray目前還處於實驗室階段,最新版本為 "0.2.2版本" 。雖然Ray自稱是面向AI應用的分散式計算框架,但是它的架構具 ...


Ray是UC Berkeley RISELab新推出的高性能分散式執行框架,它使用了和傳統分散式計算系統不一樣的架構和對分散式計算的抽象方式,具有比Spark更優異的計算性能。

Ray目前還處於實驗室階段,最新版本為0.2.2版本。雖然Ray自稱是面向AI應用的分散式計算框架,但是它的架構具有通用的分散式計算抽象。本文對Ray進行簡單的介紹,幫助大家更快地瞭解Ray是什麼,如有描述不當的地方,歡迎不吝指正。

一、簡單開始

首先來看一下最簡單的Ray程式是如何編寫的。

# 導入ray,並初始化執行環境
import ray
ray.init()

# 定義ray remote函數
@ray.remote
def hello():
    return "Hello world !"

# 非同步執行remote函數,返回結果id
object_id = hello.remote()

# 同步獲取計算結果
hello = ray.get(object_id)

# 輸出計算結果
print hello

在Ray里,通過Python註解@ray.remote定義remote函數。使用此註解聲明的函數都會自帶一個預設的方法remote,通過此方法發起的函數調用都是以提交分散式任務的方式非同步執行的,函數的返回值是一個對象id,使用ray.get內置操作可以同步獲取該id對應的對象。熟悉Java里的Future機制的話對此應該並不陌生,或許會有人疑惑這和普通的非同步函數調用沒什麼大的區別,但是這裡最大的差異是,函數hello是分散式非同步執行的。

remote函數是Ray分散式計算抽象中的核心概念,通過它開發者擁有了動態定製計算依賴(任務DAG)的能力。比如:

@ray.remote
def A():
    return "A"

@ray.remote
def B():
    return "B"

@ray.remote
def C(a, b):
    return "C"

a_id = A.remote()
b_id = B.remote()
c_id = C.remote(a_id, b_id)
print ray.get(c_id)

例子代碼中,對函數A、B的調用是完全並行執行的,但是對函數C的調用依賴於A、B函數的返回結果。Ray可以保證函數C需要等待A、B函數的結果真正計算出來後才會執行。如果將函數A、B、C類比為DAG的節點的話,那麼DAG的邊就是函數C參數對函數A、B計算結果的依賴,自由的函數調用方式允許Ray可以自由地定製DAG的結構和計算依賴關係。另外,提及一點的是Python的函數可以定義函數具有多個返回值,這也使得Python的函數更天然具備了DAG節點多入和多出的特點。

二、系統架構

Ray是使用什麼樣的架構對分散式計算做出如上抽象的呢,一下給出了Ray的系統架構(來自Ray論文,參考文獻1)。

作為分散式計算系統,Ray仍舊遵循了典型的Master-Slave的設計:Master負責全局協調和狀態維護,Slave執行分散式計算任務。不過和傳統的分散式計算系統不同的是,Ray使用了混合任務調度的思路。在集群部署模式下,Ray啟動了以下關鍵組件:

  1. GlobalScheduler:Master上啟動了一個全局調度器,用於接收本地調度器提交的任務,並將任務分發給合適的本地任務調度器執行。
  2. RedisServer:Master上啟動了一到多個RedisServer用於保存分散式任務的狀態信息(ControlState),包括對象機器的映射、任務描述、任務debug信息等。
  3. LocalScheduler:每個Slave上啟動了一個本地調度器,用於提交任務到全局調度器,以及分配任務給當前機器的Worker進程。
  4. Worker:每個Slave上可以啟動多個Worker進程執行分散式任務,並將計算結果存儲到ObjectStore。
  5. ObjectStore:每個Slave上啟動了一個ObjectStore存儲只讀數據對象,Worker可以通過共用記憶體的方式訪問這些對象數據,這樣可以有效地減少記憶體拷貝和對象序列化成本。ObjectStore底層由Apache Arrow實現。
  6. Plasma:每個Slave上的ObjectStore都由一個名為Plasma的對象管理器進行管理,它可以在Worker訪問本地ObjectStore上不存在的遠程數據對象時,主動拉取其它Slave上的對象數據到當前機器。

需要說明的是,Ray的論文中提及,全局調度器可以啟動一到多個,而目前Ray的實現文檔里討論的內容都是基於一個全局調度器的情況。我猜測可能是Ray尚在建設中,一些機制還未完善,後續讀者可以留意此處的細節變化。

Ray的任務也是通過類似Spark中Driver的概念的方式進行提交的,有所不同的是:

  1. Spark的Driver提交的是任務DAG,一旦提交則不可更改。
  2. 而Ray提交的是更細粒度的remote function,任務DAG依賴關係由函數依賴關係自由定製。

論文給出的架構圖裡並未畫出Driver的概念,因此我在其基礎上做了一些修改和擴充。

Ray的Driver節點和和Slave節點啟動的組件幾乎相同,不過卻有以下區別:

  1. Driver上的工作進程DriverProcess一般只有一個,即用戶啟動的PythonShell。Slave可以根據需要創建多個WorkerProcess。
  2. Driver只能提交任務,卻不能接收來自全局調度器分配的任務。Slave可以提交任務,也可以接收全局調度器分配的任務。
  3. Driver可以主動繞過全局調度器給Slave發送Actor調用任務(此處設計是否合理尚不討論)。Slave只能接收全局調度器分配的計算任務。

三、核心操作

基於以上架構,我們簡單討論一下Ray中關鍵的操作和流程。

1. ray.init()

在PythonShell中,使用ray.init()可以在本地啟動ray,包括Driver、HeadNode(Master)和若幹Slave。

import ray
ray.init()

如果是直連已有的Ray集群,只需要指定RedisServer的地址即可。

ray.init(redis_address="<redis-address>")

本地啟動Ray得到的輸出如下:

>>> ray.init()
Waiting for redis server at 127.0.0.1:58807 to respond...
Waiting for redis server at 127.0.0.1:23148 to respond...
Allowing the Plasma store to use up to 13.7439GB of memory.
Starting object store with directory /tmp and huge page support disabled
Starting local scheduler with 8 CPUs, 0 GPUs

======================================================================
View the web UI at http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5
======================================================================

{'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store73540254', manager_name='/tmp/plasma_manager78072648', manager_port=39874)], 'redis_address': '127.0.0.1:58807', 'local_scheduler_socket_names': ['/tmp/scheduler98624129'], 'webui_url': 'http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5', 'node_ip_address': '127.0.0.1'}
>>> 

本地啟動Ray時,可以看到Ray的WebUI的訪問地址。

2. ray.put()

使用ray.put()可以將Python對象存入本地ObjectStore,並且非同步返回一個唯一的ObjectID。通過該ID,Ray可以訪問集群中任一個節點上的對象(遠程對象通過查閱Master的對象表獲得)。

對象一旦存入ObjectStore便不可更改,Ray的remote函數可以將直接將該對象的ID作為參數傳入。使用ObjectID作為remote函數參數,可以有效地減少函數參數的寫ObjectStore的次數。

@ray.remote
def f(x):
    pass

x = "hello"

# 對象x往ObjectStore拷貝里10次
[f.remote(x) for _ in range(10)]

# 對象x僅往ObjectStore拷貝1次
x_id = ray.put(x)
[f.remote(x_id) for _ in range(10)]

3. ray.get()

使用ray.get()可以通過ObjectID獲取ObjectStore內的對象並將之轉換為Python對象。對於數組類型的對象,Ray使用共用記憶體機制減少數據的拷貝成本。而對於其它對象則需要將數據從ObjectStore拷貝到進程的堆記憶體中。

如果調用ray.get()操作時,對象尚未創建好,則get操作會阻塞,直到對象創建完成後返回。get操作的關鍵流程如下:

  1. Driver或者Worker進程首先到ObjectStore內請求ObjectID對應的對象數據。
  2. 如果本地ObjectStore沒有對應的對象數據,本地對象管理器Plasma會檢查Master上的對象表查看對象是否存儲其它節點的ObjectStore。
  3. 如果對象數據在其它節點的ObjectStore內,Plasma會發送網路請求將對象數據拉到本地ObjectStore。
  4. 如果對象數據還沒有創建好,Master會在對象創建完成後通知請求的Plasma讀取。
  5. 如果對象數據已經被所有的ObjectStore移除(被LRU策略刪除),本地調度器會根據任務血緣關係執行對象的重新創建工作。
  6. 一旦對象數據在本地ObjectStore可用,Driver或者Worker進程會通過共用記憶體的方式直接將對象記憶體區域映射到自己的進程地址空間中,並反序列化為Python對象。

另外,ray.get()可以一次性讀取多個對象的數據:

result_ids = [ray.put(i) for i in range(10)]
ray.get(result_ids)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

4. @ray.remote

Ray中使用註解@ray.remote可以聲明一個remote function。remote函數時Ray的基本任務調度單元,remote函數定義後會立即被序列化存儲到RedisServer中,並且分配了一個唯一的ID,這樣就保證了集群的所有節點都可以看到這個函數的定義。

不過,這樣對remote函數定義有了一個潛在的要求,即remote函數內如果調用了其它的用戶函數,則必須提前定義,否則remote函數無法找到對應的函數定義內容。

remote函數內也可以調用其它的remote函數,Driver和Slave每次調用remote函數時,其實都是向集群提交了一個計算任務,從這裡也可以看到Ray的分散式計算的自由性。

Ray中調用remote函數的關鍵流程如下:

  1. 調用remote函數時,首先會創建一個任務對象,它包含了函數的ID、參數的ID或者值(Python的基本對象直接傳值,複雜對象會先通過ray.put()操作存入ObjectStore然後返回ObjectID)、函數返回值對象的ID。
  2. 任務對象被髮送到本地調度器。
  3. 本地調度器決定任務對象是在本地調度還是發送給全局調度器。如果任務對象的依賴(參數)在本地的ObejctStore已經存在且本地的CPU和GPU計算資源充足,那麼本地調度器將任務分配給本地的WorkerProcess執行。否則,任務對象被髮送給全局調度器並存儲到任務表(TaskTable)中,全局調度器根據當前的任務狀態信息決定將任務發給集群中的某一個本地調度器。
  4. 本地調度器收到任務對象後(來自本地的任務或者全局調度分配的任務),會將其放入一個任務隊列中,等待計算資源和本地依賴滿足後分配給WorkerProcess執行。
  5. Worker收到任務對象後執行該任務,並將函數返回值存入ObjectStore,並更新Master的對象表(ObjectTable)信息。

@ray.remote註解有一個參數num_return_vals用於聲明remote函數的返回值個數,基於此實現remote函數的多返回值機制。

@ray.remote(num_return_vals=2)
def f():
    return 1, 2

x_id, y_id = f.remote()
ray.get(x_id)  # 1
ray.get(y_id)  # 2

@ray.remote註解的另一個參數num_gpus可以為任務指定GPU的資源。使用內置函數ray.get_gpu_ids()可以獲取當前任務可以使用的GPU信息。

@ray.remote(num_gpus=1)
def gpu_method():
    return "This function is allowed to use GPUs {}.".format(ray.get_gpu_ids())

5. ray.wait()

ray.wait()操作支持批量的任務等待,基於此可以實現一次性獲取多個ObjectID對應的數據。

# 啟動5個remote函數調用任務
results = [f.remote(i) for i in range(5)]
# 阻塞等待4個任務完成,超時時間為2.5s
ready_ids, remaining_ids = ray.wait(results, num_returns=4, timeout=2500)

上述例子中,results包含了5個ObjectID,使用ray.wait操作可以一直等待有4個任務完成後返回,並將完成的數據對象放在第一個list類型返回值內,未完成的ObjectID放在第二個list返回值內。如果設置了超時時間,那麼在超時時間結束後仍未等到預期的返回值個數,則已超時完成時的返回值為準。

6. ray.error_info()

使用ray.error_info()可以獲取任務執行時產生的錯誤信息。

>>> import time
>>> @ray.remote
>>> def f():
>>>     time.sleep(5)
>>>     raise Exception("This task failed!!")
>>> f.remote()
Remote function __main__.f failed with:

Traceback (most recent call last):
  File "<stdin>", line 4, in f
Exception: This task failed!!


  You can inspect errors by running

      ray.error_info()

  If this driver is hanging, start a new one with

      ray.init(redis_address="127.0.0.1:65452")
>>> ray.error_info()
[{'type': 'task', 'message': 'Remote function \x1b[31m__main__.f\x1b[39m failed with:\n\nTraceback (most recent call last):\n  File "<stdin>", line 4, in f\nException: This task failed!!\n', 'data': '{\'function_id\': "Hm\\xde\\x93\'\\x91\\xce\\x13ld\\xf4O\\xd7\\xce\\xc2\\xe1\\x151\\x1e3", \'function_name\': u\'__main__.f\'}'}]

7. Actor

Ray的remote函數只能處理無狀態的計算需求,有狀態的計算需求需要使用Ray的Actor實現。在Python的class定義前使用@ray.remote可以聲明Actor。

@ray.remote
class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

使用如下方式創建Actor對象。

a1 = Counter.remote()
a2 = Counter.remote()

Ray創建Actor的流程為:

  1. Master選取一個Slave,並將Actor創建任務分發給它的本地調度器。
  2. 創建Actor對象,並執行它的構造函數。

從流程可以看出,Actor對象的創建時並行的。

通過調用Actor對象的方法使用Actor。

a1.increment.remote()  # ray.get returns 1
a2.increment.remote()  # ray.get returns 1

調用Actor對象的方法的流程為:

  1. 首先創建一個任務。
  2. 該任務被Driver直接分配到創建該Actor對應的本地執行器執行,這個操作繞開了全局調度器(Worker是否也可以使用Actor直接分配任務尚存疑問)。
  3. 返回Actor方法調用結果的ObjectID。

為了保證Actor狀態的一致性,對同一個Actor的方法調用是串列執行的。

四、安裝Ray

如果只是使用Ray,可以使用如下命令直接安裝。

pip intall ray

如果需要編譯Ray的最新源碼進行安裝,按照如下步驟進行(MaxOS):

# 更新編譯依賴包
brew update
brew install cmake pkg-config automake autoconf libtool boost wget
pip install numpy cloudpickle funcsigs click colorama psutil redis flatbuffers cython --ignore-installed six
# 下載源碼編譯安裝
git clone https://github.com/ray-project/ray.git
cd ray/python
python setup.py install
# 測試
python test/runtest.py

# 安裝WebUI需要的庫[可選]
pip install jupyter ipywidgets bokeh

# 編譯Ray文檔[可選]
cd ray/doc
pip install -r requirements-doc.txt
make html
open _build/html/index.html

我在MacOS上安裝jupyter時,遇到了Python的setuptools庫無法升級的情況,原因是MacOS的安全性設置問題,可以使用如下方式解決:

  1. 重啟電腦,啟動時按住Command+R進入Mac保護模式。
  2. 打開命令行,輸入命令csrutils disable關閉系統安全策略。
  3. 重啟電腦,繼續安裝jupyter。
  4. 安裝完成後,重覆如上的方式執行csrutils enable,再次重啟即可。

進入PythonShell,輸入代碼本地啟動Ray:

import ray
ray.init()

瀏覽器內打開WebUI界面如下:

參考資料

  1. Ray論文:Real-Time Machine Learning: The Missing Pieces
  2. Ray開發手冊:http://ray.readthedocs.io/en/latest/index.html
  3. Ray源代碼:https://github.com/ray-project/ray

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Javascript OOP 創建對象: 構造型函數方式: 1、可以創建很多個對象 2、函數裡面是可以寫代碼的 對象字面量形式: Object 實例方法: 1、prototype的方式 2、其它 靜態方法: 成員的訪問: js的2大特性: 1、弱類型 2、動態性 instanceof: 構造器(Co ...
  • 錯誤描述 當form表單加FormGroup屬性時報錯 Can't bind to 'formGroup' since it isn't a known property of 'form' 原因分析與解決方案 在使用form表單時,如果用到了form-group與formControlName,需 ...
  • 隨機生成10個0~100不重覆的數字(包含0和100); 需要用到的知識點:隨機數 去重 下麵放代碼 寫代碼前思路一定要清晰,整理好邏輯再寫會讓你的開發事半功倍! ...
  • HTML5 作為下一代網站開發技術,無論你是一個 Web 開發人員或者想探索新的平臺的游戲開發者,都值得去研究。藉助尖端功能,技術和 API,HTML5 允許你創建響應性、創新性、互動性以及令人驚嘆的漂亮網站。更進一步,你也可以使用 HTML5 創建原來只能用於桌面平臺上的複雜應用程式。 這篇文章挑 ...
  • 最近在學習vue,今天看到自定義事件的表單輸入組件,糾結了一會會然後恍然大悟...官方教程寫得不是很詳細,所以我決定總結一下。 v-model語法糖 v-model實現了表單輸入的雙向綁定,我們一般是這麼寫的: 通過該語句實現price變數與輸入值雙向綁定 實際上v-model只是一個語法糖,真正的 ...
  • 之前有寫過css/css3實現元素的水平和垂直居中的幾種方法點我,但是css3屬性不是所有瀏覽器都能相容的,今天寫下js實現未知寬高的元素的水平和垂直居中。 如果需要在有滾動條的情況的下要實現居中的效果,則需要加上scrollTop和scrollLeft。 ...
  • 這兩天做了一個全選反選的案例,用了幾種方法,剛開始寫的磕磕絆絆,聽老師講解後理清思路,再來寫就很容易了。其實還是寫代碼時候的思路問題。首先要分析功能,然後分步實現,不要攪在一起。下麵幾種方法為比較精簡的方法,用到classList屬性,剛開始寫的兩個方法都比較原始,所以相對比較複雜,就不放上來了。 ...
  • 某天突然寫了個方法要從後臺調用數據,顯示在前臺頁面,但是輸出結果總是空 undefined,得不到數據。多方找資料才發現,原來是入了 JS 非同步的 “坑”。 我們常常聽到單線程、多線程、同步、非同步這些概念,那麼這些東西到底是什麼呢? 那麼我們先從上面那幾個概念說起 o( ̄▽ ̄) ブ 單線程、多線程、 ...
一周排行
    -Advertisement-
    Play Games
  • 前言 本文介紹一款使用 C# 與 WPF 開發的音頻播放器,其界面簡潔大方,操作體驗流暢。該播放器支持多種音頻格式(如 MP4、WMA、OGG、FLAC 等),並具備標記、實時歌詞顯示等功能。 另外,還支持換膚及多語言(中英文)切換。核心音頻處理採用 FFmpeg 組件,獲得了廣泛認可,目前 Git ...
  • OAuth2.0授權驗證-gitee授權碼模式 本文主要介紹如何筆者自己是如何使用gitee提供的OAuth2.0協議完成授權驗證並登錄到自己的系統,完整模式如圖 1、創建應用 打開gitee個人中心->第三方應用->創建應用 創建應用後在我的應用界面,查看已創建應用的Client ID和Clien ...
  • 解決了這個問題:《winForm下,fastReport.net 從.net framework 升級到.net5遇到的錯誤“Operation is not supported on this platform.”》 本文內容轉載自:https://www.fcnsoft.com/Home/Sho ...
  • 國內文章 WPF 從裸 Win 32 的 WM_Pointer 消息獲取觸摸點繪製筆跡 https://www.cnblogs.com/lindexi/p/18390983 本文將告訴大家如何在 WPF 裡面,接收裸 Win 32 的 WM_Pointer 消息,從消息裡面獲取觸摸點信息,使用觸摸點 ...
  • 前言 給大家推薦一個專為新零售快消行業打造了一套高效的進銷存管理系統。 系統不僅具備強大的庫存管理功能,還集成了高性能的輕量級 POS 解決方案,確保頁面載入速度極快,提供良好的用戶體驗。 項目介紹 Dorisoy.POS 是一款基於 .NET 7 和 Angular 4 開發的新零售快消進銷存管理 ...
  • ABP CLI常用的代碼分享 一、確保環境配置正確 安裝.NET CLI: ABP CLI是基於.NET Core或.NET 5/6/7等更高版本構建的,因此首先需要在你的開發環境中安裝.NET CLI。這可以通過訪問Microsoft官網下載並安裝相應版本的.NET SDK來實現。 安裝ABP ...
  • 問題 問題是這樣的:第三方的webapi,需要先調用登陸介面獲取Cookie,訪問其它介面時攜帶Cookie信息。 但使用HttpClient類調用登陸介面,返回的Headers中沒有找到Cookie信息。 分析 首先,使用Postman測試該登陸介面,正常返回Cookie信息,說明是HttpCli ...
  • 國內文章 關於.NET在中國為什麼工資低的分析 https://www.cnblogs.com/thinkingmore/p/18406244 .NET在中國開發者的薪資偏低,主要因市場需求、技術棧選擇和企業文化等因素所致。歷史上,.NET曾因微軟的閉源策略發展受限,儘管後來推出了跨平臺的.NET ...
  • 在WPF開發應用中,動畫不僅可以引起用戶的註意與興趣,而且還使軟體更加便於使用。前面幾篇文章講解了畫筆(Brush),形狀(Shape),幾何圖形(Geometry),變換(Transform)等相關內容,今天繼續講解動畫相關內容和知識點,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 什麼是委托? 委托可以說是把一個方法代入另一個方法執行,相當於指向函數的指針;事件就相當於保存委托的數組; 1.實例化委托的方式: 方式1:通過new創建實例: public delegate void ShowDelegate(); 或者 public delegate string ShowDe ...