C++ std::thread join() 與解構函式

前幾天在除錯的過程中,我注意到程式印出的 Stack Trace 和我的預期相去甚遠。在反覆研究之後,我發現一段有問題的程式碼。大家看得出來有什麼問題嗎?

thread_bug.cpp:

extern int subtask1(int x);
extern int subtask2(int x);

int run(int a, int b) {
  int result1;
  std::thread t([&]() { result1 = subtask1(a); });

  int result2 = subtask2(b);
  if (result2 < 0) {
    return -2;
  }

  t.join();
  if (result1 < 0) {
    return -1;
  }

  return 0;
}

問題

上面的程式碼會在 result2 小於 0 的時候直接回傳 -2。但是如果一個 std::thread 物件被解構之前沒有先呼叫 joindetach 成員函式,std::thread 的解構函式會直接呼叫 std::terminate 終止整個程式。

雖然直接呼叫 std::terminate 令人感到錯愕,但這並不是完全沒道理。如果 std::thread 解構函式自動呼叫 detach 成員函式,另一個執行緒的執行時間可能會比其參照物件的生命週期還長。這可能導致未定義行為。例如:前述範例中,另一個執行緒會參照 result1a。如果 return -2 的時候,std::thread 解構函式呼叫 t.detach()result1a 就會變成懸空參照(Dangling Reference),存取它們會產生未定義行為。

如果 std::thread 解構函式自動呼叫 join 成員函式,則有可能產生非預期的 Dead Lock。如果上述範例的 subtask1 在執行過程中停下來等待 subtask2,但是在讓 subtask1 繼續執行之前 subtask2 就回傳錯誤並觸發 std::thread 解構函式(自動呼叫 t.join()),這就會讓兩個執行緒相互等待並形成 Dead Lock。

除此之外,自動呼叫 join 成員函式也有可能拉長整個程式的執行時間。以前面的例子來說,如果 subtask2 發生錯誤,我們就不在意 subtask1 的執行結果。但是為了執行 t.join(),主執行緒必須等待另一個執行緒。在一些情況下,這是不必要的浪費。

解決方法

首先我們必檢查兩個執行緒之間的同步關係。如果兩者之間除了「建立新執行緒」與「以 join 函式合併執行緒」之外還有其他同步關係(例如:以 Mutex 或 Condition Variable 相互溝通),我們就必須重新審視兩者的同步協議。我們必須確定「等待的執行緒」絕對能得到「另一個執行緒」的回應。舉例來說:

thread_bug_complex1.cpp:

bool is_ready = false;
std::mutex m;
std::condition_variable cv;

int subtask1(int x) {
  std::unique_lock<std::mutex> lock(m);
  cv.wait(lock, []() { return is_ready; });

  // ...
}

int subtask2(int x) {
  if (!is_valid(x)) {
    return -1;  // Problemetic
  }

  {
    std::lock_guard<std::mutex> lock(m);
    is_ready = true;
    cv.notify_all();
  }

  // ...
}

上述程式在處理錯誤時會忘記通知另一方。如果你的程式有這種問題,單純地呼叫 joindetach 函式是無法解決問題的。我們必須在同步協議裡定義「錯誤狀態(Error State)」,讓另一個執行緒也能處理例外情況。例如:

thread_bug_complex2.cpp:

bool is_error = false;  // Added
bool is_ready = false;
std::mutex m;
std::condition_variable cv;

int subtask1(int x) {
  std::unique_lock<std::mutex> lock(m);
  cv.wait(lock, []() { return is_ready || is_error; });

  if (is_error) {  // Added
    // Return error early
    return -1;
  }

  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  return x;
}

int subtask2(int x) {
  if (!is_valid(x)) {
    std::lock_guard<std::mutex> lock(m);  // Added
    is_error = true;                      // Added
    cv.notify_all();                      // Added
    return -1;
  }

  {
    std::lock_guard<std::mutex> lock(m);
    is_ready = true;
    cv.notify_all();
  }

  std::this_thread::sleep_for(std::chrono::milliseconds(500));
  return x;
}

我們也能近一步思考要不要改寫整個同步流程。例如:把上面的 is_valid(x) 檢查搬出 subtask2,直接在建立執行緒 t 之前就先排除問題。不過這已經超出本文的討論範圍,以後有機會再另外介紹。

在檢查同步關係之後,我們必須思考要以 join() 或是 detach() 解決 std::thread::~thread 呼叫 std::terminate 的問題。使用 join() 會比較簡單,但是如前所述 join() 會讓主執行緒等待另一個執行緒(不論你是否在乎其執行結果)。另一方面,使用 detach() 時,我們必須確保另一個執行緒使用的物件在其執行期間都不會被解構。一個簡單的充份條件是讓另一個執行緒持有它所需的物件。如果情況複雜無法簡單地判斷,使用 join() 會是比較安全的選擇。

以下筆者分別介紹四種解法:

  1. 呼叫 join 函式
  2. 改用 std::jthread(呼叫 join 的變型)
  3. 呼叫 detach 函式
  4. 改用 std::async(呼叫 detach 的變型)

解法一:呼叫 join 函式

最直接的作法是在 std::thread::~thread 被呼叫之前呼叫 join 成員函式。本文一開始的程式碼可以改寫為:

solution_naive_join.cpp:

