用Python進行實時計算——PyFlink快速入門

来源:https://www.cnblogs.com/tree1123/archive/2020/06/24/13185670.html
-Advertisement-
Play Games

Flink 1.9.0及更高版本支持Python,也就是PyFlink。 在最新版本的Flink 1.10中,PyFlink支持Python用戶定義的函數,使您能夠在Table API和SQL中註冊和使用這些函數。但是,聽完所有這些後,您可能仍然想知道PyFlink的架構到底是什麼?作為PyFlin ...


Flink 1.9.0及更高版本支持Python,也就是PyFlink。

在最新版本的Flink 1.10中,PyFlink支持Python用戶定義的函數,使您能夠在Table API和SQL中註冊和使用這些函數。但是,聽完所有這些後,您可能仍然想知道PyFlink的架構到底是什麼?作為PyFlink的快速指南,本文將回答這些問題。

為什麼需要PyFlink?

Python上的Flink和Flink上的Python

那麼,PyFlink到底是什麼?顧名思義,PyFlink就是Apache Flink與Python的組合,或者說是Python上的Flink。但是Flink on Python是什麼意思?首先,兩者的結合意味著您可以在Python中使用Flink的所有功能。而且,更重要的是,PyFlink還允許您在Flink上使用Python廣泛的生態系統的計算功能,從而可以進一步促進其生態系統的開發。換句話說,這對雙方都是雙贏。如果您更深入地研究這個主題,您會發現Flink框架和Python語言的集成絕不是巧合。

Python和大數據生態系統

python語言與大數據緊密相連。為了理解這一點,我們可以看一下人們正在使用Python解決的一些實際問題。一項用戶調查顯示,大多數人都在使用Python進行數據分析和機器學習應用程式。對於此類情況,大數據空間中還解決了一些理想的解決方案。除了擴大大數據產品的受眾範圍之外,Python和大數據的集成還通過將其獨立體繫結構擴展到分散式體繫結構,極大地增強了Python生態系統的功能。這也解釋了在分析大量數據時對Python的強烈需求。

為什麼選擇Flink和Python?

Python和大數據的集成與其他最近的趨勢一致。但是,再次說明一下,為什麼Flink現在支持Python,而不是Go或R或另一種語言?而且,為什麼大多數用戶選擇PyFlink而不是PySpark和PyHive?

為了理解原因,讓我們首先考慮使用Flink框架的一些優勢:

  • 有利的體繫結構: Flink是具有統一流和批處理功能的純流計算引擎。
  • 新的活力:根據ASF的客觀統計,Flink是2019年最活躍的開源項目。
  • 高可靠性:作為一個開源項目,Flink經過長期測試,並廣泛應用於大數據公司的生產環境中。

接下來,讓我們看看為什麼Flink支持Python而不是其他語言。統計數據顯示,Python是繼Java和C之後最受歡迎的語言,並且自2018年以來一直在快速發展。Java和Scala是Flink的預設語言,但是Flink支持Python似乎是合理的。

PyFlink是相關技術發展的必然產物。但是,僅僅瞭解PyFlink的重要性是不夠的,因為我們的最終目標是使Flink和Python用戶受益並解決實際問題。因此,我們需要進一步探索如何實現PyFlink。

PyFlink架構

要實現PyFlink,我們需要知道要實現的關鍵目標和要解決的核心問題。PyFlink的主要目標是什麼?簡而言之,PyFlink的主要目標如下:

  1. 使所有Flink功能對Python用戶可用。
  2. 在Flink上運行Python的分析和計算功能,以提高Python解決大數據問題的能力。

在此基礎上,讓我們分析實現這些目標需要解決的關鍵問題。

使Flink功能可供Python用戶使用

要實現PyFlink,是否需要像現有Java引擎一樣在Flink上開發Python引擎?答案是NO。嘗試在Flink 1.8版或更早版本中進行,但效果不佳。基本設計原則是以最小的成本實現給定的目標。最簡單但最好的方法是提供一層Python API,並重用現有的計算引擎。

