用Python進行實時計算——PyFlink快速入門

来源:https://www.cnblogs.com/tree1123/archive/2020/06/24/13185670.html
-Advertisement-
Play Games

Flink 1.9.0及更高版本支持Python,也就是PyFlink。 在最新版本的Flink 1.10中,PyFlink支持Python用戶定義的函數,使您能夠在Table API和SQL中註冊和使用這些函數。但是,聽完所有這些後,您可能仍然想知道PyFlink的架構到底是什麼?作為PyFlin ...


Flink 1.9.0及更高版本支持Python,也就是PyFlink。

在最新版本的Flink 1.10中,PyFlink支持Python用戶定義的函數,使您能夠在Table API和SQL中註冊和使用這些函數。但是,聽完所有這些後,您可能仍然想知道PyFlink的架構到底是什麼?作為PyFlink的快速指南,本文將回答這些問題。

為什麼需要PyFlink?

Python上的Flink和Flink上的Python

那麼,PyFlink到底是什麼?顧名思義,PyFlink就是Apache Flink與Python的組合,或者說是Python上的Flink。但是Flink on Python是什麼意思?首先,兩者的結合意味著您可以在Python中使用Flink的所有功能。而且,更重要的是,PyFlink還允許您在Flink上使用Python廣泛的生態系統的計算功能,從而可以進一步促進其生態系統的開發。換句話說,這對雙方都是雙贏。如果您更深入地研究這個主題,您會發現Flink框架和Python語言的集成絕不是巧合。

Python和大數據生態系統

python語言與大數據緊密相連。為了理解這一點,我們可以看一下人們正在使用Python解決的一些實際問題。一項用戶調查顯示,大多數人都在使用Python進行數據分析和機器學習應用程式。對於此類情況,大數據空間中還解決了一些理想的解決方案。除了擴大大數據產品的受眾範圍之外,Python和大數據的集成還通過將其獨立體繫結構擴展到分散式體繫結構,極大地增強了Python生態系統的功能。這也解釋了在分析大量數據時對Python的強烈需求。

為什麼選擇Flink和Python?

Python和大數據的集成與其他最近的趨勢一致。但是,再次說明一下,為什麼Flink現在支持Python,而不是Go或R或另一種語言?而且,為什麼大多數用戶選擇PyFlink而不是PySpark和PyHive?

為了理解原因,讓我們首先考慮使用Flink框架的一些優勢:

  • 有利的體繫結構: Flink是具有統一流和批處理功能的純流計算引擎。
  • 新的活力:根據ASF的客觀統計,Flink是2019年最活躍的開源項目。
  • 高可靠性:作為一個開源項目,Flink經過長期測試,並廣泛應用於大數據公司的生產環境中。

接下來,讓我們看看為什麼Flink支持Python而不是其他語言。統計數據顯示,Python是繼Java和C之後最受歡迎的語言,並且自2018年以來一直在快速發展。Java和Scala是Flink的預設語言,但是Flink支持Python似乎是合理的。

PyFlink是相關技術發展的必然產物。但是,僅僅瞭解PyFlink的重要性是不夠的,因為我們的最終目標是使Flink和Python用戶受益並解決實際問題。因此,我們需要進一步探索如何實現PyFlink。

PyFlink架構

要實現PyFlink,我們需要知道要實現的關鍵目標和要解決的核心問題。PyFlink的主要目標是什麼?簡而言之,PyFlink的主要目標如下:

  1. 使所有Flink功能對Python用戶可用。
  2. 在Flink上運行Python的分析和計算功能,以提高Python解決大數據問題的能力。

在此基礎上,讓我們分析實現這些目標需要解決的關鍵問題。

使Flink功能可供Python用戶使用

要實現PyFlink,是否需要像現有Java引擎一樣在Flink上開發Python引擎?答案是NO。嘗試在Flink 1.8版或更早版本中進行,但效果不佳。基本設計原則是以最小的成本實現給定的目標。最簡單但最好的方法是提供一層Python API,並重用現有的計算引擎。

