再有兩天就進入2018了,想想還是要準備一下明年的工作方向。回想當初開始學習函數式編程時的主要目的是想設計一套標準API給那些習慣了OOP方式開發商業應用軟體的程式員們,使他們能用一種接近傳統資料庫軟體編程的方式來實現多線程,並行運算,分散式的數據處理應用程式,前提是這種編程方式不需要對函數式編程語 ...
再有兩天就進入2018了,想想還是要準備一下明年的工作方向。回想當初開始學習函數式編程時的主要目的是想設計一套標準API給那些習慣了OOP方式開發商業應用軟體的程式員們,使他們能用一種接近傳統資料庫軟體編程的方式來實現多線程,並行運算,分散式的數據處理應用程式,前提是這種編程方式不需要對函數式編程語言、多線程軟體編程以及集群環境下的分散式軟體編程方式有很高的經驗要求。前面試著發佈了一個基於scalaz-stream-fs2的數據處理工具開源項目。該項目基本實現了多線程的資料庫數據並行處理,能充分利用域內伺服器的多核CPU環境以streaming,non-blocking方式提高數據處理效率。最近剛完成了對整個akka套裝(suite)的瞭解,感覺akka是一套理想的分散式編程工具:一是actor模式提供了多種多線程編程方式,再就是akka-cluster能輕鬆地實現集群式的分散式編程,而集群環境變化只需要調整配置文件,無需改變代碼。akka-stream是一套功能更加完整和強大的streaming工具庫,那麼如果以akka-stream為基礎,設計一套能在集群環境里進行分散式多線程並行數據處理的開源編程工具應該可以是2018的首要任務。同樣,用戶還是能夠按照他們熟悉的資料庫應用編程方式輕鬆實現分散式多線程並行數據處理程式的開發。
我把一般中小企業的IT系統分成兩大部分:一是實時的數據採集(輸入)部分,二是批量數據抽取、分析、處理部分。為了讓傳統中小型企業IT軟體編程人員能開發伺服器集群環境上數據平臺(如雲端數據平臺)運行的軟體系統,我打算通過這個DSP(Streaming-Data-Processor)項目來實現上面提到的第二部分。第一部分可以用CQRS(Command-Query-Responsibility-Separation)即讀寫分離架構和事件記錄(event-sourcing)模式來實現一種高效快速響應、安全穩定運行的數據採集體系。這部分我會在完成SDP項目後以akka-persistence為核心,通過akka-http,AMQP如RabitMQ等技術來實現。
按一般的scala和akka的編程方式編寫多線程分散式資料庫管理軟體時一是要按照akka代碼模式,使用scala編程語言的一些較深的語法;二是需要涉及非同步Async調用,集群Cluster節點任務部署及Streaming對外集成actor運算模式的細節,用戶需要具備一定的scala,akka使用經驗。再接下來就需要按業務流程把各業務環節分解成不依賴順序的功能模塊,然後把這些分拆出來的功能分派給集群中不同的節點上去運算處理。而對於SDP用戶來說,具備最基本的scala知識,無需瞭解akka、actor、threads、cluster,只要按照SDP自定義的業務處理流模式就可以編製多線程分散式數據處理程式了。下麵我就用一些文字及偽代碼來描述一下SDP的結構和功能:
總體來說SDP是由一或多個Stream組成的;每個Stream就代表一段程式。一段完整的程式Stream是由流元素源Source、處理節點Process-Node(Flow)及數據輸出終點Sink三個環節組成,下麵是一個典型的程式框架:
def load(qry: Query): PRG[R,M] = ???
def process1: PRG[R,M] = ???
def process2: PRG[R,M] = ???
def recursiveProcess(prg: PRG[R,M]): PRG[R,M] = ???
def results: PRG = ???
load(qryOrders).process1.process2.recursiveProcess(subprogram).results.run
從上面的示範中我們可以看到所有定義的函數都產生PRG[R,M]類型結果。其中R類型就是stream的元素,它流動貫穿了程式的所有環節。就像下水道網路運作原理一樣:污水由源頭Source流入終點Sink,在途中可能經過多個污水處理節點Node。每一個節點代表對管道中流淌污水處理的方式,包括分叉引流、並叉合流、添加化學物質、最後通過終點把處理過的水向外輸出。在PRG中流動的R類型可能是數據如資料庫表的一行,又或者是一條Sring類型的query如plain-sql,可以用JDBC來運行。cassandra的CQL也是String類型的。Slick,Quill,ScalikeJDBC和一些其它ORM的Query都可以產生plain-sql。
Source是一段程式的開始部分。一般來說Source是通過運算Query產生一串數據行或者人工構建而成。Source也可以並行運算Query產生,然後合併成一條無序的數據源,如下偽代碼的類型:
def load_par(qrys: Query*): PRG[R,M] = ???
Process-Node是SDP最重要的一個組成部分,因為大部分用戶定義的各種業務功能是在這裡運算的。用戶可以選擇對業務功能進行拆分然後分派給不同的線程或不同的集群節點進行多線程並行或分散式的運算。SDP應該為用戶程式提供多線程,並行式、分散式的運算函數。首先,運算用戶程式後應產生R類型結果而且,作為一種reactive軟體,必須保證完全消耗上一階段產生的所有R類型元素。下麵是一個用戶函數的款式:
type UserFunc = R => R
除了fire-and-run類型的運算函數,SDP還應該提供針對多線程或分散式程式的map-reduce式運算函數。初步想法是:無論返回結果與否,分派任務都是由persistence-actor來執行的,這樣能保證不會漏掉任何任務。如果整體任務需要在所有分派任務返回運算結果後再統一進行深度運算時akka的actor消息驅動模式是最適合不過的了。具體情況可以參考我前面關於cluster-sharding的博文。
Sink的主要作用實際上是保證完全消耗程式中產生的所有元素,這是reactive類型程式的必須要求。
好了,不知不覺還有幾個鐘頭就進入2017倒計時了。趕快湊合著在跨入2018之前把這篇發佈出去,剛好是今年的最後一篇博文。祝各位在新的一年中工作生活稱心如意!