那麼,我們應該為Flink提供哪些Python API?他們對我們很熟悉:高級表API和SQL,以及有狀態的DataStream API。現在,我們越來越接近Flink的內部邏輯,下一步是提供適用於Python的Table API和DataStream API。但是,剩下要解決的關鍵問題到底是什麼呢?

關鍵問題

顯然,關鍵問題在於在Python虛擬機(PyVM)和Java虛擬機(JVM)之間建立握手,這對於Flink支持多種語言至關重要。要解決此問題,我們必須選擇適當的通信技術。

選擇虛擬機通信技術

當前,有兩種解決方案可用於實現PyVM和JVM之間的通信,它們是Beam和Py4J。前者是一個著名的項目,具有多語言和多引擎支持,而後者是用於PyVM和JVM之間通信的專用解決方案。我們可以從幾個不同的角度比較和對比Apache Beam和Py4J,以瞭解它們之間的區別。首先,考慮一個比喻:要越過一堵牆,Py4J會像痣一樣在其中挖一個洞,而Apache Beam會像大熊一樣把整堵牆推倒。從這個角度來看,使用Apache Beam來實現VM通信有點複雜。簡而言之,這是因為Apache Beam專註於通用性,在極端情況下缺乏靈活性。

除此之外,Flink還需要互動式編程。此外,為了使Flink正常工作,我們還需要確保其API設計中的語義一致性,尤其是在其多語言支持方面。Apache Beam的現有體繫結構無法滿足這些要求,因此答案很明顯,Py4J是支持PyVM和JVM之間通信的最佳選擇。

技術架構

在PyVM和JVM之間建立通信之後,我們已經實現了向Python用戶提供Flink功能的第一個目標。我們已經在Flink 1.9版中實現了這一點。現在,讓我們看一下Flink 1.9版中PyFlink API的體繫結構:

Flink 1.9版使用Py4J來實現虛擬機通信。我們為PyVM啟用了網關,為JVM啟用了網關伺服器以接收Python請求。此外,我們還提供了Python API中的TableENV和Table之類的對象,這些對象與Java API中提供的對象相同。因此,編寫Python API的本質是關於如何調用Java API。Flink 1.9版還解決了作業部署問題。它使您可以通過各種方式提交作業,例如運行Python命令以及使用Python Shell和CLI。

但是,此體繫結構提供了哪些優勢?首先,該體繫結構很簡單,並且可以確保Python API和Java API之間的語義一致性。其次,它還提供了與Java作業相當的出色Python作業處理性能。

在Flink上運行Python的分析和計算功能

上一節介紹瞭如何使Flink功能可供Python用戶使用。本節說明如何在Flink上運行Python函數。通常,我們可以通過以下兩種方式之一在Flink上運行Python函數:

  1. 選擇一個典型的Python類庫,並將其API添加到PyFlink。該方法花費很長時間,因為Python包含太多的類庫。在合併任何API之前,我們需要簡化Python執行。
  2. 基於現有的Flink Table API和Python類庫的特征,我們可以將所有現有的Python類庫函數視為用戶定義的函數,並將其集成到Flink中。Flink 1.10及更高版本中支持此功能。功能集成的關鍵問題是什麼?同樣,它取決於Python用戶定義函數的執行。

接下來,讓我們為這個關鍵問題選擇一種技術。

選擇執行用戶定義功能的技術

實際上,執行Python用戶定義的函數非常複雜。它不僅涉及虛擬機之間的通信,還涉及以下所有方面:管理Python執行環境,解析Java和Python之間交換的業務數據,將Flink中的狀態後端傳遞給Python以及監視執行狀態。鑒於所有這些複雜性,現在是Apache Beam發揮作用的時候了。作為支持多種引擎和多種語言的大熊,Apache Beam可以在解決這種情況方面做很多工作,所以讓我們看看Apache Beam如何處理執行Python用戶定義的函數。

下麵顯示了可移植性框架,該框架是Apache Beam的高度抽象的體繫結構,旨在支持多種語言和引擎。當前,Apache Beam支持幾種不同的語言,包括Java,Go和Python。

用戶定義的功能架構