那麼,我們應該為Flink提供哪些Python API?他們對我們很熟悉:高級表API和SQL,以及有狀態的DataStream API。現在,我們越來越接近Flink的內部邏輯,下一步是提供適用於Python的Table API和DataStream API。但是,剩下要解決的關鍵問題到底是什麼呢?

關鍵問題

顯然,關鍵問題在於在Python虛擬機(PyVM)和Java虛擬機(JVM)之間建立握手,這對於Flink支持多種語言至關重要。要解決此問題,我們必須選擇適當的通信技術。

選擇虛擬機通信技術

當前,有兩種解決方案可用於實現PyVM和JVM之間的通信,它們是Beam和Py4J。前者是一個著名的項目,具有多語言和多引擎支持,而後者是用於PyVM和JVM之間通信的專用解決方案。我們可以從幾個不同的角度比較和對比Apache Beam和Py4J,以瞭解它們之間的區別。首先,考慮一個比喻:要越過一堵牆,Py4J會像痣一樣在其中挖一個洞,而Apache Beam會像大熊一樣把整堵牆推倒。從這個角度來看,使用Apache Beam來實現VM通信有點複雜。簡而言之,這是因為Apache Beam專註於通用性,在極端情況下缺乏靈活性。

除此之外,Flink還需要互動式編程。此外,為了使Flink正常工作,我們還需要確保其API設計中的語義一致性,尤其是在其多語言支持方面。Apache Beam的現有體繫結構無法滿足這些要求,因此答案很明顯,Py4J是支持PyVM和JVM之間通信的最佳選擇。

技術架構

在PyVM和JVM之間建立通信之後,我們已經實現了向Python用戶提供Flink功能的第一個目標。我們已經在Flink 1.9版中實現了這一點。現在,讓我們看一下Flink 1.9版中PyFlink API的體繫結構:

Flink 1.9版使用Py4J來實現虛擬機通信。我們為PyVM啟用了網關,為JVM啟用了網關伺服器以接收Python請求。此外,我們還提供了Python API中的TableENV和Table之類的對象,這些對象與Java API中提供的對象相同。因此,編寫Python API的本質是關於如何調用Java API。Flink 1.9版還解決了作業部署問題。它使您可以通過各種方式提交作業,例如運行Python命令以及使用Python Shell和CLI。

但是,此體繫結構提供了哪些優勢?首先,該體繫結構很簡單,並且可以確保Python API和Java API之間的語義一致性。其次,它還提供了與Java作業相當的出色Python作業處理性能。

在Flink上運行Python的分析和計算功能

上一節介紹瞭如何使Flink功能可供Python用戶使用。本節說明如何在Flink上運行Python函數。通常,我們可以通過以下兩種方式之一在Flink上運行Python函數:

  1. 選擇一個典型的Python類庫,並將其API添加到PyFlink。該方法花費很長時間,因為Python包含太多的類庫。在合併任何API之前,我們需要簡化Python執行。
  2. 基於現有的Flink Table API和Python類庫的特征,我們可以將所有現有的Python類庫函數視為用戶定義的函數,並將其集成到Flink中。Flink 1.10及更高版本中支持此功能。功能集成的關鍵問題是什麼?同樣,它取決於Python用戶定義函數的執行。

接下來,讓我們為這個關鍵問題選擇一種技術。

選擇執行用戶定義功能的技術

實際上,執行Python用戶定義的函數非常複雜。它不僅涉及虛擬機之間的通信,還涉及以下所有方面:管理Python執行環境,解析Java和Python之間交換的業務數據,將Flink中的狀態後端傳遞給Python以及監視執行狀態。鑒於所有這些複雜性,現在是Apache Beam發揮作用的時候了。作為支持多種引擎和多種語言的大熊,Apache Beam可以在解決這種情況方面做很多工作,所以讓我們看看Apache Beam如何處理執行Python用戶定義的函數。

下麵顯示了可移植性框架,該框架是Apache Beam的高度抽象的體繫結構,旨在支持多種語言和引擎。當前,Apache Beam支持幾種不同的語言,包括Java,Go和Python。

用戶定義的功能架構

