C++ 執行緒:promise、future、packaged_task 與 async 的使用方法

今天我想要介紹 C++ 執行緒的高階 API:std::promisestd::futurestd::packaged_taskstd::async。本文的內容可以濃縮為下圖:

std::future、std::promise、std::packaged_task 與 std::async 的關聯圖

其中 std::promisestd::future 是執行緒之間的同步通道。std::packaged_task 類別樣版是「函式(Function)」或「函式物件(Function Object)」的轉接器。它會將函式的回傳值包裝為 std::future,讓我們能方便地以 std::thread 執行任何函式。std::async 函式相當於 std::packaged_taskstd::thread 的總和。

接下來我會分段介紹各個 API:

  1. std::promise 與 std::future
  2. std::packaged_task
  3. std::async

std::promise 與 std::future

std::promise<T>std::future<T> 類別樣版定義於 <future> 標頭檔。兩者共同組成一個同步通道。其中 std::promise 是送出端而 std::future 是接收端。具體的使用方法如下:

#include <future>
#include <iostream>

int main() {
  std::promise<int> p;
  std::future<int> f = p.get_future();

  p.set_value(42);
  std::cout << f.get() << std::endl;
}

在上面的範例裡,我們先建立一個 std::promise<int> 物件。其中,int 樣版參數表示這個同步通道會傳遞一個 int 物件。接著,我們呼叫 p.get_future() 取得接收端。之後,我們以 p.set_value(42) 傳送 42,再以 f.get() 取得該物件。

接著,讓我們加上一個執行緒:

#include <future>
#include <iostream>
#include <thread>

int main() {
  std::promise<int> p;
  std::future<int> f = p.get_future();

  std::thread t(
      [](std::promise<int> p) {
        p.set_value(42);
      },
      std::move(p));

  std::cout << f.get() << std::endl;
  t.join();
}

在這個範例裡,我們將 std::promise<int> 移交給另一個執行緒,由另一個執行緒傳送 int 物件,而主執行緒會以 f.get() 讀取 int 物件。如果主執行緒先執行到 f.get(),主執行緒會等待另一個執行緒,直到 p.set_value(42) 執行完畢。

另外,這個同步通道只能使用一次。如果我們多次呼叫 p.set_value(...)f.get() 它們會拋出 std::future_error 例外[1]

#include <future>
#include <iostream>
#include <thread>

int main() {
  std::promise<int> p;
  std::future<int> f = p.get_future();

  std::thread t(
      [](std::promise<int> p) {
        p.set_value(42);

        try {
          p.set_value(43);  // set the second value
        } catch (std::future_error &e) {
          std::cerr << "caught: " << e.what() << std::endl;
        }
      },
      std::move(p));

  std::cout << f.get() << std::endl;

  try {
    std::cout << f.get() << std::endl;  // get the second value
  } catch (std::future_error &e) {
    std::cerr << "caught: " << e.what() << std::endl;
  }

  t.join();
}

wait 成員函式

在接收端我們能將接收端「等待」與「讀取」拆成兩步。std::future<T> 有三個成員函式:

  • void wait():等待直到可以讀取物件。
  • future_status wait_for(const std::chrono::duration<...> &):等待直到可以讀取物件或者用完所有等待時間。
  • future_status wait_until(const std::chrono::time_point<...> &):等待直到可以讀取物作或者到達截止時間。

後兩個函式的回傳值可以是:

  • future_status::deferred:這個 std::future 物件對應到一個惰性估值(Lazy Evaluation)的送出端(參見 std::async 段落)。
  • future_status::ready:已經可以讀取物件。
  • future_status::timeout:等待超時。

舉例來說,假設另一個執行緒需要一段時間才能產生回傳值,我們希望主執行緒能在等待過程中印出一些訊息,此時我們就能使用 wait_for 成員函式:

#include <future>
#include <iostream>
#include <thread>

int main() {
  std::promise<int> p;
  std::future<int> f = p.get_future();

  std::thread t(
      [](std::promise<int> p) {
        std::this_thread::sleep_for(std::chrono::seconds(5));
        p.set_value(42);
      },
      std::move(p));

  for (int i = 0; ; ++i) {
    std::cout << "waiting attempt " << i << " ..." << std::endl;
    std::future_status status = f.wait_for(std::chrono::seconds(1));
    if (status != std::future_status::timeout) {
      break;
    }
  }
  std::cout << f.get() << std::endl;

  t.join();
}

另一個情況是我們只想要用 std::promisestd::future 同步「開始執行」的時間點,我們並不是真的要傳送一個物件。此時我們就能使用 wait 成員函式:

#include <numeric>
#include <future>
#include <iostream>
#include <thread>
#include <vector>

int main() {
  std::vector<long long int> vec;

  std::promise<void> p;

  std::thread t(
      [&vec](std::future<void> f) {
        std::cout << "thread: started\n" << std::flush;
        f.wait();
        std::cout << "thread: start computation\n" << std::flush;
        long long int sum = std::accumulate(vec.begin(), vec.end(), 0LL);
        std::cout << "thread: end computation\n" << std::flush;
        std::cout << "sum=" << sum << std::endl;
      },
      p.get_future());

  // Initialize the data
  for (long long int i = 0; i < 1000000; ++i) {
    vec.push_back(i);
  }

  std::cout << "main: notify thread\n" << std::flush;
  p.set_value();

  t.join();
}

