## 1、條件變數 當線程需要等待特定事件發生、或是某個條件成立時,可以使用條件變數`std::condition_variable`,它在標準庫頭文件``內聲明。 ```c++ std::mutex mut; std::queue data_queue; std::condition_variab ...
1、條件變數
當線程需要等待特定事件發生、或是某個條件成立時,可以使用條件變數std::condition_variable
,它在標準庫頭文件<condition_variable>
內聲明。
std::mutex mut;
std::queue<data_chunk> data_queue;
std::condition_variable data_cond;
void data_preparation_thread()
{
while (more_data_to_prepare())
{
const data_chunk data = prepare_data();
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data);
data_cond.notify_one();
}
}
void data_processing_thread()
{
while (true)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [] { return !data_queue.empty(); });
data_chunk data = data_queue.front();
data_queue.pop();
lk.unlock();
process(data);
if (is_last_chunk(data)) { break; }
}
}
wait()
會先在內部調用lambda函數判斷條件是否成立,若條件成立則wait()
返回,否則解鎖互斥並讓當前線程進入等待狀態。當其它線程調用notify_one()
時,當前調用wait()
的線程被喚醒,重新獲取互斥鎖並查驗條件,若條件成立則wait()
返回(互斥仍被鎖住),否則解鎖互斥並繼續等待。
wait()
函數的第二個參數可以傳入lambda函數,也可以傳入普通函數或可調用對象,也可以不傳。
notify_one()
喚醒正在等待當前條件的線程中的一個,如果沒有線程在等待,則函數不執行任何操作,如果正在等待的線程多於一個,則喚醒的線程是不確定的。notify_all()
喚醒正在等待當前條件的所有線程,如果沒有正在等待的線程,則函數不執行任何操作。
2、使用future等待一次性事件發生
C++標準程式庫有兩種future
,分別由兩個類模板實現,即std::future<>
和std::shared_future<>
,它們的聲明位於頭文件<future>
內。
2.1、從後臺任務返回值
由於std::thread
沒有提供直接回傳結果的方法,所以我們使用函數模板std::async()
來解決這個問題。std::async()
以非同步方式啟動任務,並返回一個std::future
對象,運行函數一旦完成,其返回值就由該對象持有。在std::future
對象上調用get()
方法時,當前線程就會阻塞,直到std::future
準備妥當並返回非同步線程的結果。std::future
模擬了對非同步結果的獨占行為,get()
僅能被有效調用一次,調用時會對目標值進行移動操作。
int find_the_answer_to_ltuae();
void do_other_stuff();
int main()
{
std::future<int> the_answer = std::async(find_the_answer_to_ltuae);
do_other_stuff();
std::cout << "The answer is " << the_answer.get() << std::endl;
}
在調用std::async()
時,它可以接收附加參數進而傳遞給任務函數作為其參數,此方式與std::thread
的構造函數相同。更多啟動非同步線程的方法可參考下麵的常式:
struct X
{
void foo(int, const std::string&);
std::string bar(const std::string&);
};
X x;
auto f1 = std::async(&X::foo, &x, 42, "hello"); // 調用p->foo(42, "hello"),p是指向x的指針
auto f2 = std::async(&X::bar, x, "goodbye"); // 調用tmpx.bar("goodbye"), tmpx是x的拷貝副本
struct Y
{
double operator()(double);
};
Y y;
auto f3 = std::async(Y(), 3.141); // 調用tmpy(3.141),tmpy是由Y()生成的匿名變數
auto f4 = std::async(std::ref(y), 2.718); // 調用y(2.718)
X baz(X&);
std::async(baz, std::ref(x)); // 調用baz(x)
我們還能為std::async()
補充一個std::launch
類型的參數,來指定採用哪種方式運行:std::launch::deferred
指定在當前線程上延後調用任務函數,等到在future
上調用了wait()
或get()
,任務函數才會執行;std::launch::async
指定必須開啟專屬的線程,在其上運行任務函數。該參數的還可以是std::launch::deferred | std::launch::async
,表示由std::async()
的實現自行選擇運行方式,這也是這項參數的預設值。
auto f6 = std::async(std::launch::async, Y(), 1.2); // 在新線程上執行
auto f7 = std::async(std::launch::deferred, baz, std::ref(x)); // 在wait()或get()調用時執行
auto f8 = std::async(std::launch::deferred | std::launch::async, baz, std::ref(x)); // 交由實現自行選擇執行方式
auto f9 = std::async(baz, std::ref(x));
f7.wait(); // 調用延遲函數
2.2、關聯future實例和任務
std::packaged_task<>
連結future對象與函數(或可調用對象,下同)。std::packaged_task<>
對象在執行任務時,會調用關聯的函數,把返回值保存為future的內部數據,並令future準備就緒。若一項龐雜的操作能分解為多個子任務,則可以把它們分別包裝到多個std::packaged_task<>
實例之中,再傳遞給任務調度器或線程池,這就隱藏了細節,使任務抽象化,讓調度器得以專註處理std::packaged_task<>
實例,無需糾纏於形形色色的任務函數。
std::packaged_task<>
是類模板,其模板參數是函數簽名(例如void()
表示一個函數,不接收參數,也沒有返回值),傳入的函數必須與之相符,即它應接收指定類型的參數,返回值也必須可以轉換成指定類型。這些類型不必嚴格匹配,若某函數接收int
類型參數並返回float
值,則可以為其構建std::packaged_task<double(double)>
的實例,因為對應的類型可以隱式轉換。
std::packaged_task<>
具有成員函數get_future()
,它返回std::future<>
實例,該future的特化類型取決於函數簽名指定的返回值。std::packaged_task<>
還具備函數調用操作符,它的參數取決於函數簽名的參數列表。
std::mutex m;
std::deque<std::packaged_task<void()>> tasks;
bool gui_shutdown_message_received();
void get_and_process_gui_message();
void gui_thread()
{
while (!gui_shutdown_message_received())
{
get_and_process_gui_message();
std::packaged_task<void()> task;
{
std::lock_guard<std::mutex> lk(m);
if (tasks.empty()) { continue; }
task = std::move(tasks.front());
tasks.pop_front();
}
task();
}
}
std::thread gui_bg_thread(gui_thread);
template<typename Func>
std::future<void> post_task_for_gui_thread(Func f)
{
std::packaged_task<void()> task(f);
std::future<void> res = task.get_future();
std::lock_guard<std::mutex> lk(m);
tasks.push_back(std::move(task));
return res;
}
2.3、創建std::promise
有些任務無法以簡單的函數調用表達出來,還有一些任務的執行結果可能來自多個部分的代碼,這時可以藉助std::promise
顯式地非同步求值。配對的std::promise
和std::future
可以實現下麵的工作機制:等待數據的線程在future上阻塞,而提供數據的線程利用相配的std::promise
設定關聯的值,使future準備就緒。
若需從給定的std::promise
實例獲取關聯的std::future
對象,調用前者的成員函數get_future()
即可,這與std::package_task
一樣。promise的值通過成員函數set_value()
設置,只要設置好,future即準備就緒,憑藉它就能獲取該值。如果std::promise
在被銷毀時仍未曾設置值,保存的數據則由異常代替。
void f(std::promise<int> ps)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
ps.set_value(42);
}
int main()
{
std::promise<int> ps;
std::future<int> ft = ps.get_future();
std::thread t(f, std::move(ps));
int val = ft.get();
std::cout << val << std::endl;
t.join();
}
2.4、將異常保存到future中
若經由std::async()
調用的函數拋出異常,則會被保存到future中,future隨之進入就緒狀態,等到其成員函數get()
被調用,存儲在內的異常即被重新拋出。std::packaged_task
也是同理,若包裝的任務函數在執行時拋出異常,則也會被保存到future中,只要調用get()
,該異常就會被再次拋出。自然而然,std::promise
也具有同樣的功能,它通過成員函數顯式調用實現。假如我們不想保存值,而想保存異常,就不應調用set_value()
,而應調用成員函數set_exception()
。
2.5、多個線程一起等待
若我們在多個線程上訪問同一個std::future
對象,而不採取額外的同步措施,將引發數據競爭並導致未定義的行為。std::future
僅能移動構造和移動賦值,而std::shared_future
的實例則能複製出副本。但即便改用std::shared_future
,同一個對象的成員函數卻依然沒有同步,若我們從多個線程訪問同一個對象,首選方式是:向每個線程傳遞std::shared_future
對象的副本,它們為各線程獨有,這些副本就作為各線程的內部數據,由標準庫正確地同步,可以安全地訪問。
future和promise都具備成員函數valid()
,用於判別非同步狀態是否有效。std::shared_future
的實例依據std::future
的實例構造而得,前者所指向的非同步狀態由後者決定。因為std::future
對象獨占非同步狀態,所以若要按預設方式構造std::shared_future
對象,則須用std::move
向其預設構造函數傳遞歸屬權。
std::promise<int> p;
std::future<int> f(p.get_future());
assert(f.valid());
std::shared_future<int> sf(std::move(f));
assert(!f.valid());
assert(sf.valid());
std::future
具有成員函數share()
,直接創建新的std::shared_future
對象,並向它轉移歸屬權。
std::promise<std::map<SomeIndexType, SomeDataType, SomeComparator, SomeAllocator>::iterator> p;
auto sf = p.get_future().share();
3、限時等待
有兩種超時機制可供選擇:一是延遲超時,線程根據指定的時長而繼續等待;二是絕對超時,在某個特定時間點來臨之前,線程一直等待。大部分等待函數都有變體,專門處理這兩種機制的超時。處理延遲超時的函數變體以_for
為尾碼,而處理絕對超時的函數變體以_until
為尾碼。例如std::condition_variable
的成員函數wait_for()
和wait_until()
。