2.3 NiFi Processor應用介紹對於NiFi的使用者來說,如果想要創建一個高效的數據流,那麼就需要瞭解什麼樣的單元處理器才最適合這個數據流。NiFi擁有大量的可以用於各種業務場景的單元處理器可供使用者挑選和使用,這些單元處理器主要提供例如系統之間數據的傳輸,數據的路由,數據的轉換、處理、... ...
2.3 NiFi Processor應用介紹
對於NiFi的使用者來說,如果想要創建一個高效的數據流,那麼就需要瞭解什麼樣的單元處理器才最適合這個數據流。NiFi擁有大量的可以用於各種業務場景的單元處理器可供使用者挑選和使用,這些單元處理器主要提供例如系統之間數據的傳輸,數據的路由,數據的轉換、處理、分割和聚合等大類的功能。
在每個NiFi的版本發佈中都會有大量的新的處理器單元產生,這就導致本書中講重點介紹1.4.0版本及之前的常用處理器單元的功能,我們講根據這些常用的處理器單元的不同用處進行分類。
2.3.1 數據轉換類處理器單元
CompressContent
CompressContent處理器單元主要用途是對NiFi數據流的FlowFile的內容進行壓縮和解壓縮,支持的壓縮種類如圖

ConvertCharacterSet
ConvertCharacterSet處理器單元主要用途將NiFi數據流的FlowFile的內容從一種字元集轉換成另外一種字元集。配置例子如圖

EncryptContent
EncryptContent處理器單元主要用途將NiFi數據流的FlowFile的內容進行加密/解密傳輸。

ReplaceText
ReplaceText處理器單元主要用途是根據處理器屬性配置的正則表達式對FlowFile的內容進行匹配,如果匹配成功將會降匹配成功的欄位替換為配置屬性中的欄位。將FlowFile的內容全部替換為nifi的配置例子如圖

⁃
2.3.2 數據路由類和調製處理器單元
ControlRate
ControlRate處理器單元用來控制數據流部分流量的速率。

上面的圖中的例子表示1分鐘內只允許最多1000個FlowFile流過。
DetectDuplicate
DetectDuplicate處理器單元用來依據用戶定義的特征來監控和發現重覆的FlowFile。通常這個處理器會搭配HashContent單元處理器來完成功能。

上面的圖中的例子表示Processor根據輸入的FlowFile的hash.value屬性值作為去重條件對FlowFile進行匹配,將去重後的映射到non-duplicate的Relationship中,將重覆的FlowFile映射到duplicate的Relationship中。
MonitorActivity
MonitorActivity處理器單元可以在用戶定義的時段內如果沒有數據流量就是發送告警通知,也可以選擇附加功能,在數據流量恢復之後發送恢復通知。

上面的圖中例子標示Processor每1分鐘內沒有FlowFile輸入就會不間斷的發出Inactivity Message屬性的內容,且檢測範圍是本Node節點。
RouteOnAttribute
RouteOnAttribute處理器單元可以根據FlowFile的屬性制定路由規則來對FlowFile進行路由。

上面的圖中例子表示Processor根據輸入的FlowFile的value屬性進行路由,將含有hello的FlowFile路由到include hello text的Relationship中,將含有world的FlowFile路由到include world text的Relationship中。
ScanAttribute
ScanAttribute處理器單元用途是將FlowFile屬性中被用戶定義的屬性與用戶自定義的字典進行對比,看是否能夠匹配。

上面的圖中例子表示Processor輸入的FlowFile中的屬性值只要有一個包含了Sample.txt字典中任意一行的字元,那麼Processor就會將這個FlowFile路由到matched的Relationship中。
RouteOnContent
RouteOnContent處理器單元的功能近似於RouteOnAttribute,區別在於RouteOnContent處理器單元進行路由判定的內容是FlowFile的內容而不是之前RouteOnAttribute處理器單元所使用的屬性。

上面的圖中例子表示Processor根據輸入FlowFile的內容進行路由,如果輸入的FlowFile的內容為hello,那麼它將會被路由到hello relationship的relationship中。
ScanContent
ScanContent處理器單元同樣也近似於ScanAttribute,區別在於前者用戶選取的比對對象是內容而後者定義卻是屬性。

