前幾天在除錯的過程中,我注意到程式印出的 Stack Trace 和我的預期相去甚遠。在反覆研究之後,我發現一段有問題的程式碼。大家看得出來有什麼問題嗎?
extern int subtask1(int x);
extern int subtask2(int x);
int run(int a, int b) {
int result1;
std::thread t([&]() { result1 = subtask1(a); });
int result2 = subtask2(b);
if (result2 < 0) {
return -2;
}
t.join();
if (result1 < 0) {
return -1;
}
return 0;
}
問題
上面的程式碼會在 result2 小於 0 的時候直接回傳 -2。但是如果一個 std::thread 物件被解構之前沒有先呼叫 join 或 detach 成員函式,std::thread 的解構函式會直接呼叫 std::terminate 終止整個程式。
雖然直接呼叫 std::terminate 令人感到錯愕,但這並不是完全沒道理。如果 std::thread 解構函式自動呼叫 detach 成員函式,另一個執行緒的執行時間可能會比其參照物件的生命週期還長。這可能導致未定義行為。例如:前述範例中,另一個執行緒會參照 result1 與 a。如果 return -2 的時候,std::thread 解構函式呼叫 t.detach(),result1 與 a 就會變成懸空參照(Dangling Reference),存取它們會產生未定義行為。
如果 std::thread 解構函式自動呼叫 join 成員函式,則有可能產生非預期的 Dead Lock。如果上述範例的 subtask1 在執行過程中停下來等待 subtask2,但是在讓 subtask1 繼續執行之前 subtask2 就回傳錯誤並觸發 std::thread 解構函式(自動呼叫 t.join()),這就會讓兩個執行緒相互等待並形成 Dead Lock。
除此之外,自動呼叫 join 成員函式也有可能拉長整個程式的執行時間。以前面的例子來說,如果 subtask2 發生錯誤,我們就不在意 subtask1 的執行結果。但是為了執行 t.join(),主執行緒必須等待另一個執行緒。在一些情況下,這是不必要的浪費。
解決方法
首先我們必檢查兩個執行緒之間的同步關係。如果兩者之間除了「建立新執行緒」與「以 join 函式合併執行緒」之外還有其他同步關係(例如:以 Mutex 或 Condition Variable 相互溝通),我們就必須重新審視兩者的同步協議。我們必須確定「等待的執行緒」絕對能得到「另一個執行緒」的回應。舉例來說:
bool is_ready = false;
std::mutex m;
std::condition_variable cv;
int subtask1(int x) {
std::unique_lock<std::mutex> lock(m);
cv.wait(lock, []() { return is_ready; });
// ...
}
int subtask2(int x) {
if (!is_valid(x)) {
return -1; // Problemetic
}
{
std::lock_guard<std::mutex> lock(m);
is_ready = true;
cv.notify_all();
}
// ...
}
上述程式在處理錯誤時會忘記通知另一方。如果你的程式有這種問題,單純地呼叫 join 或 detach 函式是無法解決問題的。我們必須在同步協議裡定義「錯誤狀態(Error State)」,讓另一個執行緒也能處理例外情況。例如:
bool is_error = false; // Added
bool is_ready = false;
std::mutex m;
std::condition_variable cv;
int subtask1(int x) {
std::unique_lock<std::mutex> lock(m);
cv.wait(lock, []() { return is_ready || is_error; });
if (is_error) { // Added
// Return error early
return -1;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
return x;
}
int subtask2(int x) {
if (!is_valid(x)) {
std::lock_guard<std::mutex> lock(m); // Added
is_error = true; // Added
cv.notify_all(); // Added
return -1;
}
{
std::lock_guard<std::mutex> lock(m);
is_ready = true;
cv.notify_all();
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
return x;
}
我們也能近一步思考要不要改寫整個同步流程。例如:把上面的 is_valid(x) 檢查搬出 subtask2,直接在建立執行緒 t 之前就先排除問題。不過這已經超出本文的討論範圍,以後有機會再另外介紹。
在檢查同步關係之後,我們必須思考要以 join() 或是 detach() 解決 std::thread::~thread 呼叫 std::terminate 的問題。使用 join() 會比較簡單,但是如前所述 join() 會讓主執行緒等待另一個執行緒(不論你是否在乎其執行結果)。另一方面,使用 detach() 時,我們必須確保另一個執行緒使用的物件在其執行期間都不會被解構。一個簡單的充份條件是讓另一個執行緒持有它所需的物件。如果情況複雜無法簡單地判斷,使用 join() 會是比較安全的選擇。
以下筆者分別介紹四種解法:
- 呼叫
join函式 - 改用
std::jthread(呼叫join的變型) - 呼叫
detach函式 - 改用
std::async(呼叫detach的變型)
解法一:呼叫 join 函式
最直接的作法是在 std::thread::~thread 被呼叫之前呼叫 join 成員函式。本文一開始的程式碼可以改寫為:
int run(int a, int b) {
int result1;
std::thread t([&]() { result1 = subtask1(a); });
int result2;
try { // Added
result2 = subtask2(b);
} catch (...) { // Added
t.join();
throw;
}
if (result2 < 0) {
t.join(); // Added
return -2;
}
t.join();
if (result1 < 0) {
return -1;
}
return 0;
}
因為還要處理例外(Exception),整個程式會變得很繁瑣。我們可以寫一個 scoped_thread_join 類別:
class scoped_thread_join {
private:
std::thread* thread_;
public:
explicit scoped_thread_join(std::thread& thread) : thread_(&thread) {}
~scoped_thread_join() {
if (thread_->joinable()) {
thread_->join();
}
}
};
然後將程式改寫為:
solution_scoped_thread_join1.cpp:
int run(int a, int b) {
int result1;
std::thread t([&]() { result1 = subtask1(a); });
scoped_thread_join thread_guard(t); // Added
int result2 = subtask2(b);
if (result2 < 0) {
return -2;
}
t.join();
if (result1 < 0) {
return -1;
}
return 0;
}
或者更近一步合併 t.join():
solution_scoped_thread_join2.cpp:
int run(int a, int b) {
int result1;
std::thread t([&]() { result1 = subtask1(a); });
{
scoped_thread_join thread_guard(t);
int result2 = subtask2(b);
if (result2 < 0) {
return -2;
}
}
if (result1 < 0) {
return -1;
}
return 0;
}
解法二:改用 std::jthread
C++ 20 新增一個 std::jthread 類別(名字前面多一個 j)。和 std::thread 不同,std::jthread 會在解構函式呼叫 join 函式。所以我們也可以把原本的程式改寫為:
int run(int a, int b) {
int result1;
std::jthread t([&]() { result1 = subtask1(a); }); // Changed
int result2 = subtask2(b);
if (result2 < 0) {
return -2;
}
t.join();
if (result1 < 0) {
return -1;
}
return 0;
}
C++ 20 替代方案
然而 C++ 20 比較新。至截稿前,一些 C++ 實作還沒有 std::jthread 類別。作為替代方案,我們可以使用 Nicolai Josuttis 編寫的 jthread 函式庫:
git clone https://github.com/josuttis/jthread
然後在我們的程式加上:
#ifndef __cpp_lib_jthread
// jthread library: https://github.com/josuttis/jthread
#include "jthread.hpp"
#endif
最後以下方指令編譯:
g++ -pthread -std=c++17 -Ijthread/source solution_jthread.cpp
解法三:呼叫 detach 函式
我們也在建立執行緒之後呼叫 detach。不過為了確保物件的生命週期,我將原本傳參考的 Lambda Capture([&])改為傳值的 Lambda Capture([a, sync])。另外,我也將同步所需的資料結構定義為一個 struct並以 std::shared_ptr 讓兩個執行緒共同持有。
原本正常流程的 t.join() 也應以 Mutex 與 Condition Variable 改寫。主執行緒會以 std::unique_lock 鎖定 std::mutex 物件 sync->m 然後以 sync->cv.wait(lock, ...) 等待回傳值。而另一個執行緒會先執行 subtask1。在得到回傳值之後,它會以 std::lock_guard 鎖定 sync->m、設定回傳值、最後再以 sync->cv.notify_all() 通知主執行緒。
#include <condition_variable>
#include <memory>
#include <mutex>
struct Sync { // Added
std::mutex m;
std::condition_variable cv;
bool result1_ready = false;
int result1;
};
int run(int a, int b) {
auto sync = std::make_shared<Sync>(); // Added
std::thread t([a, sync]() { // Changed
int tmp = subtask1(a);
std::lock_guard<std::mutex> lock(sync->m); // Added
sync->result1 = tmp;
sync->result1_ready = true;
sync->cv.notify_all();
});
t.detach();
int result2 = subtask2(b);
if (result2 < 0) {
return -2;
}
std::unique_lock<std::mutex> lock(sync->m); // Added
sync->cv.wait(lock, [&]() { return sync->result1_ready; });
if (sync->result1 < 0) {
return -1;
}
return 0;
}
解法四:改用 std::async
如果覺得自己編寫 std::mutex 與 std::condition_variable 過於麻煩,我們也能以 <future> 標頭檔定義的 std::async 函式改寫:
#include <future>
int run(int a, int b) {
std::future<int> result1 = std::async(std::launch::async, subtask1, a);
int result2 = subtask2(b);
if (result2 < 0) {
return -2;
}
if (result1.get() < 0) {
return -1;
}
return 0;
}
上面的程式碼之中,std::async(std::launch::async, subtask1, a) 會建立一個執行緒執行 subtask1(a)。執行完畢之後,subtask1 的回傳值會被放進 std::future<int>。我們能以 result1.get() 取得回傳值。如果 subtask1 執行時間較長,result1.get() 會停下來等待 subtask1 的執行結果。
std::async 的底層實作也是呼叫 detach 成員函式。因此和解法三相同,我們必須確保物件的生命週期長於執行時間。
參考資料
- Scott Meyers, 2014, Effective Modern C++, O'Reilly Media, Item 37: Make std::threads unjoinable on all paths.
- cppreference.com, std::async
- P0660R10: Stop Token and Joining Thread, Rev 10