UDF體繫結構不僅需要實現PyVM與JVM之間的通信,還需要在編譯和運行階段滿足不同的要求。在下麵的PyLink用戶定義功能架構圖中,JVM中的行為以綠色表示,而PyVM中的行為以藍色表示。讓我們看看編譯期間的局部設計。本地設計依賴於純API映射調用。Py4J用於VM通信。

現在,讓我們看看Python API和Java API在此架構中的工作方式。在Java方面,JobMaster將作業分配給TaskManager,就像處理普通Java作業一樣,並且TaskManager執行任務,這涉及到操作員在JVM和PyVM中的執行。在Python用戶定義的函數運算符中,我們將設計各種gRPC服務,用於JVM和PyVM之間的通信。例如,用於業務數據通信的DataService和用於Python UDF的StateService來調用Java State後端。還將提供許多其他服務,例如日誌記錄和指標。

我們如何使用PyFlink?

瞭解了PyFlink的體繫結構及其背後的思想之後,我們來看一下PyFlink的特定應用場景,以更好地瞭解其背後的方式和原因。

PyFlink的應用場景

PyFlink支持哪些業務方案?我們可以從兩個角度分析其應用場景:Python和Java。請記住,PyFlink也適用於Java可以應用的所有情況。

  1. 事件驅動的方案,例如實時數據監控。
  2. 數據分析,例如庫存管理和數據可視化。
  3. 數據管道,也稱為ETL方案,例如日誌解析。
  4. 機器學習,例如有針對性的建議。

您可以在所有這些情況下使用PyFlink。PyFlink也適用於特定於Python的方案,例如科學計算。在如此眾多的應用場景中,您可能想知道現在可以使用哪些特定的PyFlink API。因此,現在我們也來研究這個問題。

PyFlink安裝

在使用任何API之前,您需要安裝PyFlink。當前,要安裝PyFlink,請運行命令:pip install apache-Flink

PyFlink API與Java Table API完全一致,以支持各種關係和視窗操作。某些易於使用的PyFlink API比SQL API更為強大,例如特定於列操作的API。除了API,PyFlink還提供了多種定義Python UDF的方法。

PyFlink中用戶定義的函數定義

可以擴展ScalarFunction(例如,通過添加指標)以提供更多輔助功能。另外,PyFlink用戶功能函數支持Python支持的所有方法定義,例如lambda,命名函數和可調用函數。

定義完這些方法後,我們可以使用PyFlink Decorators進行標記,並描述輸入和輸出數據類型。我們還可以基於Python的類型提示功能進一步簡化更高版本,以進行類型派生。以下示例將幫助您更好地瞭解如何定義用戶定義的函數。

定義Python用戶定義函數的一種情況

在本例中,我們將兩個數字相加。首先,為此,導入必要的類,然後定義前面提到的函數。這非常簡單,因此讓我們進行一個實際案例。

PyFlink的未來前景如何?

通常,使用PyFlink進行業務開發很簡單。您可以通過SQL或Table API輕鬆描述業務邏輯,而無需瞭解基礎實現。讓我們看一下PyFlink的整體前景。

目標驅動路線圖

PyFlink的開發始終受到目標的推動,這些目標是使Flink功能可供Python用戶使用並將Python函數集成到Flink中。根據下麵顯示的PyFlink路線圖,我們首先在PyVM和JVM之間建立了通信。然後,在Flink 1.9中,我們提供了Python Table API,向Python用戶開放了現有的Flink Table API功能。在Flink 1.10中,我們準備通過以下操作將Python函數集成到Flink:集成Apache Beam,設置Python用戶定義的函數執行環境,管理Python對其他類庫的依賴關係以及為用戶定義用戶定義的函數API,以便支持Python用戶定義函數。

為了擴展分散式Python的功能,PyFlink提供了對Pandas SeriesDataFrame支持,以便用戶可以在PyFlink中直接使用Pandas用戶定義的函數。此外,將來會在SQL客戶端上啟用Python用戶定義函數,以使PyFlink易於使用。PyFlink還將提供Python ML管道API,以使Python用戶能夠在機器學習中使用PyFlink。監視Python用戶定義的函數執行對實際生產和業務至關重要。因此,PyFlink將進一步為Python用戶定義函數提供度量管理。這些功能將包含在Flink 1.11中。