上面的圖中例子表示Processor根據輸入FlowFile內容進行掃描路由,如果FlowFile的內容為hello,那麼它將會被路由到matched的relationship中。
ValidateXml
ValidateXml處理器單元將FlowFile的XML內容和用戶的XML定義進行校驗,將符合XML定義的FlowFile進行路由。

上面的圖中表示Processor根據輸入XML的Schema文件對輸入的FlowFile內容進行校驗匹配,如果校驗合格的FlowFile會被映射到valid的relationship中。
上面的圖中表示Processor根據輸入的FlowFile中的HiveQL往Hive中寫入或者更新數據。
2.3.3 數據接入類處理器單元
ConvertJSONToSQL
ConvertJSONToSQL處理器單元可以將結構化的Json轉換成INSERT或者UPDATE這樣命令的SQL,配合PutSQL處理器單元可以直接根據這鞋命令將數據插入資料庫中。

上面的圖中表示Processor根據輸入的FlowFile的JSON內容,將JSON轉化成Update的SQL語句。
ExecuteSQL
ExecuteSQL處理器單元直接運行運行用戶配置的SQL查詢語句,並將查詢結果以Avro的格式寫入到FlowFile的內容中去。

上面的圖中表示Processor根據用戶配置SQL select query語句,從資料庫中查詢出結果,並將結果FlowFile映射到success的relationship中。
PutSQL
PutSQL處理器單元可以根據傳入的FlowFile內容中的DDM SQL對資料庫進行更新操作。

上面的圖中表示Processor根據輸入的FlowFile的SQL內容,每100個SQL作為一個事務提交資料庫,並將生成的Key返回且在事務提交失敗的情況下對事務進行回滾。
SelectHiveQL
SelectHiveQL處理器單元執行Hive的查詢語句HiveQL,並且將結果以Avro或者CSV的格式寫入到FlowFile中。

上面的圖中表示Processor根據HiveQL語句查詢Hive,並將結果以CSV格式輸出,CSV擁有Header為username和age。
PutHiveQL
PutHiveQL處理器根據傳入的HiveQL DDM語句對Hive數據倉庫的內容進行更新。

上面的圖中表示Processor根據輸入的FlowFile中的HiveQL往Hive中寫入或者更新數據。
2.3.4 屬性提取類處理器單元
EvaluateJsonPath
EvaluateJsonPath處理器單元根據用戶定義的JSONpath表達式對FlowFile的JSON內容進行解析,將這些表達式所解析出來的內容替換FlowFile的內容或者將其更新到FlowFile的屬性中,以便於後續的單元處理器的引用。

上面的圖中表示Processor將輸入內容為JSON格式的FlowFile例如{“name”:”zhangsan”,”phone”:”13734564321”},將其中的phone解析出來後輸出到FlowFile的內容中。
EvaluateXPath
EvaluateXPath處理器單元功能近似於EvaluateJsonPath,根據用戶提供的XPath表達式,將FlowFile的XML內容用表達式進行解析,將解析的結果替換如FlowFile的正文或者更新FlowFile的屬性。

上面的圖中表示Processor對輸入內容為XML格式的FlowFile利用配置XPath表達式進行解析,並將結果輸出到FlowFile的內容中。
EvaluateXQuery
EvaluateXQuery處理器單元根據用戶定義的XQuery,將FlowFile的XML正文與表達式進行進行虯枝,將提取的結果替換FlowFile的正文或者更新FlowFile的屬性。

上面的圖中表示Processor對輸入內容為XML格式的FlowFile利用配置的XQuery表達式進行解析,並將結果以XML格式輸入到FlowFile的內容中。
HashAttribute
HashAttribute處理器單元對用戶選擇的已有屬性列表的值拼接後的字元串進行Hash計算。

上面的圖中表示Processor對輸入的FlowFile中value屬性值進行hash計算,並將結果輸出到FlowFile的value屬性中。
HashContent
HashContent處理器單元對FlowFile的內容進行Hash,並將Hash值添加到FlowFile的屬性中。