例外處理

我們也能使用 std::promise 傳送例外(Exception)。如果送出端要傳送一個「例外」給接收端,我們可以呼叫 std::promise<T>::set_exception。當接收端呼叫 std::future<T>::get() 的時候,get() 函式會再次拋出該例外:

#include <future>
#include <iostream>
#include <thread>

int main() {
  std::promise<int> p;
  std::future<int> f = p.get_future();

  std::thread t(
      [](std::promise<int> p) {
        try {
          throw std::runtime_error("some exception");
        } catch (...) {
          p.set_exception(std::current_exception());
        }
      },
      std::move(p));

  try {
    std::cout << f.get() << std::endl;
  } catch (std::runtime_error &exp) {
    std::cout << "main thread: caught: " << exp.what() << std::endl;
  }

  t.join();
}

值得留意的是 p.set_exception(...) 的參數型別為 std::exception_ptr,所以我們不能直接傳送 std::runtime_error 物件實例。我們必須先以 throw 述句拋出例外,之後在 catch 子句以 std::current_exception() 取得 std::exception_ptr 並呼叫 set_exception 送出例外。

std::shared_future

雖然 std::futurestd::promise 的用途是在執行緒之間傳送物件。但是 std::future 物件不能被多個執行緒同時操作。舉例來說,在下面的例子之中,執行緒 t1t2 同時呼叫 f.get()。但是因為 get() 成員函式本身不完全是 Thread-safe 的,所以下面的程式碼會有未定義行為(在我的機器上會產生 Segmentation Fault):

#include <chrono>
#include <future>
#include <iostream>
#include <sstream>
#include <thread>

int main() {
  std::promise<std::unique_ptr<int>> p;
  std::future<std::unique_ptr<int>> f = p.get_future();

  std::thread t1(
      [&f]() {
        std::cout << "t1: waiting\n" << std::flush;
        int value = *f.get();  // Race condition

        std::ostringstream ss;
        ss << "t1: " << value << "\n";
        std::cout << ss.str() << std::flush;
      });
  std::thread t2(
      [&f]() {
        std::cout << "t2: waiting\n" << std::flush;
        int value = *f.get();  // Race condition

        std::ostringstream ss;
        ss << "t2: " << value << "\n";
        std::cout << ss.str() << std::flush;
      });

  std::this_thread::sleep_for(std::chrono::seconds(1));
  p.set_value(std::make_unique<int>(42));

  t1.join();
  t2.join();

  return 0;
}

解決方法也很簡單。如果我們要讓兩個(或以上)執行緒同時當接收端,我們應該要先呼叫 std::futureshare() 成員函式。它會把 std::future 物件轉換為 std::shared_future 物件。接著,我們再複製 std::shared_future 物件,讓 t1t2 執行緒各自有一個複本:

#include <chrono>
#include <future>
#include <iostream>
#include <sstream>
#include <thread>

int main() {
  std::promise<std::unique_ptr<int>> p;
  std::future<std::unique_ptr<int>> f = p.get_future();
  std::shared_future<std::unique_ptr<int>> sf = f.share();  // Added

  std::thread t1(
      [sf]() {  // Copy sf by value
        std::cout << "t1: waiting\n" << std::flush;
        int value = *sf.get();

        std::ostringstream ss;
        ss << "t1: " << value << "\n";
        std::cout << ss.str() << std::flush;
      });
  std::thread t2(
      [sf]() {  // Copy sf by value
        std::cout << "t2: waiting\n" << std::flush;
        int value = *sf.get();

        std::ostringstream ss;
        ss << "t2: " << value << "\n";
        std::cout << ss.str() << std::flush;
      });

  std::this_thread::sleep_for(std::chrono::seconds(1));
  p.set_value(std::make_unique<int>(42));

  t1.join();
  t2.join();

  return 0;
}

std::packaged_task

std::packaged_task 類別樣版也是定義在 <future> 標頭檔。它的用途是作為「函式」或「函式物件」與 std::thread 之間的轉接器。一般而言,在考慮多執行緒之前,我們定義的函式會是:

ReturnType Function(ArgType1 arg1, ArgType2 arg2, ..., ArgTypeN argn)

然而如果將上述函式作為 std::thread 建構式的第一個參數,該函式的回傳值會被 std::thread 忽略。另外,如果上述函式拋出例外,std::thread 會直接終止整個程式。為了解決介面之間的落差,C++ 標準函式庫定義了 std::packaged_task 類別樣版。它會定義一個 get_future 成員函式用以回傳「能接收回傳值的 std::future 物件」。另外也會定義一個 operator() 成員函式用以呼叫原本的函式。

簡化的 std::packaged_task 實作如下(僅供參考,實際的實作更複雜):

#include <exception>
#include <functional>
#include <future>

template <typename Func>
class my_packaged_task;