但是,這些只是PyFlink未來發展計劃的一部分。還有更多工作要做,例如優化PyFlink的性能,提供圖形計算API以及為Flink上的Pandas支持Pandas的本機API。我們將繼續向Python用戶提供Flink的現有功能,並將Python的強大功能集成到Flink中,以實現擴展Python生態系統的最初目標。

PyFlink的前景如何?您可能知道,PyFlink是Apache Flink的一部分,它涉及運行時和API層。

PyFlink在這兩層將如何發展?在運行時方面,PyFlink將構建用於JVM和PyVM之間通信的gRPC常規服務(例如控制項,數據和狀態)。在此框架中,將抽象化Java Python用戶定義函數運算符,並構建Python執行容器以支持Python的多種執行方式。例如,PyFlink可以在Docker容器中甚至在外部服務集群中作為進程運行。特別是在外部服務群集中運行時,將以套接字的形式啟用無限擴展功能。這一切在後續的Python集成中都起著至關重要的作用。

在API方面,我們將在Flink中啟用基於Python的API,以實現我們的使命。這也依賴於Py4J VM通信框架。PyFlink將逐漸支持更多的API,包括Flink中的Java API(例如Python Table API,UDX,ML Pipeline,DataStream,CEP,Gelly和State API)以及在Python用戶中最受歡迎的Pandas API。基於這些API,PyFlink將繼續與其他生態系統集成以便於開發;例如Notebook,Zeppelin,Jupyter和Alink,這是阿裡巴巴的Flink開源版本。到目前為止,PyAlink已完全整合了PyFlink的功能。PyFlink也將與現有的AI系統平臺集成,例如著名的TensorFlow。

為此,PyFlink將一直保持活力。同樣,PyFlink的任務是使Flink功能可供Python用戶使用,併在Flink上運行Python分析和計算功能。

更多實時數據分析相關博文與科技資訊,歡迎關註 “實時流式計算”
關註 “實時流式計算” 回覆 “電子書” 獲取Flink 300頁實戰電子書


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 實驗一 安裝OpenShift1.1 前置準備[student@workstation ~]$ lab review-install setup1.2 配置規劃OpenShift集群有三個節點:master.lab.example.com:OpenShift master節點,是一個不可調度pod的 ...
  • 鍵盤俠Linux乾貨| ELK(Elasticsearch + Logstash + Kibana) 搭建教程 全網最簡單 ...
  • 常見註釋 -- 很少支持 #行內註釋 /**/段落註釋 基礎語法 SELECT 檢索數據 語法 作用 例子 釋義 select 查找列,並返回行 select prod_name from products;#可使用,分隔列名來查找多個列。 查找prod_name列,並返回其下的所有行,在produ ...
  • 索引首碼 使用 字元串列的索引規範中的語法,您可以創建僅使用列首字元的索引 。以這種方式僅索引列值的首碼可以使索引文件小得多。為a 或 column 編製索引時 , 必須為索引指定首碼長度。例如: col_name(N)NBLOBTEXT CREATE TABLE test (blob_col BL ...
  • 一、異常現象截圖 二、解決方式: 1、背景 早期的canal版本(<=1.0.24),在處理表結構的DDL變更時採用了一種簡單的策略,在記憶體里維護了一個當前資料庫內表結構的鏡像(通過desc table獲取)。 這樣的記憶體表結構鏡像的維護存在問題,如果當前在處理的binlog為歷史時間段T0,當前時 ...
  • 1 select DATE_FORMAT(dtl.transdate,'%Y-%m-%d') as transdate, 2 right(DATE_FORMAT(concat(transdate,transtime),'%Y-%m-%d %H:%i:%s'),8) as 3 transtime,dt ...
  • SELECT語句用於從表中選取/查詢數據,結果被存儲在一個結果表中(稱為結果集)。 ...
  • 本文更新於2020-06-14,使用MySQL 5.7,操作系統為Deepin 15.4。 算數運算符 運算符 語法 說明 + a + b 加法 - a - b 減法 * a * b 乘法 / a / b 除法。類似DIV,但DIV會對結果取整,/的結果可為小數 % a % b 取餘。類似MOD 除 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...