上面的圖中表示Processor對輸入的FlowFile中的內容進行hash計算,並將結果輸出到FlowFile的hash.value屬性中。
IdentifyMimeType
IdentifyMimeType處理器單元對FlowFile的內容格式進行判定。此處理器能夠檢測許多不同的MIME類型,例如它能夠判定出FlowFile的內容是圖片,文本和壓縮文件等格式。
UpdateAttribute
UpdateAttribute處理器單元可以對FlowFile添加任意的用戶定義的屬性。這將有利於對FlowFile添加靜態的屬性,也可以根據NiFi表達式語言來動態的添加屬性。

上面的圖中表示Processor對輸入的FlowFile中屬性進行修改,添加一個鍵值為value值為helloworld的屬性。
2.3.5 系統交互類處理器單元
ExecuteProcess
ExecuteProcess處理器單元能夠運行用戶定義的操作系統命令,將處理完的標準輸出內容寫入flowfile中。該處理器是一個不需要輸入的源處理器,它會輸出產生一個新的FlowFile。如果需要提供輸入源請使用下麵介紹的executestreamcommand處理器單元。

上面的圖中表示Processor根據輸入的指令和參數執行命令ls –l /user,並將結果輸出到FlowFile中。
ExecuteStreamCommand
ExecuteStreamCommand處理器單元運行用戶定義的操作系統命令。輸入的FlowFile的內容作為命令的標準輸入。將處理完的標準輸出內容寫入FlowFile內容中。此處理器單元不同於ExecuteProcess,它必須有FlowFile的輸入才能正常完成功能。

上面的圖中表示Processor根據輸入的FlowFile中的path屬性值為/usr/cmd.sh命令腳本的運行參數來運行命令,並將結果輸出到FlowFile中。
2.3.6 數據提取類處理器單元
GetFile
GetFile處理器單元從本地磁碟獲取文件的內容到NiFi,並刪除原有的磁碟文件。這個處理器應用場景是將一個文件從一個地方搬移到另外一個地方而不是對文件的拷貝。

上面的圖中表示Processor將/user/sample.txt文件的內容輸出到FlowFile的內容中。
GetFTP
GetFTP處理器單元從FTP伺服器文件內容輸出到FlowFile中,並可以選擇刪除原有文件。同樣它的使用場景是文件的搬移而不是文件的拷貝。

上面的圖中表示Processor將ftpServer01上/resource路徑下的文件內容輸出到FlowFile中,並將源文件刪除。
GetSFTP
GetSFTP處理器單元從SFTP文件內容輸出到FlowFile中,並可以選擇刪除原有文件。同樣它的使用場景是文件的搬移而不是文件的拷貝。

上面的圖中表示Processor將sftpServer01上/resource路徑下的文件內容輸出到FlowFile中,並將源文件刪除。
GetJMSQueue
GetJMSQueue處理器單元從JMS隊列中下載消息,並通過JMS Message來創建FlowFile的內容,同時也可以指定創建FlowFile的屬性。

GetJMSTopic
GetJMSTopic處理器單元從JMS的Topic中下載消息,並根據JMS消息創建FlowFile的內容,通過選擇也能生成FlowFile的屬性。這個處理器單元支持長期和非長期的訂閱模式。
GetHTTP
GetHTTP處理器單元能夠根據URL通過HTTP或者HTTPS協議下載內容到NiFi,從而形成的新的FlowFile內容。同時處理器單元在下載的同時也記憶ETag和最新修改時間來防止數據的重覆下載問題。

上面的圖中表示Processor根據配置的URL進行http訪問,將訪問結果發送到FlowFile的內容中並且filename屬性值為配置的Filename的值。
ListenHTTP
ListenHTTP處理器單元啟動一個HTTP或者HTTPS監聽埠,當監聽到有POST請求過來的時候,會首先返回200狀態,並利用POST的請求內容形成新的FlowFile。