template <typename Ret, typename... Args>
class my_packaged_task<Ret(Args...)> {
private:
  std::promise<Ret> promise_;
  std::function<Ret(Args...)> func_;

public:
  my_packaged_task(std::function<Ret(Args...)> func)
      : func_(std::move(func)) {}

  void operator()(Args&&... args) {
    try {
      promise_.set_value(func_(std::forward<Args&&>(args)...));
    } catch (...) {
      promise_.set_exception(std::current_exception());
    }
  }

  std::future<Ret> get_future() {
    return promise_.get_future();
  }
};

以下是 std::packaged_task 的使用方法:

#include <future>
#include <iostream>

int compute(int a, int b) {
  return 42 + a + b;
}

int main() {
  std::packaged_task<int(int, int)> task(compute);
  std::future<int> f = task.get_future();
  task(3, 4);
  std::cout << f.get() << std::endl;

  return 0;
}

在不改變 compute 函式的前提下,我們以 std::packaged_task 包裹 compute 函式。呼叫 task(3, 4) 之後,可以透過 f.get() 取得回傳值。

接著我們可以再加上執行緒:

#include <future>
#include <iostream>
#include <thread>

int compute(int a, int b) {
  return 42 + a + b;
}

int main() {
  std::packaged_task<int(int, int)> task(compute);
  std::future<int> f = task.get_future();

  std::thread t(std::move(task), 3, 4);  // Added
  t.detach();                            // Added

  std::cout << f.get() << std::endl;
  return 0;
}

上述程式碼將原本的 task(3, 4) 替換為建立執行緒的程式碼。因為 std::packaged_task 是 Non-copyable(不可複製)類別,因此我們必須以 std::move(task)std::packaged_task 物件轉讓給 std::thread 的建構式。接著,我們呼叫 t.detach() 以避免 std::thread 的解構式呼叫 std::terminate。另一方面,std::thread 的建構式會在另一個執行緒為我們呼叫 std::packaged_task::operator() 並執行 compute 函式。當 compute 執行完畢之後,主執行緒能透過 f.get() 接收回傳值。

std::async

最後我要介紹 std::async 函式樣版。使用 std::async 的時候,我們必須傳入一個「函式」或「函式物件」與呼叫該函式所需的參數。std::async 會在某個時間點呼叫我們傳入的函式。std::async 的呼叫者能夠通過 std::async 回傳的 std::future<T> 物件讀取傳入函式的回傳值。

std::async 有兩種不同的執行策略:

  1. std::launch::async:建立一個執行緒、執行指定工作並回傳一個 std::future<T> 物件。
  2. std::launch::deferred:直接回傳一個 std::future<T> 物件並將指定工作延遲到 std::future<T>::get() 的呼叫點。

舉例來說,前面 std::packaged_task 的範例也可以改寫為:

#include <future>
#include <iostream>

int compute(int a, int b) {
  return 42 + a + b;
}

int main() {
  std::future<int> f = std::async(std::launch::async, compute, 3, 4);
  // ...
  std::cout << f.get() << std::endl;
  return 0;
}

上述程式碼的 std::async 會建立一個執行緒,執行 compute(3, 4)。主執行緒能透過 f.get() 取得 compute(3, 4) 的回傳值。

一個簡單的 std::async(std::launch::async, ...) 實作如下(僅供參考,實際的實作更複雜):

#include <cassert>
#include <future>
#include <thread>
#include <type_traits>

template <typename Func, typename... Args>
std::future<typename std::result_of<Func(Args...)>::type>
my_async(std::launch policy, Func&& func, Args&&... args) {
  assert(policy == std::launch::async && "only async is supported");

  using Result = typename std::result_of<Func(Args...)>::type;
  std::packaged_task<Result(Args...)> task(func);
  std::future<Result> future = task.get_future();
  std::thread t(std::move(task), args...);
  t.detach();
  return future;
}

另一方面,std::launch::deferred 執行策略不會建立新的執行緒。它的工作原理是在 std::future<T> 物件內部維護一個額外的狀態。當使用者呼叫 std::future<T>::get 的時候,get 成員函式才會執行傳入的函式。如果沒有人呼叫 std::future<T>::getstd::async(std::launch::deferred, ...) 就不會執行傳入的函式。

舉例來說:在 std::launch::deferred 模式下,下面的程式碼一定會先列印 first line 之後才會列印 second line。如果刪除 f.get() 那行,整個程式一定不會列印 second line

#include <chrono>
#include <future>
#include <iostream>
#include <thread>

int compute(int a, int b) {
  std::cout << "this must be the second line\n" << std::flush;
  return 42 + a + b;
}

int main() {
  std::future<int> f = std::async(std::launch::deferred, compute, 3, 4);
  std::this_thread::sleep_for(std::chrono::seconds(1));
  std::cout << "this must be the first line\n" << std::flush;
  std::cout << f.get() << std::endl;
  return 0;
}

參考資料


[1]嚴格地說,多次呼叫 std::future<T>::get()std::promise<T>::set_value() 是未定義行為(Undefined Behavior)。不過 C++ 標準鼓勵 C++ 實作者拋出 std::future_error 例外。