SDP(0):Streaming-Data-Processor - Data Processing with Akka-Stream

来源:https://www.cnblogs.com/tiger-xc/archive/2017/12/31/8159049.html
-Advertisement-
Play Games

再有兩天就進入2018了,想想還是要準備一下明年的工作方向。回想當初開始學習函數式編程時的主要目的是想設計一套標準API給那些習慣了OOP方式開發商業應用軟體的程式員們,使他們能用一種接近傳統資料庫軟體編程的方式來實現多線程,並行運算,分散式的數據處理應用程式,前提是這種編程方式不需要對函數式編程語 ...


   再有兩天就進入2018了,想想還是要準備一下明年的工作方向。回想當初開始學習函數式編程時的主要目的是想設計一套標準API給那些習慣了OOP方式開發商業應用軟體的程式員們,使他們能用一種接近傳統資料庫軟體編程的方式來實現多線程,並行運算,分散式的數據處理應用程式,前提是這種編程方式不需要對函數式編程語言、多線程軟體編程以及集群環境下的分散式軟體編程方式有很高的經驗要求。前面試著發佈了一個基於scalaz-stream-fs2的數據處理工具開源項目。該項目基本實現了多線程的資料庫數據並行處理,能充分利用域內伺服器的多核CPU環境以streaming,non-blocking方式提高數據處理效率。最近剛完成了對整個akka套裝(suite)的瞭解,感覺akka是一套理想的分散式編程工具:一是actor模式提供了多種多線程編程方式,再就是akka-cluster能輕鬆地實現集群式的分散式編程,而集群環境變化只需要調整配置文件,無需改變代碼。akka-stream是一套功能更加完整和強大的streaming工具庫,那麼如果以akka-stream為基礎,設計一套能在集群環境里進行分散式多線程並行數據處理的開源編程工具應該可以是2018的首要任務。同樣,用戶還是能夠按照他們熟悉的資料庫應用編程方式輕鬆實現分散式多線程並行數據處理程式的開發。

   我把一般中小企業的IT系統分成兩大部分:一是實時的數據採集(輸入)部分,二是批量數據抽取、分析、處理部分。為了讓傳統中小型企業IT軟體編程人員能開發伺服器集群環境上數據平臺(如雲端數據平臺)運行的軟體系統,我打算通過這個DSP(Streaming-Data-Processor)項目來實現上面提到的第二部分。第一部分可以用CQRS(Command-Query-Responsibility-Separation)即讀寫分離架構和事件記錄(event-sourcing)模式來實現一種高效快速響應、安全穩定運行的數據採集體系。這部分我會在完成SDP項目後以akka-persistence為核心,通過akka-http,AMQP如RabitMQ等技術來實現。

  按一般的scala和akka的編程方式編寫多線程分散式資料庫管理軟體時一是要按照akka代碼模式,使用scala編程語言的一些較深的語法;二是需要涉及非同步Async調用,集群Cluster節點任務部署及Streaming對外集成actor運算模式的細節,用戶需要具備一定的scala,akka使用經驗。再接下來就需要按業務流程把各業務環節分解成不依賴順序的功能模塊,然後把這些分拆出來的功能分派給集群中不同的節點上去運算處理。而對於SDP用戶來說,具備最基本的scala知識,無需瞭解akka、actor、threads、cluster,只要按照SDP自定義的業務處理流模式就可以編製多線程分散式數據處理程式了。下麵我就用一些文字及偽代碼來描述一下SDP的結構和功能:

總體來說SDP是由一或多個Stream組成的;每個Stream就代表一段程式。一段完整的程式Stream是由流元素源Source、處理節點Process-Node(Flow)及數據輸出終點Sink三個環節組成,下麵是一個典型的程式框架:

  def load(qry: Query): PRG[R,M] = ???
  def process1: PRG[R,M] = ???
  def process2: PRG[R,M] = ???
  def recursiveProcess(prg: PRG[R,M]): PRG[R,M] = ???
  def results: PRG = ???
  
  load(qryOrders).process1.process2.recursiveProcess(subprogram).results.run