上面的圖中表示Processor監聽8811埠的HTTP POST請求,當有POST請求訪問http://localhost:8811/contentListener的時候,Processor就會首先返回200狀態,讓後將POST請求的參數輸出到新的FlowFile的內容中。
ListenUDP
ListenUDP處理器單元監聽UDP數據包,並根據配置獲取一定量的包來創建一個FlowFile並將FlowFile發射到success的Relationships關係中。

GetHDFS
GetHDFS處理器單元監控用戶定義的HDFS指定路徑的文件變化,當有新的文件寫入HDFS中的該路徑下,那麼文件的內容被用來創建新的FlowFile的內容,同時刪除原有的文件。這個處理器同前面一樣適用於文件的搬移場景而非文件的複製場景。

上面的圖中表示Processor將HDFS上/target路徑下的文件內容輸出到FlowFile中,並將源文件刪除。
2.3.7數據發送類處理器單元
PutEmail
PutEmail處理器單元主要功能是將FlowFile的內容以郵件的形式發送給配置的用戶郵箱,也可以通過配置選擇將FlowFile的內容以附件的方式發送出去。

PutFile
PutFile處理器主要功能是將FlowFile的內容以文件的形式寫入本地磁碟。

上面的圖中表示Processor將接收到的FlowFile的內容寫入到本地的磁碟文件中。(註意:1.5.0之前此Processor不支持追加寫入)
PutFTP
PutFTP處理器單元將FlowFile的內容拷貝到遠程的FTP伺服器上。

上面的圖中表示Processor將輸入的FlowFile的內容通過FTP協議寫入到ftpServer01的/upload路徑下且上傳路徑不存在的情況下自動創建路徑。
PutSFTP
PutSFTP處理器單元主要功能將FlowFile的內容拷貝到遠程的SFTP伺服器上。

上面的圖中表示Processor將輸入的FlowFile的內容通過SFTP協議寫入到sftpServer01的/upload路徑下且上傳路徑不存在的情況下自動創建路徑。
PutJMS
PutJMS處理器單元主要功能將FlowFile的內容座位JMS消息發送到JMS代理上,也可以通過配置根據FlowFile的屬性來添加JMS配置屬性。

PutSQL
PutSQL處理器單元的主要功能是將FlowFile的正文當作SQL DDL聲明。FlowFile必須是正確的符合SQL規範的SQL聲明。FlowFile的屬性被用作DDL SQL的參數,這樣可以有效的防止SQL註入攻擊。

上面的圖中表示Processor將輸入的FlowFile的內容按照100個進行batch操作寫入資料庫。
PutKafka
PutKafka處理器單元專門是針對0.8.x版本的Kafka,它將FlowFile的內容以消息的形式發送到Kafka消息隊列中。FlowFile的內容既可以作為一條完整的消息發送到Kafka,同時也可以通過分隔符將它切分為多個消息來發送到Kafka,例如換行符。

上面的圖中表示Processor從localhost安裝的Kafka的Sample_topic_A消費數據,並將數據輸出到FlowFile的內容中。
PutMongo
PutMongo處理器單元將FlowFile的內容插入或者更新到MongoDB中。

上面的圖中表示Processor根據輸入的FlowFile內容中的doc來寫入MongoDB。
2.3.8切分和聚合類處理器單元
SplitText
SplitText處理器單元可以將一個文本內容的FlowFile切分成你想要數量的FlowFile。

上面的圖中表示Processor將輸入的FlowFile的內容切分成多個FlowFile,每個FlowFile的內容都來自於FlowFile中的一行內容。
SplitJson
SplitJson處理器單元可以將一個JSON對象根據它的結構拆解成JSON內部的字對象。

上面的圖中表示Processor將輸入的FlowFile內容中的Json按照JsonPath表達式$.*進行第一級切分生成新的FlowFile。
SplitXml
SplitXml處理器單元可以將XML消息分解為多個FlowFile,且新的FlowFile中包含原有的分段信息。這種處理器單元經常適用於多個XML元素被封裝在一個元素中,而此處理器單元允許這些元素分離成各自單獨的XML元素。