int run(int a, int b) {
  int result1;
  std::thread t([&]() { result1 = subtask1(a); });

  int result2;
  try {  // Added
    result2 = subtask2(b);
  } catch (...) {  // Added
    t.join();
    throw;
  }
  if (result2 < 0) {
    t.join();  // Added
    return -2;
  }

  t.join();
  if (result1 < 0) {
    return -1;
  }

  return 0;
}

因為還要處理例外(Exception),整個程式會變得很繁瑣。我們可以寫一個 scoped_thread_join 類別:

class scoped_thread_join {
private:
  std::thread* thread_;

public:
  explicit scoped_thread_join(std::thread& thread) : thread_(&thread) {}
  ~scoped_thread_join() {
    if (thread_->joinable()) {
      thread_->join();
    }
  }
};

然後將程式改寫為:

solution_scoped_thread_join1.cpp:

int run(int a, int b) {
  int result1;
  std::thread t([&]() { result1 = subtask1(a); });
  scoped_thread_join thread_guard(t);  // Added

  int result2 = subtask2(b);
  if (result2 < 0) {
    return -2;
  }

  t.join();
  if (result1 < 0) {
    return -1;
  }

  return 0;
}

或者更近一步合併 t.join()

solution_scoped_thread_join2.cpp:

int run(int a, int b) {
  int result1;
  std::thread t([&]() { result1 = subtask1(a); });

  {
    scoped_thread_join thread_guard(t);

    int result2 = subtask2(b);
    if (result2 < 0) {
      return -2;
    }
  }

  if (result1 < 0) {
    return -1;
  }

  return 0;
}

解法二:改用 std::jthread

C++ 20 新增一個 std::jthread 類別(名字前面多一個 j)。和 std::thread 不同,std::jthread 會在解構函式呼叫 join 函式。所以我們也可以把原本的程式改寫為:

solution_jthread.cpp:

int run(int a, int b) {
  int result1;
  std::jthread t([&]() { result1 = subtask1(a); });  // Changed

  int result2 = subtask2(b);
  if (result2 < 0) {
    return -2;
  }

  t.join();
  if (result1 < 0) {
    return -1;
  }

  return 0;
}

C++ 20 替代方案

然而 C++ 20 比較新。至截稿前,一些 C++ 實作還沒有 std::jthread 類別。作為替代方案,我們可以使用 Nicolai Josuttis 編寫的 jthread 函式庫:

git clone https://github.com/josuttis/jthread

然後在我們的程式加上:

#ifndef __cpp_lib_jthread
// jthread library: https://github.com/josuttis/jthread
#include "jthread.hpp"
#endif

最後以下方指令編譯:

g++ -pthread -std=c++17 -Ijthread/source solution_jthread.cpp

解法三:呼叫 detach 函式

我們也在建立執行緒之後呼叫 detach。不過為了確保物件的生命週期,我將原本傳參考的 Lambda Capture([&])改為傳值的 Lambda Capture([a, sync])。另外,我也將同步所需的資料結構定義為一個 struct並以 std::shared_ptr 讓兩個執行緒共同持有。

原本正常流程的 t.join() 也應以 Mutex 與 Condition Variable 改寫。主執行緒會以 std::unique_lock 鎖定 std::mutex 物件 sync->m 然後以 sync->cv.wait(lock, ...) 等待回傳值。而另一個執行緒會先執行 subtask1。在得到回傳值之後,它會以 std::lock_guard 鎖定 sync->m、設定回傳值、最後再以 sync->cv.notify_all() 通知主執行緒。

solution_naive_detach.cpp:

#include <condition_variable>
#include <memory>
#include <mutex>

struct Sync {  // Added
  std::mutex m;
  std::condition_variable cv;
  bool result1_ready = false;
  int result1;
};

int run(int a, int b) {
  auto sync = std::make_shared<Sync>();  // Added

  std::thread t([a, sync]() {  // Changed
    int tmp = subtask1(a);

    std::lock_guard<std::mutex> lock(sync->m);  // Added
    sync->result1 = tmp;
    sync->result1_ready = true;
    sync->cv.notify_all();
  });
  t.detach();

  int result2 = subtask2(b);
  if (result2 < 0) {
    return -2;
  }

  std::unique_lock<std::mutex> lock(sync->m);  // Added
  sync->cv.wait(lock, [&]() { return sync->result1_ready; });
  if (sync->result1 < 0) {
    return -1;
  }

  return 0;
}

解法四:改用 std::async

如果覺得自己編寫 std::mutexstd::condition_variable 過於麻煩,我們也能以 <future> 標頭檔定義的 std::async 函式改寫:

solution_async.cpp:

#include <future>

int run(int a, int b) {
  std::future<int> result1 = std::async(std::launch::async, subtask1, a);

  int result2 = subtask2(b);
  if (result2 < 0) {
    return -2;
  }

  if (result1.get() < 0) {
    return -1;
  }

  return 0;
}

上面的程式碼之中,std::async(std::launch::async, subtask1, a) 會建立一個執行緒執行 subtask1(a)。執行完畢之後,subtask1 的回傳值會被放進 std::future<int>。我們能以 result1.get() 取得回傳值。如果 subtask1 執行時間較長,result1.get() 會停下來等待 subtask1 的執行結果。

std::async 的底層實作也是呼叫 detach 成員函式。因此和解法三相同,我們必須確保物件的生命週期長於執行時間。

參考資料

  • Scott Meyers, 2014, Effective Modern C++, O'Reilly Media, Item 37: Make std::threads unjoinable on all paths.
  • cppreference.com, std::async
  • P0660R10: Stop Token and Joining Thread, Rev 10