UDF體繫結構不僅需要實現PyVM與JVM之間的通信,還需要在編譯和運行階段滿足不同的要求。在下麵的PyLink用戶定義功能架構圖中,JVM中的行為以綠色表示,而PyVM中的行為以藍色表示。讓我們看看編譯期間的局部設計。本地設計依賴於純API映射調用。Py4J用於VM通信。

現在,讓我們看看Python API和Java API在此架構中的工作方式。在Java方面,JobMaster將作業分配給TaskManager,就像處理普通Java作業一樣,並且TaskManager執行任務,這涉及到操作員在JVM和PyVM中的執行。在Python用戶定義的函數運算符中,我們將設計各種gRPC服務,用於JVM和PyVM之間的通信。例如,用於業務數據通信的DataService和用於Python UDF的StateService來調用Java State後端。還將提供許多其他服務,例如日誌記錄和指標。

我們如何使用PyFlink?

瞭解了PyFlink的體繫結構及其背後的思想之後,我們來看一下PyFlink的特定應用場景,以更好地瞭解其背後的方式和原因。

PyFlink的應用場景

PyFlink支持哪些業務方案?我們可以從兩個角度分析其應用場景:Python和Java。請記住,PyFlink也適用於Java可以應用的所有情況。

  1. 事件驅動的方案,例如實時數據監控。
  2. 數據分析,例如庫存管理和數據可視化。
  3. 數據管道,也稱為ETL方案,例如日誌解析。
  4. 機器學習,例如有針對性的建議。

您可以在所有這些情況下使用PyFlink。PyFlink也適用於特定於Python的方案,例如科學計算。在如此眾多的應用場景中,您可能想知道現在可以使用哪些特定的PyFlink API。因此,現在我們也來研究這個問題。

PyFlink安裝

在使用任何API之前,您需要安裝PyFlink。當前,要安裝PyFlink,請運行命令:pip install apache-Flink

PyFlink API與Java Table API完全一致,以支持各種關係和視窗操作。某些易於使用的PyFlink API比SQL API更為強大,例如特定於列操作的API。除了API,PyFlink還提供了多種定義Python UDF的方法。

PyFlink中用戶定義的函數定義

可以擴展ScalarFunction(例如,通過添加指標)以提供更多輔助功能。另外,PyFlink用戶功能函數支持Python支持的所有方法定義,例如lambda,命名函數和可調用函數。

定義完這些方法後,我們可以使用PyFlink Decorators進行標記,並描述輸入和輸出數據類型。我們還可以基於Python的類型提示功能進一步簡化更高版本,以進行類型派生。以下示例將幫助您更好地瞭解如何定義用戶定義的函數。

定義Python用戶定義函數的一種情況

在本例中,我們將兩個數字相加。首先,為此,導入必要的類,然後定義前面提到的函數。這非常簡單,因此讓我們進行一個實際案例。

PyFlink的未來前景如何?

通常,使用PyFlink進行業務開發很簡單。您可以通過SQL或Table API輕鬆描述業務邏輯,而無需瞭解基礎實現。讓我們看一下PyFlink的整體前景。

目標驅動路線圖

PyFlink的開發始終受到目標的推動,這些目標是使Flink功能可供Python用戶使用並將Python函數集成到Flink中。根據下麵顯示的PyFlink路線圖,我們首先在PyVM和JVM之間建立了通信。然後,在Flink 1.9中,我們提供了Python Table API,向Python用戶開放了現有的Flink Table API功能。在Flink 1.10中,我們準備通過以下操作將Python函數集成到Flink:集成Apache Beam,設置Python用戶定義的函數執行環境,管理Python對其他類庫的依賴關係以及為用戶定義用戶定義的函數API,以便支持Python用戶定義函數。

為了擴展分散式Python的功能,PyFlink提供了對Pandas SeriesDataFrame支持,以便用戶可以在PyFlink中直接使用Pandas用戶定義的函數。此外,將來會在SQL客戶端上啟用Python用戶定義函數,以使PyFlink易於使用。PyFlink還將提供Python ML管道API,以使Python用戶能夠在機器學習中使用PyFlink。監視Python用戶定義的函數執行對實際生產和業務至關重要。因此,PyFlink將進一步為Python用戶定義函數提供度量管理。這些功能將包含在Flink 1.11中。

