前幾天在除錯的過程中,我注意到程式印出的 Stack Trace 和我的預期相去甚遠。在反覆研究之後,我發現一段有問題的程式碼。大家看得出來有什麼問題嗎?
extern int subtask1(int x);
extern int subtask2(int x);
int run(int a, int b) {
int result1;
std::thread t([&]() { result1 = subtask1(a); });
int result2 = subtask2(b);
if (result2 < 0) {
return -2;
}
t.join();
if (result1 < 0) {
return -1;
}
return 0;
}
問題
上面的程式碼會在 result2
小於 0 的時候直接回傳 -2。但是如果一個 std::thread
物件被解構之前沒有先呼叫 join
或 detach
成員函式,std::thread
的解構函式會直接呼叫 std::terminate
終止整個程式。
雖然直接呼叫 std::terminate
令人感到錯愕,但這並不是完全沒道理。如果 std::thread
解構函式自動呼叫 detach
成員函式,另一個執行緒的執行時間可能會比其參照物件的生命週期還長。這可能導致未定義行為。例如:前述範例中,另一個執行緒會參照 result1
與 a
。如果 return -2
的時候,std::thread
解構函式呼叫 t.detach()
,result1
與 a
就會變成懸空參照(Dangling Reference),存取它們會產生未定義行為。
如果 std::thread
解構函式自動呼叫 join
成員函式,則有可能產生非預期的 Dead Lock。如果上述範例的 subtask1
在執行過程中停下來等待 subtask2
,但是在讓 subtask1
繼續執行之前 subtask2
就回傳錯誤並觸發 std::thread
解構函式(自動呼叫 t.join()
),這就會讓兩個執行緒相互等待並形成 Dead Lock。
除此之外,自動呼叫 join
成員函式也有可能拉長整個程式的執行時間。以前面的例子來說,如果 subtask2
發生錯誤,我們就不在意 subtask1
的執行結果。但是為了執行 t.join()
,主執行緒必須等待另一個執行緒。在一些情況下,這是不必要的浪費。
解決方法
首先我們必檢查兩個執行緒之間的同步關係。如果兩者之間除了「建立新執行緒」與「以 join
函式合併執行緒」之外還有其他同步關係(例如:以 Mutex 或 Condition Variable 相互溝通),我們就必須重新審視兩者的同步協議。我們必須確定「等待的執行緒」絕對能得到「另一個執行緒」的回應。舉例來說:
bool is_ready = false;
std::mutex m;
std::condition_variable cv;
int subtask1(int x) {
std::unique_lock<std::mutex> lock(m);
cv.wait(lock, []() { return is_ready; });
// ...
}
int subtask2(int x) {
if (!is_valid(x)) {
return -1; // Problemetic
}
{
std::lock_guard<std::mutex> lock(m);
is_ready = true;
cv.notify_all();
}
// ...
}
上述程式在處理錯誤時會忘記通知另一方。如果你的程式有這種問題,單純地呼叫 join
或 detach
函式是無法解決問題的。我們必須在同步協議裡定義「錯誤狀態(Error State)」,讓另一個執行緒也能處理例外情況。例如:
bool is_error = false; // Added
bool is_ready = false;
std::mutex m;
std::condition_variable cv;
int subtask1(int x) {
std::unique_lock<std::mutex> lock(m);
cv.wait(lock, []() { return is_ready || is_error; });
if (is_error) { // Added
// Return error early
return -1;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
return x;
}
int subtask2(int x) {
if (!is_valid(x)) {
std::lock_guard<std::mutex> lock(m); // Added
is_error = true; // Added
cv.notify_all(); // Added
return -1;
}
{
std::lock_guard<std::mutex> lock(m);
is_ready = true;
cv.notify_all();
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
return x;
}
我們也能近一步思考要不要改寫整個同步流程。例如:把上面的 is_valid(x)
檢查搬出 subtask2
,直接在建立執行緒 t
之前就先排除問題。不過這已經超出本文的討論範圍,以後有機會再另外介紹。
在檢查同步關係之後,我們必須思考要以 join()
或是 detach()
解決 std::thread::~thread
呼叫 std::terminate
的問題。使用 join()
會比較簡單,但是如前所述 join()
會讓主執行緒等待另一個執行緒(不論你是否在乎其執行結果)。另一方面,使用 detach()
時,我們必須確保另一個執行緒使用的物件在其執行期間都不會被解構。一個簡單的充份條件是讓另一個執行緒持有它所需的物件。如果情況複雜無法簡單地判斷,使用 join()
會是比較安全的選擇。
以下筆者分別介紹四種解法:
- 呼叫
join
函式 - 改用
std::jthread
(呼叫join
的變型) - 呼叫
detach
函式 - 改用
std::async
(呼叫detach
的變型)
解法一:呼叫 join 函式
最直接的作法是在 std::thread::~thread
被呼叫之前呼叫 join
成員函式。本文一開始的程式碼可以改寫為:
int run(int a, int b) {
int result1;
std::thread t([&]() { result1 = subtask1(a); });
int result2;
try { // Added
result2 = subtask2(b);
} catch (...) { // Added
t.join();
throw;
}
if (result2 < 0) {
t.join(); // Added
return -2;
}
t.join();
if (result1 < 0) {
return -1;
}
return 0;
}
因為還要處理例外(Exception),整個程式會變得很繁瑣。我們可以寫一個 scoped_thread_join
類別:
class scoped_thread_join {
private:
std::thread* thread_;
public:
explicit scoped_thread_join(std::thread& thread) : thread_(&thread) {}
~scoped_thread_join() {
if (thread_->joinable()) {
thread_->join();
}
}
};
然後將程式改寫為:
solution_scoped_thread_join1.cpp:
int run(int a, int b) {
int result1;
std::thread t([&]() { result1 = subtask1(a); });
scoped_thread_join thread_guard(t); // Added
int result2 = subtask2(b);
if (result2 < 0) {
return -2;
}
t.join();
if (result1 < 0) {
return -1;
}
return 0;
}
或者更近一步合併 t.join()
:
solution_scoped_thread_join2.cpp:
int run(int a, int b) {
int result1;
std::thread t([&]() { result1 = subtask1(a); });
{
scoped_thread_join thread_guard(t);
int result2 = subtask2(b);
if (result2 < 0) {
return -2;
}
}
if (result1 < 0) {
return -1;
}
return 0;
}
解法二:改用 std::jthread
C++ 20 新增一個 std::jthread
類別(名字前面多一個 j
)。和 std::thread
不同,std::jthread
會在解構函式呼叫 join
函式。所以我們也可以把原本的程式改寫為:
int run(int a, int b) {
int result1;
std::jthread t([&]() { result1 = subtask1(a); }); // Changed
int result2 = subtask2(b);
if (result2 < 0) {
return -2;
}
t.join();
if (result1 < 0) {
return -1;
}
return 0;
}
C++ 20 替代方案
然而 C++ 20 比較新。至截稿前,一些 C++ 實作還沒有 std::jthread
類別。作為替代方案,我們可以使用 Nicolai Josuttis 編寫的 jthread 函式庫:
git clone https://github.com/josuttis/jthread
然後在我們的程式加上:
#ifndef __cpp_lib_jthread
// jthread library: https://github.com/josuttis/jthread
#include "jthread.hpp"
#endif
最後以下方指令編譯:
g++ -pthread -std=c++17 -Ijthread/source solution_jthread.cpp
解法三:呼叫 detach 函式
我們也在建立執行緒之後呼叫 detach
。不過為了確保物件的生命週期,我將原本傳參考的 Lambda Capture([&]
)改為傳值的 Lambda Capture([a, sync]
)。另外,我也將同步所需的資料結構定義為一個 struct
並以 std::shared_ptr
讓兩個執行緒共同持有。
原本正常流程的 t.join()
也應以 Mutex 與 Condition Variable 改寫。主執行緒會以 std::unique_lock
鎖定 std::mutex
物件 sync->m
然後以 sync->cv.wait(lock, ...)
等待回傳值。而另一個執行緒會先執行 subtask1
。在得到回傳值之後,它會以 std::lock_guard
鎖定 sync->m
、設定回傳值、最後再以 sync->cv.notify_all()
通知主執行緒。
#include <condition_variable>
#include <memory>
#include <mutex>
struct Sync { // Added
std::mutex m;
std::condition_variable cv;
bool result1_ready = false;
int result1;
};
int run(int a, int b) {
auto sync = std::make_shared<Sync>(); // Added
std::thread t([a, sync]() { // Changed
int tmp = subtask1(a);
std::lock_guard<std::mutex> lock(sync->m); // Added
sync->result1 = tmp;
sync->result1_ready = true;
sync->cv.notify_all();
});
t.detach();
int result2 = subtask2(b);
if (result2 < 0) {
return -2;
}
std::unique_lock<std::mutex> lock(sync->m); // Added
sync->cv.wait(lock, [&]() { return sync->result1_ready; });
if (sync->result1 < 0) {
return -1;
}
return 0;
}
解法四:改用 std::async
如果覺得自己編寫 std::mutex
與 std::condition_variable
過於麻煩,我們也能以 <future>
標頭檔定義的 std::async
函式改寫:
#include <future>
int run(int a, int b) {
std::future<int> result1 = std::async(std::launch::async, subtask1, a);
int result2 = subtask2(b);
if (result2 < 0) {
return -2;
}
if (result1.get() < 0) {
return -1;
}
return 0;
}
上面的程式碼之中,std::async(std::launch::async, subtask1, a)
會建立一個執行緒執行 subtask1(a)
。執行完畢之後,subtask1
的回傳值會被放進 std::future<int>
。我們能以 result1.get()
取得回傳值。如果 subtask1
執行時間較長,result1.get()
會停下來等待 subtask1
的執行結果。
std::async
的底層實作也是呼叫 detach
成員函式。因此和解法三相同,我們必須確保物件的生命週期長於執行時間。
參考資料
- Scott Meyers, 2014, Effective Modern C++, O'Reilly Media, Item 37: Make std::threads unjoinable on all paths.
- cppreference.com, std::async
- P0660R10: Stop Token and Joining Thread, Rev 10