今天我想要介紹 C++ 執行緒的高階 API:std::promise、std::future、std::packaged_task 與 std::async。本文的內容可以濃縮為下圖:
其中 std::promise 與 std::future 是執行緒之間的同步通道。std::packaged_task 類別樣版是「函式(Function)」或「函式物件(Function Object)」的轉接器。它會將函式的回傳值包裝為 std::future,讓我們能方便地以 std::thread 執行任何函式。std::async 函式相當於 std::packaged_task 與 std::thread 的總和。
接下來我會分段介紹各個 API:
std::promise 與 std::future
std::promise<T> 與 std::future<T> 類別樣版定義於 <future> 標頭檔。兩者共同組成一個同步通道。其中 std::promise 是送出端而 std::future 是接收端。具體的使用方法如下:
#include <future>
#include <iostream>
int main() {
std::promise<int> p;
std::future<int> f = p.get_future();
p.set_value(42);
std::cout << f.get() << std::endl;
}
在上面的範例裡,我們先建立一個 std::promise<int> 物件。其中,int 樣版參數表示這個同步通道會傳遞一個 int 物件。接著,我們呼叫 p.get_future() 取得接收端。之後,我們以 p.set_value(42) 傳送 42,再以 f.get() 取得該物件。
接著,讓我們加上一個執行緒:
#include <future>
#include <iostream>
#include <thread>
int main() {
std::promise<int> p;
std::future<int> f = p.get_future();
std::thread t(
[](std::promise<int> p) {
p.set_value(42);
},
std::move(p));
std::cout << f.get() << std::endl;
t.join();
}
在這個範例裡,我們將 std::promise<int> 移交給另一個執行緒,由另一個執行緒傳送 int 物件,而主執行緒會以 f.get() 讀取 int 物件。如果主執行緒先執行到 f.get(),主執行緒會等待另一個執行緒,直到 p.set_value(42) 執行完畢。
另外,這個同步通道只能使用一次。如果我們多次呼叫 p.set_value(...) 或 f.get() 它們會拋出 std::future_error 例外[1]:
#include <future>
#include <iostream>
#include <thread>
int main() {
std::promise<int> p;
std::future<int> f = p.get_future();
std::thread t(
[](std::promise<int> p) {
p.set_value(42);
try {
p.set_value(43); // set the second value
} catch (std::future_error &e) {
std::cerr << "caught: " << e.what() << std::endl;
}
},
std::move(p));
std::cout << f.get() << std::endl;
try {
std::cout << f.get() << std::endl; // get the second value
} catch (std::future_error &e) {
std::cerr << "caught: " << e.what() << std::endl;
}
t.join();
}
wait 成員函式
在接收端我們能將接收端「等待」與「讀取」拆成兩步。std::future<T> 有三個成員函式:
void wait():等待直到可以讀取物件。future_status wait_for(const std::chrono::duration<...> &):等待直到可以讀取物件或者用完所有等待時間。future_status wait_until(const std::chrono::time_point<...> &):等待直到可以讀取物作或者到達截止時間。
後兩個函式的回傳值可以是:
future_status::deferred:這個std::future物件對應到一個惰性估值(Lazy Evaluation)的送出端(參見 std::async 段落)。future_status::ready:已經可以讀取物件。future_status::timeout:等待超時。
舉例來說,假設另一個執行緒需要一段時間才能產生回傳值,我們希望主執行緒能在等待過程中印出一些訊息,此時我們就能使用 wait_for 成員函式:
#include <future>
#include <iostream>
#include <thread>
int main() {
std::promise<int> p;
std::future<int> f = p.get_future();
std::thread t(
[](std::promise<int> p) {
std::this_thread::sleep_for(std::chrono::seconds(5));
p.set_value(42);
},
std::move(p));
for (int i = 0; ; ++i) {
std::cout << "waiting attempt " << i << " ..." << std::endl;
std::future_status status = f.wait_for(std::chrono::seconds(1));
if (status != std::future_status::timeout) {
break;
}
}
std::cout << f.get() << std::endl;
t.join();
}
另一個情況是我們只想要用 std::promise 與 std::future 同步「開始執行」的時間點,我們並不是真的要傳送一個物件。此時我們就能使用 wait 成員函式:
#include <numeric>
#include <future>
#include <iostream>
#include <thread>
#include <vector>
int main() {
std::vector<long long int> vec;
std::promise<void> p;
std::thread t(
[&vec](std::future<void> f) {
std::cout << "thread: started\n" << std::flush;
f.wait();
std::cout << "thread: start computation\n" << std::flush;
long long int sum = std::accumulate(vec.begin(), vec.end(), 0LL);
std::cout << "thread: end computation\n" << std::flush;
std::cout << "sum=" << sum << std::endl;
},
p.get_future());
// Initialize the data
for (long long int i = 0; i < 1000000; ++i) {
vec.push_back(i);
}
std::cout << "main: notify thread\n" << std::flush;
p.set_value();
t.join();
}
例外處理
我們也能使用 std::promise 傳送例外(Exception)。如果送出端要傳送一個「例外」給接收端,我們可以呼叫 std::promise<T>::set_exception。當接收端呼叫 std::future<T>::get() 的時候,get() 函式會再次拋出該例外:
#include <future>
#include <iostream>
#include <thread>
int main() {
std::promise<int> p;
std::future<int> f = p.get_future();
std::thread t(
[](std::promise<int> p) {
try {
throw std::runtime_error("some exception");
} catch (...) {
p.set_exception(std::current_exception());
}
},
std::move(p));
try {
std::cout << f.get() << std::endl;
} catch (std::runtime_error &exp) {
std::cout << "main thread: caught: " << exp.what() << std::endl;
}
t.join();
}
值得留意的是 p.set_exception(...) 的參數型別為 std::exception_ptr,所以我們不能直接傳送 std::runtime_error 物件實例。我們必須先以 throw 述句拋出例外,之後在 catch 子句以 std::current_exception() 取得 std::exception_ptr 並呼叫 set_exception 送出例外。
std::packaged_task
std::packaged_task 類別樣版也是定義在 <future> 標頭檔。它的用途是作為「函式」或「函式物件」與 std::thread 之間的轉接器。一般而言,在考慮多執行緒之前,我們定義的函式會是:
ReturnType Function(ArgType1 arg1, ArgType2 arg2, ..., ArgTypeN argn)
然而如果將上述函式作為 std::thread 建構式的第一個參數,該函式的回傳值會被 std::thread 忽略。另外,如果上述函式拋出例外,std::thread 會直接終止整個程式。為了解決介面之間的落差,C++ 標準函式庫定義了 std::packaged_task 類別樣版。它會定義一個 get_future 成員函式用以回傳「能接收回傳值的 std::future 物件」。另外也會定義一個 operator() 成員函式用以呼叫原本的函式。
簡化的 std::packaged_task 實作如下(僅供參考,實際的實作更複雜):
#include <exception>
#include <functional>
#include <future>
template <typename Func>
class my_packaged_task;
template <typename Ret, typename... Args>
class my_packaged_task<Ret(Args...)> {
private:
std::promise<Ret> promise_;
std::function<Ret(Args...)> func_;
public:
my_packaged_task(std::function<Ret(Args...)> func)
: func_(std::move(func)) {}
void operator()(Args&&... args) {
try {
promise_.set_value(func_(std::forward<Args&&>(args)...));
} catch (...) {
promise_.set_exception(std::current_exception());
}
}
std::future<Ret> get_future() {
return promise_.get_future();
}
};
以下是 std::packaged_task 的使用方法:
#include <future>
#include <iostream>
int compute(int a, int b) {
return 42 + a + b;
}
int main() {
std::packaged_task<int(int, int)> task(compute);
std::future<int> f = task.get_future();
task(3, 4);
std::cout << f.get() << std::endl;
return 0;
}
在不改變 compute 函式的前提下,我們以 std::packaged_task 包裹 compute 函式。呼叫 task(3, 4) 之後,可以透過 f.get() 取得回傳值。
接著我們可以再加上執行緒:
#include <future>
#include <iostream>
#include <thread>
int compute(int a, int b) {
return 42 + a + b;
}
int main() {
std::packaged_task<int(int, int)> task(compute);
std::future<int> f = task.get_future();
std::thread t(std::move(task), 3, 4); // Added
t.detach(); // Added
std::cout << f.get() << std::endl;
return 0;
}
上述程式碼將原本的 task(3, 4) 替換為建立執行緒的程式碼。因為 std::packaged_task 是 Non-copyable(不可複製)類別,因此我們必須以 std::move(task) 將 std::packaged_task 物件轉讓給 std::thread 的建構式。接著,我們呼叫 t.detach() 以避免 std::thread 的解構式呼叫 std::terminate。另一方面,std::thread 的建構式會在另一個執行緒為我們呼叫 std::packaged_task::operator() 並執行 compute 函式。當 compute 執行完畢之後,主執行緒能透過 f.get() 接收回傳值。
std::async
最後我要介紹 std::async 函式樣版。使用 std::async 的時候,我們必須傳入一個「函式」或「函式物件」與呼叫該函式所需的參數。std::async 會在某個時間點呼叫我們傳入的函式。std::async 的呼叫者能夠通過 std::async 回傳的 std::future<T> 物件讀取傳入函式的回傳值。
std::async 有兩種不同的執行策略:
std::launch::async:建立一個執行緒、執行指定工作並回傳一個std::future<T>物件。std::launch::deferred:直接回傳一個std::future<T>物件並將指定工作延遲到std::future<T>::get()的呼叫點。
舉例來說,前面 std::packaged_task 的範例也可以改寫為:
#include <future>
#include <iostream>
int compute(int a, int b) {
return 42 + a + b;
}
int main() {
std::future<int> f = std::async(std::launch::async, compute, 3, 4);
// ...
std::cout << f.get() << std::endl;
return 0;
}
上述程式碼的 std::async 會建立一個執行緒,執行 compute(3, 4)。主執行緒能透過 f.get() 取得 compute(3, 4) 的回傳值。
一個簡單的 std::async(std::launch::async, ...) 實作如下(僅供參考,實際的實作更複雜):
#include <cassert>
#include <future>
#include <thread>
#include <type_traits>
template <typename Func, typename... Args>
std::future<typename std::result_of<Func(Args...)>::type>
my_async(std::launch policy, Func&& func, Args&&... args) {
assert(policy == std::launch::async && "only async is supported");
using Result = typename std::result_of<Func(Args...)>::type;
std::packaged_task<Result(Args...)> task(func);
std::future<Result> future = task.get_future();
std::thread t(std::move(task), args...);
t.detach();
return future;
}
另一方面,std::launch::deferred 執行策略不會建立新的執行緒。它的工作原理是在 std::future<T> 物件內部維護一個額外的狀態。當使用者呼叫 std::future<T>::get 的時候,get 成員函式才會執行傳入的函式。如果沒有人呼叫 std::future<T>::get,std::async(std::launch::deferred, ...) 就不會執行傳入的函式。
舉例來說:在 std::launch::deferred 模式下,下面的程式碼一定會先列印 first line 之後才會列印 second line。如果刪除 f.get() 那行,整個程式一定不會列印 second line。
#include <chrono>
#include <future>
#include <iostream>
#include <thread>
int compute(int a, int b) {
std::cout << "this must be the second line\n" << std::flush;
return 42 + a + b;
}
int main() {
std::future<int> f = std::async(std::launch::deferred, compute, 3, 4);
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "this must be the first line\n" << std::flush;
std::cout << f.get() << std::endl;
return 0;
}
參考資料
- cppreference.com, std::promise
- cppreference.com, std::future
- cppreference.com, std::packaged_task
- cppreference.com, std::async
| [1] | 嚴格地說,多次呼叫 std::future<T>::get() 或 std::promise<T>::set_value() 是未定義行為(Undefined Behavior)。不過 C++ 標準鼓勵 C++ 實作者拋出 std::future_error 例外。 |