但是,這些只是PyFlink未來發展計劃的一部分。還有更多工作要做,例如優化PyFlink的性能,提供圖形計算API以及為Flink上的Pandas支持Pandas的本機API。我們將繼續向Python用戶提供Flink的現有功能,並將Python的強大功能集成到Flink中,以實現擴展Python生態系統的最初目標。

PyFlink的前景如何?您可能知道,PyFlink是Apache Flink的一部分,它涉及運行時和API層。

PyFlink在這兩層將如何發展?在運行時方面,PyFlink將構建用於JVM和PyVM之間通信的gRPC常規服務(例如控制項,數據和狀態)。在此框架中,將抽象化Java Python用戶定義函數運算符,並構建Python執行容器以支持Python的多種執行方式。例如,PyFlink可以在Docker容器中甚至在外部服務集群中作為進程運行。特別是在外部服務群集中運行時,將以套接字的形式啟用無限擴展功能。這一切在後續的Python集成中都起著至關重要的作用。

在API方面,我們將在Flink中啟用基於Python的API,以實現我們的使命。這也依賴於Py4J VM通信框架。PyFlink將逐漸支持更多的API,包括Flink中的Java API(例如Python Table API,UDX,ML Pipeline,DataStream,CEP,Gelly和State API)以及在Python用戶中最受歡迎的Pandas API。基於這些API,PyFlink將繼續與其他生態系統集成以便於開發;例如Notebook,Zeppelin,Jupyter和Alink,這是阿裡巴巴的Flink開源版本。到目前為止,PyAlink已完全整合了PyFlink的功能。PyFlink也將與現有的AI系統平臺集成,例如著名的TensorFlow。

為此,PyFlink將一直保持活力。同樣,PyFlink的任務是使Flink功能可供Python用戶使用,併在Flink上運行Python分析和計算功能。