從上面的示範中我們可以看到所有定義的函數都產生PRG[R,M]類型結果。其中R類型就是stream的元素,它流動貫穿了程式的所有環節。就像下水道網路運作原理一樣:污水由源頭Source流入終點Sink,在途中可能經過多個污水處理節點Node。每一個節點代表對管道中流淌污水處理的方式,包括分叉引流、並叉合流、添加化學物質、最後通過終點把處理過的水向外輸出。在PRG中流動的R類型可能是數據如資料庫表的一行,又或者是一條Sring類型的query如plain-sql,可以用JDBC來運行。cassandra的CQL也是String類型的。Slick,Quill,ScalikeJDBC和一些其它ORM的Query都可以產生plain-sql。

Source是一段程式的開始部分。一般來說Source是通過運算Query產生一串數據行或者人工構建而成。Source也可以並行運算Query產生,然後合併成一條無序的數據源,如下偽代碼的類型:

  def load_par(qrys: Query*): PRG[R,M] = ???

Process-Node是SDP最重要的一個組成部分,因為大部分用戶定義的各種業務功能是在這裡運算的。用戶可以選擇對業務功能進行拆分然後分派給不同的線程或不同的集群節點進行多線程並行或分散式的運算。SDP應該為用戶程式提供多線程,並行式、分散式的運算函數。首先,運算用戶程式後應產生R類型結果而且,作為一種reactive軟體,必須保證完全消耗上一階段產生的所有R類型元素。下麵是一個用戶函數的款式:

  type UserFunc = R => R 

除了fire-and-run類型的運算函數,SDP還應該提供針對多線程或分散式程式的map-reduce式運算函數。初步想法是:無論返回結果與否,分派任務都是由persistence-actor來執行的,這樣能保證不會漏掉任何任務。如果整體任務需要在所有分派任務返回運算結果後再統一進行深度運算時akka的actor消息驅動模式是最適合不過的了。具體情況可以參考我前面關於cluster-sharding的博文。

Sink的主要作用實際上是保證完全消耗程式中產生的所有元素,這是reactive類型程式的必須要求。

