為什麼不呢?我們有了RPC/RMI和MAP,為什麼不能在windows環境下處理大數據呢?windows是迄今為止最普及的操作系統,據市調公司NetMarketShare最新(2019年5月)統計數據,在桌面操作系統方面,目前Windows 10的市場占有率已達45.73%。而Windows 7的市 ...
為什麼不呢?我們有了RPC/RMI和MAP,為什麼不能在windows環境下處理大數據呢?windows是迄今為止最普及的操作系統,據市調公司NetMarketShare最新(2019年5月)統計數據,在桌面操作系統方面,目前Windows 10的市場占有率已達45.73%。而Windows 7的市場占有率為35.44%。排在第三位的是Windows 8.1,市場份額為3.97%。這三個版本的windows市場占有率之和為:85.14%。可以說windows占據了絕大多數用戶的心。這與windows界面友好統一、易操作、功能豐富、軟硬體相容性都高密不可分。我們或絕大多數人(目前非公司)在Windows上產生的數據要遠遠多於在Linux上產生的數據,如果能直接在Windows處理這些數據,那就太方便太好了。
既然Windows這麼普及,又這麼方便實用,而且個人在Windows上產生的數據又那麼多(如果在Windows上能直接處理大數據的話,以後Windows伺服器的數量肯定會超過Linux伺服器),那為什麼從大數據處理技術誕生之日起都是基於Linux的,從最早的hadoop到國人主導的kylin再到較新的flink,莫不如是?這要從Linux和Windows的文件系統和目錄結構說起,Linux操作系統中有一個重要的概念:一切皆文件。Linux將獨立的文件系統組合成了一個層次化的樹形結構,並且由一個統一的、虛擬的根目錄代表整個文件系統。Linux將新的文件系統通過一個稱為“掛裝”或“掛上”的操作將其掛裝到根目錄的某個子目錄上,從而讓不同的文件系統結合成為一個“整體”。這個“整體”即Linux文件系統,實際上是每個實際文件系統從操作系統和系統服務中分離出來,通過一個介面層:虛擬文件系統或VFS來通訊。虛擬文件系統既沒有文件,也不直接管理文件,它只是用戶與實際文件系統之間的介面。我們操作的Linux目錄或文件被虛擬文件系統映射到真實的磁碟存儲區域。VFS使得Linux可以支持多個不同的文件系統,每個表示一個VFS的通用介面。Linux支持的文件系統包括: FAT、VFAT、FAT32、MINIX 等不同類型的文件系統,但ExtN才是Linux的“原配”文件系統。由於軟體將Linux文件系統的所有細節進行了轉換,所以Linux核心的其它部分及系統中運行的程式將看到統一的文件系統。Linux的虛擬文件系統允許用戶同時能透明地安裝許多不同的文件系統,而不需要具體地加以區別,即用戶和進程不需要知道文件所在的文件系統類型,而只需要像使用ExtN文件系統中的文件一樣使用它們。這一點正是Windows所不具備的。
Windows採用地是多根目錄文件系統,即一棵獨立或多棵併列(根據磁碟分區的多少,一個分區對應一棵目錄樹)的樹形結構;而Linux採用地是統一根目錄結構,只有一棵巨大的樹結構。如果用Windows作為伺服器來處理大數據,每台伺服器的磁碟分區數根據自身磁碟容量、使用者分區習慣大概率不相同,導致盤符參差不齊。數據就有可能在這台伺服器存儲在D盤,在另一臺存儲在E盤等等,很難統一管理。而Linux就不存在這樣的問題,因為它的文件系統利用VFS屏蔽了分區細節,可以在每台Linux伺服器上根目錄下或根目錄的某個子目錄下建立一致的數據存儲目錄。但我們的命題就是要在Windows上處理大數據,如何解決Windows數據統一存儲管理問題呢?暫時想到兩個辦法:一是約定俗成,約定同一個集群的Windows伺服器分區數相同,盤符相同,或至少都有某一個分區,數據存放在該分區;二是由客戶端向集群提交任務時,附帶提交一個數據存儲地址/目錄表,表裡記錄得是待處理分析的數據具體在某台伺服器或節點上的某個具體位置,比如說:某節點:port/某分區/某目錄/某文件.xxx(如果整個目錄都是,就不需要具體到文件),這樣就可以將數據存儲到具體伺服器不同的盤符下。第二種方法靈活,易於在現實中的機器上搭建集群,但麻煩,需要占用額外的存儲,可以用Map等集合類對象存儲,至於具體怎樣實現,等到“構想”完集群、模擬提交任務時再說。
大數據,故名思義,數據量很大,單結點(主機)的存儲容量有限,需要多結點、分散式地存儲。而且任一個節點的數據量也往往遠遠大於要分析處理數據的應用代碼量,所以計算向數據移動,可以大大減少IO(包括磁碟IO和網路IO),從時間上和空間上降低成本。無論是將代碼發往數據節點,還是shuffle階段從map端向reduce端拉取數據,都需要數據遷移。而數據遷移就要用到RPC(Remote Procedure Call)——遠程過程調用,在百度百科中是這樣解釋的:它是一種通過網路從遠程電腦程式上請求服務,而不需要瞭解底層網路技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,為通信程式之間攜帶信息數據。在OSI網路通信模型中,RPC跨越了傳輸層和應用層。RPC使得開發包括網路分散式多程式在內的應用程式更加容易。RPC採用客戶機/伺服器模式。請求程式就是一個客戶機,而服務提供程式就是一個伺服器。首先,客戶機調用進程發送一個有進程參數的調用信息到服務進程,然後等待應答信息。在伺服器端,進程保持睡眠狀態直到調用信息到達為止。當一個調用信息到達,伺服器獲得進程參數,計算結果,發送答覆信息,然後等待下一個調用信息,最後,客戶端調用進程接收答覆信息,獲得進程結果,然後調用執行繼續進行。有多種 RPC模式和執行。最初由 Sun 公司提出。IETF ONC 憲章重新修訂了 Sun 版本,使得 ONC RPC 協議成為 IETF 標準協議。現在使用最普遍的模式和執行是開放式軟體基礎的分散式計算環境(DCE)。DCE也是大數據框架執行的基礎。在Java中叫RMI。無論hadoop還是spark底層都是先搭建一個RPC框架,各個角色對象在此基礎上進行通信和數據傳輸。
有了RPC/RMI,我們就可以在Windows伺服器間傳輸各種數據包括代碼數據和待分析處理數據了。當然還有很多問題需要解決,等遇到時再說。我們先從最簡單的說起,RPC/RMI可以在Windows下輕而易舉地編程、實現、應用。舉例如下,wordCount在本地是這樣實現的:
public class StringValueMemStore implements Serializable {
/**
* xxxxx
*/
private static final long serialVersionUID = -4505251514307025804L;
......(省略)
/**
* 重點是下麵這個方法
*/
public static Map<String, Integer> wordCount(String filePath, String separator) {
Map<String, Integer> map = new HashMap<>();
int one = 1;
int val;
Path path = Paths.get(filePath);
if (Files.notExists(path)) {
System.err.println(path + " dose not exist.");
return null;
}
if (Files.isDirectory(path)) {
System.err.println(path + " is not a file.");
return null;
}
Reader fin = null;
BufferedReader in = null;
try {
fin = Files.newBufferedReader(path, Charset.forName("UTF-8"));
in = new BufferedReader(fin);
String line;
while((line = in.readLine()) != null) {
for (String key : line.split(separator)) {
if(map.get(key) == null) {
map.put(key, one);
} else {
val = map.get(key) + 1;
map2.put(key, val);
}
}
}
} catch (IOException e) {
e.printStackTrace();
return null;
} finally {
try {
if(fin != null) {
fin.close();
}
if(in != null) {
in.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return map;
}
}
調用如下:
Map<String, Integer> map = StringValueMemStore.wordCount("./data/words.txt",
" ");
for(String k : map.keySet()) {
System.out.println(k + "\t" + map.get(k));
}
加上RMI實現最簡分散式是這樣實現的:
public interface WCService extends Remote {
Map<String, Integer> wordCount(String filePath, String separator) throws RemoteException;
}
public class WCServiceImpl extends UnicastRemoteObject implements WCService {
/**
*
*/
private static final long serialVersionUID = 4478936029983919271L;
protected WCServiceImpl() throws RemoteException {
}
@Override
public Map<String, Integer> wordCount(String filePath, String separator) throws RemoteException {
Map<String, Integer> map = new HashMap<>();
int val;
Path path = Paths.get(filePath);
if (Files.notExists(path)) {
System.err.println(path + " dose not exist.");
return null;
}
if (Files.isDirectory(path)) {
System.err.println(path + " is not a file.");
return null;
}
Reader fin = null;
BufferedReader in = null;
try {
fin = Files.newBufferedReader(path, Charset.forName("UTF-8"));
in = new BufferedReader(fin);
String line;
while((line = in.readLine()) != null) {
for (String key : line.split(separator)) {
if(map.get(key) == null) {
map.put(key, Constants.ONE);
} else {
val = map.get(key) + 1;
map.put(key, val);
}
}
}
} catch (IOException e) {
e.printStackTrace();
return null;
} finally {
try {
if(fin != null) {
fin.close();
}
if(in != null) {
in.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return map;
}
}
public class WCServer {
public static void main(String[] args) throws RemoteException, MalformedURLException {
// 註冊埠
LocateRegistry.createRegistry(Constants.PORT);
/*
Constants.WCCount_RMI = "rmi://某節點(這裡同伺服器節點):" + PORT + "/net.xxx.xxx.wordCount.xxx.WCServiceImpl";
*/
// 將服務對象綁定url
Naming.rebind(Constants.WCCount_RMI, new WCServiceImpl());
}
}
public class WCClient {
public static void main(String[] args) throws MalformedURLException, RemoteException, NotBoundException {
// 在註冊的服務中根據url尋找服務
WCService wcService = (WCService) Naming.lookup(Constants.WCCount_RMI);
Map<String, Integer> wordCount = wcService.wordCount("X:/xxx/words.txt", " ");
for(String k : wordCount.keySet()) {
System.out.println(k + "\t" + wordCount.get(k));
}
}
}
當然這裡為了簡單起見,我們把代碼寫死了,在實際的大數據處理框架中處理邏輯是由用戶完成的。我們只需要提供介面,而且第一步先進行數據映射,即map階段,第二步才是聚合統計reduce/aggregate,中間還有shuffle,最開始還有split。這裡只是為了說明RMI在Windows中可以輕易、方便地使用。有了RMI,我們一開始的數據存儲和處理邏輯就可以分佈在分散式環境中了;有了RMI,我們的數據和處理邏輯就可以“自由”地在節點間“旅行”了。這構成了大數據處理框架的基礎。千里之行,始於足下。借鑒hadoop和spark,我們應該先搭建RPC環境。RMI使用起來太麻煩,而且功能有限,現實中的大數據框架從未真正使用過RMI,而是使用對它的封裝和改良,比如說Netty。下一節我們介紹Netty,並逐漸用Netty搭建起RPC環境。學識有限,描述錯誤、不周的地方,還望各位技術大佬批評指正。這一節主要論述了在windows環境下處理大數據的可能性。