根據上一篇博客我們知道,JDFS的服務端主程式在epoll裡面先recv客戶端的數據,然後解析頭部,根據請求類型,把作業交給線程池來執行。對於查詢、下載部分的功能這是沒有問題的,因為查詢、下載部分客服端只是發送一個頭部過來,服務端接收後解析的過程不會太占用多少時間。而如果是上傳功能的話,服務端rec... ...
一 前言
本文是《JDFS:一款分散式文件管理實用程式》系列博客的第二篇,在上一篇博客中,筆者向讀者展示了JDFS的核心功能部分,包括:服務端與客戶端部分的上傳、下載功能的實現,epoll的運用,線程池的運用等。當然目前JDFS還僅僅支持上傳、下載功能,還不具備分散式文件管理的功能,這些都會在後續的開發過程中加進來。在寫博客的過程中,筆者發現最好是每完成一個小的功能就及時用博客記錄下來,如果等功能全部實現完成後再寫博客的話,一方面由於功能點比較多,博客寫起來會比較費力;另一方面由於時間間隔太長,有些有價值的細節恐怕會忘記。所以最好是增量式寫博客,每實現一個關鍵點的功能,及時用博客記錄下來。本文是在該系列博客第一篇的基礎上做了部分更新升級以及解決一些小bug.當然主要針對的是上傳部分的功能。如果讀者對這篇博客的背景不是太瞭解的話,請先移步筆者的上一篇博客:點擊我 。
根據上一篇博客我們知道,JDFS的服務端主程式在epoll裡面先recv客戶端的數據,然後解析頭部,根據請求類型,把作業交給線程池來執行。對於查詢、下載部分的功能這是沒有問題的,因為查詢、下載部分客服端只是發送一個頭部過來,服務端接收後解析的過程不會太占用多少時間。而如果是上傳功能的話,服務端recv到的數據不僅包含頭部而且包含客服端期望上傳的文件實體的數據,而筆者的本意是讓線程池來接收數據的,所以這個代碼的實現與筆者的期望是矛盾的。本文首先就會對這一點進行更新改進,使得查詢、上傳、下載都可以並行的被線程池來執行。
另外在上一篇博客中,上傳部分的功能代碼比較粗糙,這次也進行了一些更新改進。在筆者測試上傳功能的時候,發現了一些偶爾出現而且不容易重現的bug,而下載功能目前為止在筆者的測試過程當中還沒有遇到過bug。所以從代碼實現以及測試的過程來看,上傳部分的功能要比下載部分複雜、更難調試。具體代碼實現請移步筆者的github鏈接:點擊我。
PS: 本篇博客是博客園用戶“cs小學生”的原創作品,轉載請註明原作者和原文鏈接,謝謝。
二 上傳功能演示
在上一篇博客中,筆者展示了上傳功能的截圖,但那個只有一個客戶端在向服務端上傳文件,在這裡再補充一個同時有兩個客服端向服務端上傳文件的截圖。
在此次的上傳中(使用shell腳本來提交),兩個客服端分別同時向服務端上傳不同的文件:CRLS-en.pdf和CRLS-e.pdf. 圖左半邊是服務端列印的一些信息,我們可以清晰的看到服務端交叉的接收CRLS-e.pdf和CRLS-en.pdf。圖右半邊是客服端列印的一些消息,我們也可以清晰的看到客服端也是交叉上傳兩個文件。服務端最後三次接收是:CRLS-e.pdf、CRLS-en.pdf、CRLS-en.pdf,客服端最後三次上傳的是CRLS-en.pdf、CRLS-e.pdf、CRLS-en.pdf,可見客服端上傳和服務端接收的文件的次序並不是一致的。但是從圖中我們也可以很容易的發現:對於同一個文件,服務端接收的次序和客服端發送的次序是一致的。
下圖是服務端接收完成後的截圖:
三 改進
1. 修改服務端epoll框架,使得上傳也能並行化
1 for(int i=0;i<num_of_events_to_happen;i++){ 2 struct sockaddr_in client_socket; 3 int client_socket_len=sizeof(client_socket); 4 if(*server_listen_fd==event_for_epoll_wait[i].data.fd){ 5 int client_socket_fd=accept(*server_listen_fd,(struct sockaddr *)&client_socket,&client_socket_len); 6 if(client_socket_fd==-1){ 7 perror("Http_server_body,accept"); 8 continue; 9 } 10 11 event_for_epoll_ctl.data.fd=client_socket_fd; 12 event_for_epoll_ctl.events=EPOLLIN; 13 14 epoll_ctl(epoll_fd,EPOLL_CTL_ADD,client_socket_fd,&event_for_epoll_ctl); 15 16 }else if(event_for_epoll_wait[i].events & EPOLLIN){ 17 18 int client_socket_fd=event_for_epoll_wait[i].data.fd; 19 if(client_socket_fd<0){ 20 continue; 21 } 22 23 callback_arg *cb_arg=(callback_arg *)malloc(sizeof(callback_arg)); 24 cb_arg->socket_fd=client_socket_fd; 25 threadpool_add_jobs_to_taskqueue(pool, Http_server_callback, (void *)cb_arg); 26 27 //epoll delete client_fd 28 29 } 30 }
如上述代碼是服務端主程式使用epoll不斷接收客服端連接、發送數據的主要邏輯部分。在第16行,原來的代碼是先接收,然後解析頭部,根據具體的請求再將之打包成作業加入到作業隊列,然後線程池的線程就會來執行之。現在不這樣做了,只要服務端epoll監聽到讀請求,就把該讀請求打包成作業交給線程池來處理。線程相關函數會負責從傳入的客戶端連接的socket fd上讀數據,然後根據具體請求再做不同操作。整個邏輯很簡單,就是把要乾的事情從服務端推遲到線程池裡面去做。前文提到的打包作業是這樣的:以前服務端解析完頭部後,分別把三個代表查詢、下載、上傳的回調函數指針設置好,加入到作業隊列裡面。按照本文所述的新方法,服務端不用關心具體的操作是什麼,只需要把回調函數指針Http_server_callback()和參數client_socket_fd傳遞到線程池就行了。而Http_server_callback()是新添加的一個函數,其代碼如下:
1 void *Http_server_callback(void *arg){ 2 3 if(arg==NULL){ 4 printf("Http_server_callback,argument error\n"); 5 exit(0); 6 } 7 8 callback_arg *cb_arg=(callback_arg *)arg; 9 int client_socket_fd=cb_arg->socket_fd; 10 memset(cb_arg->server_buffer, 0, sizeof(http_request_buffer)+4); 11 int ret=recv(client_socket_fd,cb_arg->server_buffer,sizeof(http_request_buffer)+4,0); 12 if(ret!=(4+sizeof(http_request_buffer))){ 13 close(client_socket_fd); 14 return (void *)0; 15 } 16 17 http_request_buffer *hrb=(http_request_buffer *)(cb_arg->server_buffer); 18 if(hrb->request_kind==0){ 19 20 callback_arg_query cb_arg_query; 21 cb_arg_query.socket_fd=client_socket_fd; 22 cb_arg_query.server_buffer=cb_arg->server_buffer; 23 cb_arg_query.server_buffer_size=cb_arg->server_buffer_size; 24 strcpy(cb_arg_query.file_name, hrb->file_name); 25 26 Http_server_callback_query((void *)(&cb_arg_query)); 27 28 }else if(hrb->request_kind==1){ 29 30 callback_arg_upload cb_arg_upload; 31 cb_arg_upload.socket_fd=client_socket_fd; 32 cb_arg_upload.server_buffer=cb_arg->server_buffer; 33 cb_arg_upload.server_buffer_size=cb_arg->server_buffer_size; 34 cb_arg_upload.range_begin=hrb->num1; 35 cb_arg_upload.range_end=hrb->num2; 36 37 strcpy(cb_arg_upload.file_name, hrb->file_name); 38 39 Http_server_callback_upload((void *)(&cb_arg_upload)); 40 41 }else if(hrb->request_kind==2){ 42 43 callback_arg_download cb_arg_download; 44 cb_arg_download.socket_fd=client_socket_fd; 45 cb_arg_download.server_buffer=cb_arg->server_buffer; 46 cb_arg_download.server_buffer_size=cb_arg->server_buffer_size; 47 cb_arg_download.range_begin=hrb->num1; 48 cb_arg_download.range_end=hrb->num2; 49 50 strcpy(cb_arg_download.file_name, hrb->file_name); 51 52 Http_server_callback_download((void *)(&cb_arg_download)); 53 54 }else{ 55 56 } 57 58 59 }
2. 上傳部分代碼的改進
int recv_size=0;
1 while(1){ 2 int ret=recv(client_socket_fd,server_buffer+recv_size,range_end-range_begin+1-recv_size,0); 3 if(ret<=0){ 4 perror("Http_server_callback_upload,recv in while"); 5 break; 6 } 7 8 recv_size+=ret; 9 if(recv_size==(range_end-range_begin+1)){ 10 break; 11 } 12 13 }
上面是服務端上傳功能的部分邏輯,在一個while無限迴圈中,服務端接收客服端發送過來的[n,m]區間的數據,因為recv一次不一定能夠接收完整個[n,m]區間的數據,因此需要在迴圈裡面不斷地接收,直到接收到的數據達到m-n+1的長度,這個時候就用break跳出迴圈。另外如果recv的返回值ret<=0,則表明網路出錯或者客服端斷開網路,此時也要break出去。
跳出while迴圈後,要判斷是正確接收到了完整數據還是出錯了,並且給客服端發送一個ack消息,客服端根據ack消息來決定下一步的走向,該部分邏輯如下:
1 if(recv_size==(range_end-range_begin+1)){ 2 3 int ret1=fwrite(server_buffer,range_end-range_begin+1, 1, fp); 4 memset(server_buffer, 0, sizeof(http_request_buffer)+4); 5 http_request_buffer *hrb=(http_request_buffer *)server_buffer; 6 if(ret1==1){ 7 hrb->request_kind=3; 8 hrb->num1=range_begin; 9 hrb->num2=range_end; 10 }else{ 11 hrb->request_kind=4; 12 } 13 14 int ret=send(client_socket_fd,server_buffer,sizeof(http_request_buffer)+4,0); 15 16 if(ret!=(sizeof(http_request_buffer)+4)){ 17 18 perror("Http_server_callback_upload, send ack to client"); 19 20 }else{ 21 22 23 } 24 25 26 }else{
在第6行判斷如果數據接收完畢並且成功寫入到服務端,則給客服端發送一個正確接收並寫入的消息,或者設置request_kind=4,代表服務端接收失敗,客服端需要重新發送數據。更詳細的代碼請讀者直接閱讀github裡面的源代碼。
四 一些遇到的問題、bug
1. 在後來跑JDFS的時候,發現會提示no host to route的錯誤,發現原因是虛擬Ubuntu的ip地址發生了變化,執行下ifconfig命令,用最新的ip地址就可以了。
2.
1 int ret=fread(upload_buffer+sizeof(http_request_buffer)+4, size_of_last_piece, 1, fp); 2 if(ret!=1 && ret!=0){ 3 printf("JDFS_http_upload,fread failed,ret=%d\n",ret); 4 exit(0); 5 }
上面這段代碼是客戶端讀取文件的最後一段數據準備上傳,下麵是一個if判斷語句,之前判斷語句是if(ret!=1){...},而且之前一直上傳也沒出現過fread的錯誤,但是這次卻發生客戶端上傳文件都能成功但是到了傳送最後一部分數據的時候fread老是提示錯誤,經分析fread由於到達了文件尾部,所以返回0,在if語句裡面加上這個判斷就沒問題了。但是奇怪的是,筆者之前好多次上傳都成功並沒有提示這個錯誤啊。
3. 在下載功能部分,客戶端從服務端recv數據,一旦recv返回值小於等於0,則不管錯誤原因的類型,客戶端無條件重新連接到服務端,並請求數據。而客戶端上傳數據到服務端,某種程度上比較像服務端從客戶端下載數據,不同的是服務端此時是被動從客戶端下載數據。那麼此時如果服務端接收數據時recv返回錯誤的結果,服務端不應該重新連接客戶端請求那部分失敗的數據,而應該是告訴客戶端數據接收失敗,由客戶端決定此時應該怎麼辦。為什麼呢?一方面,服務端是被動的服務客戶端的請求,如果服務端主動向客服端重新連接,並請求那部分失敗的數據,此時服務端變成了客戶端,客戶端變成了服務端,這不符合C/S的模型;另一方面,服務端應該是服務大量併發的請求,也不應該因為某一個請求服務失敗,就主動重新連接客戶端,請求數據,萬一這個過程老是出錯,服務端豈不是一直陷入特定的請求泥潭,而大量其他的請求得不到服務?
所以,服務端只需要告訴客戶端該請求服務失敗就行了,剩下的客戶端要麼重新向服務端提交請求,要麼終止執行或者其他。筆者一開始想的比較簡單,在服務失敗的時候,服務端關閉socket fd,這樣客戶端檢測到鏈接被關閉,自然就知道服務失敗了。在客戶端邏輯里,如果send失敗,只發送了部分數據,也關閉鏈接,這樣服務端就檢測到socket fd鏈接被關閉。這麼做結果引發了很多問題,原因在於send()端一次發送的數據,recv()端可能要分好幾次才能接收完畢,如果一方已經close套接字,而另一方還沒有接收完數據,就因為對端close了而接收失敗,因此close的時機不好協調。
於是取消了用close()傳遞消息的方法,而改為:線程池Job的一次操作結果,無論有沒有達到目的,都給客戶端發送一條ack確認信息,客戶端根據ack信息如果成功則繼續,否則重新上傳失敗的數據。這麼做基本上解決了問題,但是很奇怪的是,非常偶然的情況下會出現一個bug:在重新啟動server端,然後執行客戶端的時候,服務端調用recv的時候會提示bad file descriptor的錯誤;在重新啟動server一到兩次後又恢復正常了,這個錯誤由於非常難重新,目前還沒有找到問題的根源所在。
4. 在調試的過程中,還遇到過另外一個問題:客戶端是執行的上傳功能,而服務端有時候會解析為查詢的操作。經分析可能原因如下:有可能客戶端發送了[n1,m1] [n2,m2]兩段數據, 而服務端接收的序列很可能是這樣的,[n1,b],[b+1,m1],[n2,m2]. 服務端在接收完[n1,b]後(由於網路原因[n1,m1]很可能不是一次接收完成),下一次接收[b+1,m1]的時候誤以為是一段新的數據,並把開頭若幹位元組的數據當做頭部處理,而頭部恰好有一部分數據是0,而0就代表著查詢請求。但是經過研究代碼並沒有發現明顯會產生上述場景的條件。由於修複其他bug後,導致這個錯誤沒能繼續重現,現在也很難找到根源,也留到以後再研究吧。
五 結束語
至此本篇博客就結束了,此篇博客主要解決了上傳功能的並行化問題,以及修複了一些bug,當然還有一些不容易重新的bug,其原因有待進一步的分析解決。截止目前JDFS的上傳、下載功能已趨於完成了,下一篇博客開始將在此基礎上增加分散式文件管理的功能,比如把本地文件冗餘地存儲於不同的虛擬節點上,查詢虛擬文件系統,從虛擬文件系統上讀取目標文件等。歡迎繼續關註本系列博客,我們下期再見。