上面的圖中表示Processor對於輸入的FlowFile內容中的XML按照第一層級進行切分,切分出來的子XML輸出到FlowFile中。
UnpackContent
UnpackContent處理器單元可以對壓縮格式的文件如ZIP和TAR進行解壓,且解壓後的文件作為一個FlowFile的內容輸出。

上圖中UnpackContent和IdentifyMimeType一起使用,後者輸出的FlowFile由前者來進行處理,UnpackContent根據輸入的FlowFile的mime.type屬性對FlowFile的內容進行解壓。
MergeContent
MergeContent處理器單元的主要功能是將多個FlowFile的內容合併成一個FlowFile。這些FlowFile的內容合併的同時,也可以通過配置對合併後的內容增加標題,頁腳和分隔符,也可以對合併後的內容置頂歸檔格式,比如ZIP和TAR。在FlowFile合併的過程中可以依據相同的屬性進行合併,也可以根據之前分片處理器分片後的序號來進行合併。用戶可以定義合併後FlowFile內容的最大值和最小值,當達到這個值的時候FlowFile就合併完畢。為了防止在FlowFile沒有達到配置的大小值的過程中時間太久,用戶也可以通過配置超時參數來有效的解決這個問題。

上圖中表示Processor將輸入的FlowFile的內容按照從Queue中任意消費的FlowFile的內容進行Merge輸出到新的FlowFile中,FlowFile的內容格式為TAR,選擇各個輸入FlowFile中一致的屬性寫入到新輸出的FlowFile中,對於不同的Metadata不進行Merge,輸出的新的FlowFile內容中同事也增加了頁頭和頁腳。
SegmentContent
SegmentContent處理器單元可以根據配置切分後的FlowFile大小將一個大的FlowFile切分成許多小的FlowFile。分片是基於位元組的偏移量而不是分隔符。這種將大的FlowFile以分片的形式進行傳輸可以有效的減少大文件傳輸過程中的延時問題。當這些分片傳輸到達目標端的時候,可以通過其它的處理器單元重新進行組裝,例如上面所說的MergeContent處理器單元。

上面的途中表示Processor把輸入的FlowFile的內容按照1MB的大小進行切分,切分成新的FlowFile且新的FlowFile中寫入了分片的序號segment.index和數量segment.count屬性。
SplitContent
SplitContent處理器單元的功能近似於SegmentContent將一個FlowFile分解成多個FlowFile。但區別在於SplitContent在進行分解的過程中不是按照設定的位元組大小,而是根據分隔符進行分裂。

上面的圖中表示Processor對輸入的FlowFile的內容按照豎線 | 符號進行切分,切分成多個FlowFile。
2.3.9 HTTP協議類處理器單元
GetHTTP
GetHTTP處理器單元對配置的http或者https協議的URL發起請求並將返回結果輸出到新的FlowFile中。而且GetHTTP會記錄Etag和最新數據修改時間避免不停的訪問給服務端產生不必要的開銷。如下圖

ListenHTTP
ListenHTTP處理器單元監聽Http或者Https請求,如果有請求先返回200然後將POST的請求參數輸出到新的FlowFile中。

上面的圖中表示Processor監聽locahost的http請求,請求URL為http://localhost:9080/contentListener
InvokeHTTP
InvokeHTTP處理器單元能夠根據用戶的配置發送HTTP協議請求。InvokeHTTP處理器單元通過更多的配置可以完成比GetHTTP和PostHTTP更多的功能。如下圖

PostHTTP
PostHTTP處理器單元將FlowFile的內容作為HTTP POST請求的body消息。它通常與ListenHTTP處理器單元組合使用,應用於當多個NiFi實例之間不能通過Site-to-Site的方式進行數據交換的場景。如下圖

HandleHttpRequest / HandleHttpResponse
HandleHttpRequest處理器單元可以作為一個源處理器單元來啟動一個HTTP監聽服務功能,類似於ListenHTTP。但是這個處理器不響應客戶端,它將請求的參數以FlowFile的內容和屬性的方式,響數據流的下游進行傳遞。HandleHttpResponse處理器單元能夠響應並將處理後的FlowFile結果返回請求的客戶端。這兩個處理器通常都是在一起被使用的。