好了,不知不覺還有幾個鐘頭就進入2017倒計時了。趕快湊合著在跨入2018之前把這篇發佈出去,剛好是今年的最後一篇博文。祝各位在新的一年中工作生活稱心如意!

 

 

 

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1.Java I/O 概述 2.Java I/O File類 3.Java I/O 獲取文件目錄並寫入到文本 4.Java I/O 輸入與輸出 5.Java I/O 複製文本文件 6.Java I/O 添加屬性和有用的介面 7.Java I/O Reader & Writer(字元流) 8.Java ...
  • 位元組流、字元流涉及的類比較多,比較容易混淆。因此,有必要針對何時使用位元組流、何時使用字元流、何時使用Buffer類的流做一個歸納。要歸納它們,無需過多的語言,只需抓住它們的重點和特性即可。 在決定何時使用何種類時,以下幾個問題需要考慮清楚。 數據源:表示輸入,或稱為讀。可提供使用的兩個父類為Inpu ...
  • 一、原理 Spring MVC基於模型-視圖-控制器(Model-View-Controller,MVC)模式實現,它能夠幫你構建像Spring框架那樣靈活和松耦合的Web應用程式,將請求處理的邏輯和視圖中的渲染實現解耦。 1、DispatcherServlet是Spring MVC的核心 。Spr ...
  • 本人一直在走.NET技術路線,考慮到後期公司搞JAVA項目,也算是進行技術災備,開始對JAVA技術進行關註。萬事開頭難,也是上來一頭包。沒辦法,頂著上吧。上面開始分給我任務了。就是對後期項目報表進行方案選型。哥們兒花了兩周的時間總算是提供了幾個方案,以供相關人員選擇。特將此次過程整理如下: 一、萬事 ...
  • 一、Spring MVC 驗證 JSR 303 是ajvaEE6 中的一項子規範 ,叫 Bean Validation 用於對javaBean中的欄位進行校驗。 官方的參考實現是: Hibernate Validator ,此實現和 Hibernate ORM 沒有任何關係 //http://hib ...
  • ##importlogginglogging.debug('debug message')logging.info('info message')logging.warning('warning message') # WARNING:root:warning messagelogging.erro ...
  • 註意迭代器和可迭代對象不同#迭代器:1、有iter方法,2、有next方法li=[1,2,3,4,5]d=iter(li) # 等於li.__iter__()print(d) # <list_iteratorobjectat0x00000174316CC3C8>可以通過next方法取出元素。for循 ...
  • 要讀取鍵盤輸入的數據,需要使用輸入流,可以是位元組輸入流,也可以是位元組輸入流轉換後的字元輸入流。 關於鍵盤輸入,有幾點註意的是:(1).鍵盤輸入流為System.in,其返回的是InputStream類型,即位元組流。(2).位元組流讀取鍵盤的輸入時,需要考慮回車符(\r:13)、換行符(\n:10)。( ...
一周排行
    -Advertisement-
    Play Games
  • 前言 本文介紹一款使用 C# 與 WPF 開發的音頻播放器,其界面簡潔大方,操作體驗流暢。該播放器支持多種音頻格式(如 MP4、WMA、OGG、FLAC 等),並具備標記、實時歌詞顯示等功能。 另外,還支持換膚及多語言(中英文)切換。核心音頻處理採用 FFmpeg 組件,獲得了廣泛認可,目前 Git ...
  • OAuth2.0授權驗證-gitee授權碼模式 本文主要介紹如何筆者自己是如何使用gitee提供的OAuth2.0協議完成授權驗證並登錄到自己的系統,完整模式如圖 1、創建應用 打開gitee個人中心->第三方應用->創建應用 創建應用後在我的應用界面,查看已創建應用的Client ID和Clien ...
  • 解決了這個問題:《winForm下,fastReport.net 從.net framework 升級到.net5遇到的錯誤“Operation is not supported on this platform.”》 本文內容轉載自:https://www.fcnsoft.com/Home/Sho ...
  • 國內文章 WPF 從裸 Win 32 的 WM_Pointer 消息獲取觸摸點繪製筆跡 https://www.cnblogs.com/lindexi/p/18390983 本文將告訴大家如何在 WPF 裡面,接收裸 Win 32 的 WM_Pointer 消息,從消息裡面獲取觸摸點信息,使用觸摸點 ...
  • 前言 給大家推薦一個專為新零售快消行業打造了一套高效的進銷存管理系統。 系統不僅具備強大的庫存管理功能,還集成了高性能的輕量級 POS 解決方案,確保頁面載入速度極快,提供良好的用戶體驗。 項目介紹 Dorisoy.POS 是一款基於 .NET 7 和 Angular 4 開發的新零售快消進銷存管理 ...
  • ABP CLI常用的代碼分享 一、確保環境配置正確 安裝.NET CLI: ABP CLI是基於.NET Core或.NET 5/6/7等更高版本構建的,因此首先需要在你的開發環境中安裝.NET CLI。這可以通過訪問Microsoft官網下載並安裝相應版本的.NET SDK來實現。 安裝ABP ...
  • 問題 問題是這樣的:第三方的webapi,需要先調用登陸介面獲取Cookie,訪問其它介面時攜帶Cookie信息。 但使用HttpClient類調用登陸介面,返回的Headers中沒有找到Cookie信息。 分析 首先,使用Postman測試該登陸介面,正常返回Cookie信息,說明是HttpCli ...
  • 國內文章 關於.NET在中國為什麼工資低的分析 https://www.cnblogs.com/thinkingmore/p/18406244 .NET在中國開發者的薪資偏低,主要因市場需求、技術棧選擇和企業文化等因素所致。歷史上,.NET曾因微軟的閉源策略發展受限,儘管後來推出了跨平臺的.NET ...
  • 在WPF開發應用中,動畫不僅可以引起用戶的註意與興趣,而且還使軟體更加便於使用。前面幾篇文章講解了畫筆(Brush),形狀(Shape),幾何圖形(Geometry),變換(Transform)等相關內容,今天繼續講解動畫相關內容和知識點,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 什麼是委托? 委托可以說是把一個方法代入另一個方法執行,相當於指向函數的指針;事件就相當於保存委托的數組; 1.實例化委托的方式: 方式1:通過new創建實例: public delegate void ShowDelegate(); 或者 public delegate string ShowDe ...