對於OpenMP,小喵其實並不是瞭解很多,而且小喵本身也只用到了OpenMP的最簡單的功能。在這裡主要是分享一個自己常用的寫簡單的並行程式的思路。希望能幫助到大家。 這個設計模式的主要特點如下: 1,處理的任務是獨立的; 2,可以在運行中輸出結果,而不是最終才輸出; 3,有限的資源占用; 4,在每次... ...
小喵的嘮叨話:最近很久沒寫博客了,一是因為之前寫的LSoftmax後饋一直沒有成功,所以在等作者的源碼。二是最近沒什麼想寫的東西。前兩天,在預處理圖片的時候,發現處理200w張圖片,跑了一晚上也才處理完一半。早上的時候,出於無奈,花半小時改寫了一個簡單調用OpenMP的處理程式,用了30個核心,然後一小時不到就處理完了。感慨在多核的時代,即使是簡單的程式,如果能支持多核,應該都能節省不少時間。
本文系原創,轉載請註明出處~
小喵的博客:http://www.miaoerduo.com
博客原文:http://www.miaoerduo.com/openmp/應用openmp的一個簡單的設計模式.html
一、寫在前面
對於OpenMP,小喵其實並不是瞭解很多,而且小喵本身也只用到了OpenMP的最簡單的功能。在這裡主要是分享一個自己常用的寫簡單的並行程式的思路。希望能幫助到大家。
這個設計模式的主要特點如下:
1,處理的任務是獨立的;
2,可以在運行中輸出結果,而不是最終才輸出;
3,有限的資源占用;
4,在每次任務的執行時間不同的情況下,也能很好的工作;
5,在每次任務執行需要占用私有的數據時(依賴了線程不安全的庫),也可以很好的工作;
6,輸出是有序的
註意:本文中僅介紹小喵自己用到的幾個OpenMP的功能。既不深入也不完善。僅適合初學者。
小喵學習OpenMP主要是看了周明偉的博客:
OpenMP編程指南:http://blog.csdn.net/drzhouweiming/article/details/4093624
想要比較深入地學習的童鞋請看周老師的博客。
對於什麼是OpenMP,OpenMP有什麼優點等的問題。周老師的博客也很詳細的說明。這裡小喵就不多廢話了。直奔主題。
二、如何使用OpenMP
小喵使用的開發環境是Linux,windows的童鞋可以看一下這個博客:http://www.cnblogs.com/yangyangcv/archive/2012/03/23/2413335.html。MAC上的GCC實際上是Clang,想要使用OpenMP的話比較麻煩。要額外裝一些東東,自己bing一下就有。
本喵的編譯環境是CentOS 7, GCC 4.8.5。大多數系統和編譯器都支持OpenMP了。
先舉個小慄子:
不使用OpenMP:
1 #include <iostream> 2 #define N 100000000 3 4 int fun() { 5 int a = 0; 6 for (int i = 0; i < N; ++ i) { 7 a += i; 8 } 9 return a; 10 } 11 12 int main() { 13 14 for (int i = 0; i < 100; ++ i) { 15 fun(); 16 } 17 std::cout << "finish" << std::endl; 18 return 0; 19 }
之後使用g++編譯,並計時:
g++ sample_without_omp.cpp -o sample_without_omp.bin
time ./sample_with_omp.bin
運行結果:
./sample_without_omp.bin 24.42s user 0.00s system 100% cpu 24.417 total
這裡可以看到用了100%的cpu,總時間是24.417 s。
使用OpenMP,調用2個線程:
1 #include <iostream> 2 #include <omp.h> 3 4 #define N 100000000 5 6 int fun() { 7 int a = 0; 8 for (int i = 0; i < N; ++ i) { 9 a += i; 10 } 11 return a; 12 } 13 14 int main() { 15 16 #pragma omp parallel for num_threads(2) schedule(dynamic) 17 for (int i = 0; i < 100; ++ i) { 18 fun(); 19 } 20 std::cout << "finish" << std::endl; 21 return 0; 22 }
這裡源碼的差別是多了一個omp.h的頭文件,和一個奇怪的語句:
#pragma omp parallel for num_threads(2) schedule(dynamic)
編譯的時候,也有點小修改:
g++ sample_with_omp.cpp -o sample_with_omp.bin -fopenmp
time ./sample_with_omp.bin
運行結果如下:
./sample_with_omp.bin 24.32s user 0.01s system 199% cpu 12.182 total
可以看出,user的時間幾乎沒變,這表示CPU總的運行時間沒有變化。但是cpu的使用變成了199%,total的時間變成了12.182 s。這就表明瞭我們使用了2個cpu,使得運行時間成功減半了!
是不是很愉快,我們只添加了2行代碼,就使得程式的速度翻倍。可見OpenMP是多麼的簡潔實用。
那麼,現在是不是不用小喵說,我們也知道怎麼給程式加入OpenMP的支持了呢?
歸納一下,主要有三點:
1,加入OpenMP的頭文件 omp.h
2,使用合適的編譯器指令修飾我們需要並行的部分(線程數、任務分配模式等等,後面會講到)
3,編譯的時候加入openmp的支持,編譯的時候加入參數 -fopenmp
三、fork/join的並行執行模式
我們之前看到了一個簡單的例子,可以看出,程式其實是有串列部分和並行部分兩個部分組成的。
在程式剛啟動的時候,只有一個主線程,當執行到並行部分的時候(上面的例子中就是pragma之後的for迴圈),並行的代碼會通過派生其他線程來執行。只有當並行的所有代碼執行完之後,才會繼續執行串列的部分。
因此主要的運行流程是這個樣子的:
理解這個流程是相當重要的,可以避免很多的不必要的錯誤。一個常見的錯誤就是資源訪問的衝突。比如文件,流對象等,如果在並行的代碼部分隨意訪問這些資源,就可能會導致不可預見的錯誤。這在多線程編程中也是最常出現的錯誤,我們在下麵會具體說到。
四、OpenMP的常用指令和庫函數
在C/C++中,OpenMP的指令使用的格式如下:
#pragma omp 指令 [子句[子句]...]
指令用來指示下麵的代碼的運行模式。子句是給出一些額外的信息。
這裡主要介紹兩個指令:parallel,for
parallel:用在代碼段之前,表示下麵的代碼段使用多線程運行。
for:用於for迴圈之前,將迴圈分配到多個線程中並行執行,必須保證每次迴圈之間無相關性。
parallel for:parallel 和for語句的結合,也是用在一個for迴圈之前,表示for迴圈的代碼將被多個線程並行執行。
小喵使用的時候都是直接使用了parallel for這個組合指令。用來對緊接著的for迴圈的代碼段進行並行。其他的指令請查閱之前提到的博客。
子句中主要是給出一些額外的設置,這裡也主要介紹2個:num_threads,schedule。
num_threads:指定線程的數目(不設置該參數似乎會使用和cpu核心數相同的線程數)。
schedule:指定如何調度for迴圈迭代。有4種模式:static、dynamic、guided,runtime,後面會專門講到。
這裡,我們再回顧一下之前寫的代碼:
#pragma omp parallel for num_threads(2) schedule(dynamic)
是不是豁然開朗。這句話的意思是,使用OpenMP(#pragma omp),將下麵的for迴圈使用多線程去執行(parallel for),線程數為2(num_threads(2)),任務調度方式使用dynamic模式(schedule(dynamic))。
現在,讓我們趁熱打鐵,學習for迴圈的寫法。
這裡,小喵直接複製了周老師的說法(解釋得實在太好了):
for 迴圈語句中,書寫是需要按照一定規範來寫才可以的,即for迴圈小括弧內的語句要按照一定的規範進行書寫,for語句小括弧里共有三條語句
for( i = start; i < end; i++)i = start; 是for迴圈里的第一條語句,必須寫成 “變數=初值” 的方式。如 i=0
i < end; 是for迴圈里的第二條語句,這個語句里可以寫成以下4種形式之一:
變數 < 邊界值
變數 <= 邊界值
變數 > 邊界值
變數 >= 邊界值
如 i>10 i<10 i>=10 i<=10 等等
最後一條語句i++可以有以下9種寫法之一
i++
++i
i--
--i
i += inc
i -= inc
i = i + inc
i = inc + i
i = i – inc
例如i += 2; i -= 2;i = i + 2;i = i - 2;都是符合規範的寫法。
可見一般來說,我們的for迴圈的寫法OpenMP是支持的。那麼有沒有OpenMP不支持的for迴圈呢?小喵沒試過,不過可以猜想,for (auto &v: arr) 這種寫法是不支持的。使用迭代器的話,不知道能不能使用,小喵沒有驗證過。喵粉如果好奇的話,可以自行驗證一下。
在介紹schedule之前,我們先學習幾個常用的庫函數,用來獲取和設置OpenMP的各種運行時狀態:
omp_get_num_procs, 返回運行本線程的多處理機的處理器個數。通常可以根據處理器的個數來合理設置並行的線程數。
omp_get_num_threads, 返回當前並行區域中的活動線程個數。比如上面的例子,應該就會返回2。
omp_get_thread_num, 返回線程號。並行區域的代碼會被多個線程執行,而每個線程都有一個自己的ID,也就是線程號。如果我們設置使用N個線程,那麼線程號會是0,1,2,...,N-1。
omp_set_num_threads, 設置並行執行代碼時的線程個數。和num_threads功能相同。
五、OpenMP中的任務調度
那麼接下來,我們開始學習任務調度的四種模式。使用的子句就是schedule。
schedule的使用格式:
schedule(type[, size])
type主要有4種:static,dynamic,guilded,runtime。
1、static(靜態調度)
表示靜態調度,當不設置schedule的時候,多數編譯器就是使用這種調度方式。它十分的簡單。給定N個任務,啟用t個線程,那麼直接給每個線程分配N/t個任務,考慮到N可能不能整除t,所以每個線程的任務數會有極小的不同。
下麵舉個例子:
1 #include <omp.h> 2 #include <iostream> 3 4 int main() { 5 6 const int task_num = 10; 7 8 #pragma omp parallel for num_threads(2) schedule(static) 9 for (int i = 0; i < task_num; ++ i) { 10 std::cout << "i = " << i << " thread_id = " << omp_get_thread_num() << std::endl; 11 } 12 13 return 0; 14 }
輸出結果如下:
i = 0 thread_id = 0 i = 5 thread_id = 1 i = 6 thread_id = 1 i = 7 thread_id = 1 i = 8 thread_id = 1 i = 9 thread_id = 1 i = 1 thread_id = 0 i = 2 thread_id = 0 i = 3 thread_id = 0 i = 4 thread_id = 0
可以看出,0-4被分配給了0線程,5-9被分配給了1線程。由於是多線程,所以列印出來的順序並不能保證。
如果使用了size,則每次回分配給一個線程size次任務,依次迭代。
1 #include <omp.h> 2 #include <iostream> 3 4 int main() { 5 6 const int task_num = 10; 7 8 #pragma omp parallel for num_threads(2) schedule(static, 2) 9 for (int i = 0; i < task_num; ++ i) { 10 std::cout << "i = " << i << " thread_id = " << omp_get_thread_num() << std::endl; 11 } 12 13 return 0; 14 }
運行結果和上面稍有不同:
i = 2 thread_id = 1 i = 0 thread_id = 0 i = 3 thread_id = 1 i = 6 thread_id = 1 i = 7 thread_id = 1 i = 1 thread_id = 0 i = 4 thread_id = 0 i = 5 thread_id = 0 i = 8 thread_id = 0 i = 9 thread_id = 0
可以看出,連續的2個任務會被分配到同一個線程。0、1給線程0,2、3給線程1,4、5給線程0,6、7給線程1。。。
static是一個十分簡單的策略,但同時會帶來一些問題。比如當任務的執行時間差異很大的時候,由於OpenMP的fork/join的機制,速度快的線程必須等待速度慢的線程,如果恰好分配的很不合理的話(耗時的任務集中在了某一個線程),其他的線程可能會等待較長的時間。這顯然不利於我們充分利用多核資源。
2、dynamic(動態調度)
動態調度會根據運行時的線程狀態來決定下一次的迭代。當一個線程執行完自己的任務之後,會再去領取任務。不設置size的話,一個線程一次會分配一個任務,當執行完了,會再領取一個任務。如果設置了size,線程則一次領取size個任務。
dynamic是小喵最愛的模式!是因為它和標準的生產者消費者模式很相似。這裡生產者預設一次性生產所有的任務,然後每個線程都是一個消費者,當自己執行完了,會再次去領取任務。這樣,任務的分配會更加的有彈性,更好的適應了任務時間不同的情況。
下麵也是一個小慄子,不使用size:
1 #include <omp.h> 2 #include <iostream> 3 4 int main() { 5 6 const int task_num = 10; 7 8 #pragma omp parallel for num_threads(2) schedule(dynamic) 9 for (int i = 0; i < task_num; ++ i) { 10 std::cout << "i = " << i << " thread_id = " << omp_get_thread_num() << std::endl; 11 } 12 13 return 0; 14 }
運行結果:
i = 0 thread_id = 0 i = 1 thread_id = 1 i = 2 thread_id = 0 i = 3 thread_id = 0 i = 4 thread_id = 1 i = 5 thread_id = 1 i = 6 thread_id = 1 i = 7 thread_id = 1 i = 8 thread_id = 1 i = 9 thread_id = 1
可以看出任務的分配是不均勻的。
使用size之後:
1 #include <omp.h> 2 #include <iostream> 3 4 int main() { 5 6 const int task_num = 10; 7 8 #pragma omp parallel for num_threads(2) schedule(dynamic, 2) 9 for (int i = 0; i < task_num; ++ i) { 10 std::cout << "i = " << i << " thread_id = " << omp_get_thread_num() << std::endl; 11 } 12 13 return 0; 14 15 }
運行結果如下:
i = 0 thread_id = 0 i = 2 thread_id = 1 i = 3 thread_id = 1 i = 4 thread_id = 1 i = 5 thread_id = 1 i = 6 thread_id = 1 i = 7 thread_id = 1 i = 8 thread_id = 1 i = 9 thread_id = 1 i = 1 thread_id = 0
線程0先領取了任務0、1。線程1領取了2、3。線程1做完之後,又領取了4、5。。。
可以看出,每次的任務分配是以2個為單位的,分配的順序視運行時狀態動態調整。
3、guided(啟髮式調度)
採用啟髮式調度方法進行調度,每次分配給線程迭代次數不同,開始比較大,以後逐漸減小。
size表示每次分配的迭代次數的最小值,由於每次分配的迭代次數會逐漸減少,少到size時,將不再減少。如果不知道size的大小,那麼預設size為1,即一直減少到1。具體採用哪一種啟髮式演算法,需要參考具體的編譯器和相關手冊的信息。
4、runtime
runtime調用,並不是一個真的調度方式。它是根據環境變數的OMP_SCHEDULE來確定調度模式。最終仍然是上述三種方式之一。具體用法可以查看相關文檔。
六、一個常用的設計模式
在做了前5個部分的鋪墊之後,相信喵粉們已經初步掌握了OpenMP的幾個基本的知識。那麼,現在就開始講我們最重要的部分——小喵最常用的一個設計模式。
主要流程如下:
<1>初始化:
1,定義線程數為thread_num
2,定義平均每個線程上的任務數為task_per_thread
3,初始化處理器對象(handle_arr),大小為thread_num
4,初始化任務空間(task_arr),大小為thread_num * task_per_thread
5,初始化結果空間(result_arr),大小為thread_num * task_per_thread
<2>讀取任務(串列):
1,讀取thread_num * task_per_thread個任務,存入task_arr。
2,記錄讀取任務的數目task_num(task_num <= thread_num * task_per_thread)
<3>任務處理(並行):
1,任務的task_id就是for迴圈的下標
2,通過omp_get_thread_num獲取當前的線程id,根據線程id查找處理器對象。
3,使用處理器處理定義的task_id對應的任務task_arr[task_id]
4,將執行結果存入result_arr[task_id]的位置
<4>結果處理(串列):
根據task_num,處理完result_arr中的結果。
<5>程式狀態判斷
判斷task_num是否等於thread_num * task_per_thread。
如果相等,說明任務隊列沒有執行完,繼續<2>開始執行。
如果不相等,則說明任務隊列全部處理完,程式執行結束<6>。
<6>enjoy your programming
讓我們來一步一步的理解這個模式。
<1>初始化:
這裡主要完成一些初始化的工作。
1)thread_num和task_per_thread
可以看到,這裡初始化了兩個參數。那麼為什麼需要thread_num和task_per_thread這兩個參數呢?
為了更好的利用和控制資源。
根據機器的不同,我們可以自己設置需要開啟的線程數,這就是thread_num。
反派汪:我覺得你說的有問題。我們在程式中明明可以利用omp_get_num_procs獲取機器的所有的處理器的數目,然後就啟用這麼多的線程的話,不就能最大限度的使用所有的計算能力了嗎?
喵座:其實不然。假如伺服器的處理器數目為40,按照你的思路,則會啟用40個線程。這樣一是會造成其他人不能正常的工作,二是當伺服器本來就有其他的程式在run的時候,你的40個線程亦不能很好的工作。不如自己在運行之前設置一下需要的計算資源數,會更方便一點。
那麼為什麼我們需要設置這個task_per_thread呢?
因為資源是有限的。
考慮到最高效的工作方式,就是讓所有的線程不間斷的工作。比如一次性讀完所有的任務列表,然後使用dynamic做完所有的任務。這樣在任務做完之前,每個線程都會無間歇的工作。
理想是完美的,現實是殘酷的。如果任務非常多,比如小喵需要處理的200w條數據。很難一次性全部載入記憶體。而且,即使這麼做了,也必須得任務全部做完,才能得到運行結果,時效性很差。
那麼我們不設置thask_per_thread不行嗎?或者就把這個設置成1。每次就讀取線程數相同的任務數,這樣代碼編寫不應該更簡單嗎?
這時候,讓我們回顧一下OpenMP的調度機制。如果每次只讀取thread_num這麼多的個任務數,那麼每次並行計算的時候,每個線程都會分配到一個任務。那麼總的耗時將變成最慢的任務的執行時間。
舉個簡單的例子,比如有12個任務,耗時為2,1,2,1,2,1,2,1,2,1,2,1。我們使用2個線程。那麼每處理2個任務,耗時都是2。總時間是12。
如果我們每6個一起執行,也是使用2個線程。需要的總時間會變成了10。
執行過程看下圖:
可以使用task_per_task這個策略,每次處理thread_num * task_per_task個任務的話,可以更好了利用多核的資源。(task_per_task設得越大,講道理效果應該越好。小喵自己喜歡設成10或20)
另一個好處是,當我們處理完這些任務之後,可以立刻將結果寫入結果文件。
2)處理器對象:
這是可選的。我們在實際處理任務的時候,有時候會使用到一些特殊的資源,而且必須保證這些資源是獨占的(比如網路通信的套接字,文件對象,或是線程不安全的一些實例的對象)。最簡單高效的方法就是為每個線程都初始化一個自己的處理器(或是資源)對象。這樣在實際處理的時候,每個線程可以根據自己的線程id找到自己的處理器,從而避免了多線程中的各種問題。
3)task_arr和result_arr
這兩個空間是用來存放每次並行處理的任務和結果的。大小自然和每次並行的任務數(thread_num * task_per_thread)相等。考慮到每次並行都可以復用這些空間,所以提前申請好足夠的空間可以提高運行效率。
<2>讀取任務:
我們通常會將任務的內容保存在文件中。而文件的讀取是不能並行的。因此我們需要提前按串列的方式將任務讀取到任務隊列task_arr中。每次讀取thread_num * task_per_thread個。考慮到任務總數可能不是thread_num * task_per_thread的整數倍,因此最後一次讀取的任務數會稍小一點。我們將每次讀取的任務數記錄下來,命名為task_num。
<3>任務處理:
這裡就是我們剛剛學習到的OpenMP的用武之地。
通常的寫法是:
1 #pragma omp parallel for num_threads(thread_num) schedule(dynamic) 2 for (int task_idx = 0; task_idx < task_num; ++ task_idx) { 3 int thread_id = omp_get_thread_num(); // 獲取當前的線程id 4 handle_type handle = handle_arr[thread_id]; // 根據線程id,獲取處理器 5 result_type result = handle->process(task_arr[task_idx]); // 處理指定的任務 6 result_arr[task_idx] = result; // 在指定位置寫回執行的結果 7 }
獲取當前的線程號,然後獲取處理器,然後處理對應的任務,並將結果存放進對應的位置。
註意,線程之間是獨立的,不能讀寫同一個線程不安全的資源。而且在並行區域不保證任何的線程間的順序。
這樣,我們就能安全且高效的執行完每次的任務了。
<4>結果處理:
這部分十分簡單,因為任務的結果已經按順序存進了result_arr中,有效的result是前task_num個,之後想怎麼處理都是喵粉自己的事情了。
<5>程式狀態判斷:
正如我們在<2>中說到的,我們每次處理一批任務,最後的一批任務的個數將不是thread_num * task_per_thread這麼多。因此需要與task_num比較一下。如果相等,就可能是我們還沒有處理完,回到<2>繼續執行。如果不相等,那就說明我們處理完了所有的任務了!你可以坐下來喝杯caffe,然後enjoy多線程帶來的快感了。
最後,附上一個簡單的demo。使用多線程,從文本上讀取圖片的list,讀取圖片的大小,並將結果存入一個新的文本中。
1 #include <opencv2/opencv.hpp> 2 #include <fstream> 3 #include <iostream> 4 #include <string> 5 6 typedef struct { 7 int width; 8 int height; 9 } Size; 10 11 int main(int argc, char **argv) { 12 13 if (argc < 3) { 14 std::cerr << "usage: get_size.bin input_list output_list" 15 " [thread_num] [task_per_thread]" << std::endl; 16 return 1; 17 } 18 const int thread_num = (argc > 3) ? atoi(argv[3]):1; 19 const int task_per_thread = (argc > 4) ? atoi(argv[4]): 10; 20 21 const int total_task = thread_num * task_per_thread; 22 23 std::string image_name_arr[total_task]; // task arr 24 Size image_size_arr[total_task]; // result arr 25 26 std::ifstream is(argv[1]); 27 std::ofstream os(argv[2]); 28 29 int count = 0; 30 31 while (1) { 32 33 // 讀取任務 34 int task_num = 0; 35 for (int task_idx = 0; task_idx < total_task; ++ task_idx) { 36 if (!(is >> image_name_arr[task_idx])) break; 37 ++ task_num; 38 ++ count; 39 } 40 41 // 處理任務 42 #pragma omp parallel for num_threads(thread_num) schedule(dynamic) 43 for (int task_idx = 0; task_idx < task_num; ++ task_idx) { 44 45 cv::Mat image = cv::imread(image_name_arr[task_idx]); 46 image_size_arr[task_idx].width = image.cols; 47 image_size_arr[task_idx].height = image.rows; 48 49 } 50 51 std::cout << "process #" << count << std::endl; 52 53 // 處理結果 54 for (int task_idx = 0; task_idx < task_num; ++ task_idx) { 55 os << image_name_arr[task_idx] << " " 56 << image_size_arr[task_idx].width << " " 57 << image_size_arr[task_idx].height << "\n"; 58 } 59 60 // 狀態判斷 61 if (task_num != total_task) break; 62 63 } 64 return 0; 65 }
編譯和執行:
g++ get_image_size_with_omp.cpp -o get_image_size_with_omp -fopenmp -I/path/to/opencv/include -L/path/to/opencv/lib -lopencv_core -lopencv_highgui ./get_image_size_with_omp /path/to/image_list /path/to/save/result 2 20
怎麼樣,使用這種模式來實現簡單的多線程程式是不是很簡單?
如果您覺得本文對您有幫助,那請小喵喝杯茶吧~~O(∩_∩)O~~
轉載請註明出處~