更多實時數據分析相關博文與科技資訊,歡迎關註 “實時流式計算”
關註 “實時流式計算” 回覆 “電子書” 獲取Flink 300頁實戰電子書


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 實驗一 安裝OpenShift1.1 前置準備[student@workstation ~]$ lab review-install setup1.2 配置規劃OpenShift集群有三個節點:master.lab.example.com:OpenShift master節點,是一個不可調度pod的 ...
  • 鍵盤俠Linux乾貨| ELK(Elasticsearch + Logstash + Kibana) 搭建教程 全網最簡單 ...
  • 常見註釋 -- 很少支持 #行內註釋 /**/段落註釋 基礎語法 SELECT 檢索數據 語法 作用 例子 釋義 select 查找列,並返回行 select prod_name from products;#可使用,分隔列名來查找多個列。 查找prod_name列,並返回其下的所有行,在produ ...
  • 索引首碼 使用 字元串列的索引規範中的語法,您可以創建僅使用列首字元的索引 。以這種方式僅索引列值的首碼可以使索引文件小得多。為a 或 column 編製索引時 , 必須為索引指定首碼長度。例如: col_name(N)NBLOBTEXT CREATE TABLE test (blob_col BL ...
  • 一、異常現象截圖 二、解決方式: 1、背景 早期的canal版本(<=1.0.24),在處理表結構的DDL變更時採用了一種簡單的策略,在記憶體里維護了一個當前資料庫內表結構的鏡像(通過desc table獲取)。 這樣的記憶體表結構鏡像的維護存在問題,如果當前在處理的binlog為歷史時間段T0,當前時 ...
  • 1 select DATE_FORMAT(dtl.transdate,'%Y-%m-%d') as transdate, 2 right(DATE_FORMAT(concat(transdate,transtime),'%Y-%m-%d %H:%i:%s'),8) as 3 transtime,dt ...
  • SELECT語句用於從表中選取/查詢數據,結果被存儲在一個結果表中(稱為結果集)。 ...
  • 本文更新於2020-06-14,使用MySQL 5.7,操作系統為Deepin 15.4。 算數運算符 運算符 語法 說明 + a + b 加法 - a - b 減法 * a * b 乘法 / a / b 除法。類似DIV,但DIV會對結果取整,/的結果可為小數 % a % b 取餘。類似MOD 除 ...
一周排行
    -Advertisement-
    Play Games
  • Timer是什麼 Timer 是一種用於創建定期粒度行為的機制。 與標準的 .NET System.Threading.Timer 類相似,Orleans 的 Timer 允許在一段時間後執行特定的操作,或者在特定的時間間隔內重覆執行操作。 它在分散式系統中具有重要作用,特別是在處理需要周期性執行的 ...
  • 前言 相信很多做WPF開發的小伙伴都遇到過表格類的需求,雖然現有的Grid控制項也能實現,但是使用起來的體驗感並不好,比如要實現一個Excel中的表格效果,估計你能想到的第一個方法就是套Border控制項,用這種方法你需要控制每個Border的邊框,並且在一堆Bordr中找到Grid.Row,Grid. ...
  • .NET C#程式啟動閃退,目錄導致的問題 這是第2次踩這個坑了,很小的編程細節,容易忽略,所以寫個博客,分享給大家。 1.第一次坑:是windows 系統把程式運行成服務,找不到配置文件,原因是以服務運行它的工作目錄是在C:\Windows\System32 2.本次坑:WPF桌面程式通過註冊表設 ...
  • 在分散式系統中,數據的持久化是至關重要的一環。 Orleans 7 引入了強大的持久化功能,使得在分散式環境下管理數據變得更加輕鬆和可靠。 本文將介紹什麼是 Orleans 7 的持久化,如何設置它以及相應的代碼示例。 什麼是 Orleans 7 的持久化? Orleans 7 的持久化是指將 Or ...
  • 前言 .NET Feature Management 是一個用於管理應用程式功能的庫,它可以幫助開發人員在應用程式中輕鬆地添加、移除和管理功能。使用 Feature Management,開發人員可以根據不同用戶、環境或其他條件來動態地控制應用程式中的功能。這使得開發人員可以更靈活地管理應用程式的功 ...
  • 在 WPF 應用程式中,拖放操作是實現用戶交互的重要組成部分。通過拖放操作,用戶可以輕鬆地將數據從一個位置移動到另一個位置,或者將控制項從一個容器移動到另一個容器。然而,WPF 中預設的拖放操作可能並不是那麼好用。為瞭解決這個問題,我們可以自定義一個 Panel 來實現更簡單的拖拽操作。 自定義 Pa ...
  • 在實際使用中,由於涉及到不同編程語言之間互相調用,導致C++ 中的OpenCV與C#中的OpenCvSharp 圖像數據在不同編程語言之間難以有效傳遞。在本文中我們將結合OpenCvSharp源碼實現原理,探究兩種數據之間的通信方式。 ...
  • 一、前言 這是一篇搭建許可權管理系統的系列文章。 隨著網路的發展,信息安全對應任何企業來說都越發的重要,而本系列文章將和大家一起一步一步搭建一個全新的許可權管理系統。 說明:由於搭建一個全新的項目過於繁瑣,所有作者將挑選核心代碼和核心思路進行分享。 二、技術選擇 三、開始設計 1、自主搭建vue前端和. ...
  • Csharper中的表達式樹 這節課來瞭解一下表示式樹是什麼? 在C#中,表達式樹是一種數據結構,它可以表示一些代碼塊,如Lambda表達式或查詢表達式。表達式樹使你能夠查看和操作數據,就像你可以查看和操作代碼一樣。它們通常用於創建動態查詢和解析表達式。 一、認識表達式樹 為什麼要這樣說?它和委托有 ...
  • 在使用Django等框架來操作MySQL時,實際上底層還是通過Python來操作的,首先需要安裝一個驅動程式,在Python3中,驅動程式有多種選擇,比如有pymysql以及mysqlclient等。使用pip命令安裝mysqlclient失敗應如何解決? 安裝的python版本說明 機器同時安裝了 ...