今天我想要介紹 C++ 執行緒的高階 API:std::promise
、std::future
、std::packaged_task
與 std::async
。本文的內容可以濃縮為下圖:
其中 std::promise
與 std::future
是執行緒之間的同步通道。std::packaged_task
類別樣版是「函式(Function)」或「函式物件(Function Object)」的轉接器。它會將函式的回傳值包裝為 std::future
,讓我們能方便地以 std::thread
執行任何函式。std::async
函式相當於 std::packaged_task
與 std::thread
的總和。
接下來我會分段介紹各個 API:
std::promise 與 std::future
std::promise<T>
與 std::future<T>
類別樣版定義於 <future>
標頭檔。兩者共同組成一個同步通道。其中 std::promise
是送出端而 std::future
是接收端。具體的使用方法如下:
#include <future>
#include <iostream>
int main() {
std::promise<int> p;
std::future<int> f = p.get_future();
p.set_value(42);
std::cout << f.get() << std::endl;
}
在上面的範例裡,我們先建立一個 std::promise<int>
物件。其中,int
樣版參數表示這個同步通道會傳遞一個 int
物件。接著,我們呼叫 p.get_future()
取得接收端。之後,我們以 p.set_value(42)
傳送 42
,再以 f.get()
取得該物件。
接著,讓我們加上一個執行緒:
#include <future>
#include <iostream>
#include <thread>
int main() {
std::promise<int> p;
std::future<int> f = p.get_future();
std::thread t(
[](std::promise<int> p) {
p.set_value(42);
},
std::move(p));
std::cout << f.get() << std::endl;
t.join();
}
在這個範例裡,我們將 std::promise<int>
移交給另一個執行緒,由另一個執行緒傳送 int
物件,而主執行緒會以 f.get()
讀取 int
物件。如果主執行緒先執行到 f.get()
,主執行緒會等待另一個執行緒,直到 p.set_value(42)
執行完畢。
另外,這個同步通道只能使用一次。如果我們多次呼叫 p.set_value(...)
或 f.get()
它們會拋出 std::future_error
例外[1]:
#include <future>
#include <iostream>
#include <thread>
int main() {
std::promise<int> p;
std::future<int> f = p.get_future();
std::thread t(
[](std::promise<int> p) {
p.set_value(42);
try {
p.set_value(43); // set the second value
} catch (std::future_error &e) {
std::cerr << "caught: " << e.what() << std::endl;
}
},
std::move(p));
std::cout << f.get() << std::endl;
try {
std::cout << f.get() << std::endl; // get the second value
} catch (std::future_error &e) {
std::cerr << "caught: " << e.what() << std::endl;
}
t.join();
}
wait 成員函式
在接收端我們能將接收端「等待」與「讀取」拆成兩步。std::future<T>
有三個成員函式:
void wait()
:等待直到可以讀取物件。future_status wait_for(const std::chrono::duration<...> &)
:等待直到可以讀取物件或者用完所有等待時間。future_status wait_until(const std::chrono::time_point<...> &)
:等待直到可以讀取物作或者到達截止時間。
後兩個函式的回傳值可以是:
future_status::deferred
:這個std::future
物件對應到一個惰性估值(Lazy Evaluation)的送出端(參見 std::async 段落)。future_status::ready
:已經可以讀取物件。future_status::timeout
:等待超時。
舉例來說,假設另一個執行緒需要一段時間才能產生回傳值,我們希望主執行緒能在等待過程中印出一些訊息,此時我們就能使用 wait_for
成員函式:
#include <future>
#include <iostream>
#include <thread>
int main() {
std::promise<int> p;
std::future<int> f = p.get_future();
std::thread t(
[](std::promise<int> p) {
std::this_thread::sleep_for(std::chrono::seconds(5));
p.set_value(42);
},
std::move(p));
for (int i = 0; ; ++i) {
std::cout << "waiting attempt " << i << " ..." << std::endl;
std::future_status status = f.wait_for(std::chrono::seconds(1));
if (status != std::future_status::timeout) {
break;
}
}
std::cout << f.get() << std::endl;
t.join();
}
另一個情況是我們只想要用 std::promise
與 std::future
同步「開始執行」的時間點,我們並不是真的要傳送一個物件。此時我們就能使用 wait
成員函式:
#include <numeric>
#include <future>
#include <iostream>
#include <thread>
#include <vector>
int main() {
std::vector<long long int> vec;
std::promise<void> p;
std::thread t(
[&vec](std::future<void> f) {
std::cout << "thread: started\n" << std::flush;
f.wait();
std::cout << "thread: start computation\n" << std::flush;
long long int sum = std::accumulate(vec.begin(), vec.end(), 0LL);
std::cout << "thread: end computation\n" << std::flush;
std::cout << "sum=" << sum << std::endl;
},
p.get_future());
// Initialize the data
for (long long int i = 0; i < 1000000; ++i) {
vec.push_back(i);
}
std::cout << "main: notify thread\n" << std::flush;
p.set_value();
t.join();
}
例外處理
我們也能使用 std::promise
傳送例外(Exception)。如果送出端要傳送一個「例外」給接收端,我們可以呼叫 std::promise<T>::set_exception
。當接收端呼叫 std::future<T>::get()
的時候,get()
函式會再次拋出該例外:
#include <future>
#include <iostream>
#include <thread>
int main() {
std::promise<int> p;
std::future<int> f = p.get_future();
std::thread t(
[](std::promise<int> p) {
try {
throw std::runtime_error("some exception");
} catch (...) {
p.set_exception(std::current_exception());
}
},
std::move(p));
try {
std::cout << f.get() << std::endl;
} catch (std::runtime_error &exp) {
std::cout << "main thread: caught: " << exp.what() << std::endl;
}
t.join();
}
值得留意的是 p.set_exception(...)
的參數型別為 std::exception_ptr
,所以我們不能直接傳送 std::runtime_error
物件實例。我們必須先以 throw
述句拋出例外,之後在 catch
子句以 std::current_exception()
取得 std::exception_ptr
並呼叫 set_exception
送出例外。
std::packaged_task
std::packaged_task
類別樣版也是定義在 <future>
標頭檔。它的用途是作為「函式」或「函式物件」與 std::thread
之間的轉接器。一般而言,在考慮多執行緒之前,我們定義的函式會是:
ReturnType Function(ArgType1 arg1, ArgType2 arg2, ..., ArgTypeN argn)
然而如果將上述函式作為 std::thread
建構式的第一個參數,該函式的回傳值會被 std::thread
忽略。另外,如果上述函式拋出例外,std::thread
會直接終止整個程式。為了解決介面之間的落差,C++ 標準函式庫定義了 std::packaged_task
類別樣版。它會定義一個 get_future
成員函式用以回傳「能接收回傳值的 std::future
物件」。另外也會定義一個 operator()
成員函式用以呼叫原本的函式。
簡化的 std::packaged_task
實作如下(僅供參考,實際的實作更複雜):
#include <exception>
#include <functional>
#include <future>
template <typename Func>
class my_packaged_task;
template <typename Ret, typename... Args>
class my_packaged_task<Ret(Args...)> {
private:
std::promise<Ret> promise_;
std::function<Ret(Args...)> func_;
public:
my_packaged_task(std::function<Ret(Args...)> func)
: func_(std::move(func)) {}
void operator()(Args&&... args) {
try {
promise_.set_value(func_(std::forward<Args&&>(args)...));
} catch (...) {
promise_.set_exception(std::current_exception());
}
}
std::future<Ret> get_future() {
return promise_.get_future();
}
};
以下是 std::packaged_task
的使用方法:
#include <future>
#include <iostream>
int compute(int a, int b) {
return 42 + a + b;
}
int main() {
std::packaged_task<int(int, int)> task(compute);
std::future<int> f = task.get_future();
task(3, 4);
std::cout << f.get() << std::endl;
return 0;
}
在不改變 compute
函式的前提下,我們以 std::packaged_task
包裹 compute
函式。呼叫 task(3, 4)
之後,可以透過 f.get()
取得回傳值。
接著我們可以再加上執行緒:
#include <future>
#include <iostream>
#include <thread>
int compute(int a, int b) {
return 42 + a + b;
}
int main() {
std::packaged_task<int(int, int)> task(compute);
std::future<int> f = task.get_future();
std::thread t(std::move(task), 3, 4); // Added
t.detach(); // Added
std::cout << f.get() << std::endl;
return 0;
}
上述程式碼將原本的 task(3, 4)
替換為建立執行緒的程式碼。因為 std::packaged_task
是 Non-copyable(不可複製)類別,因此我們必須以 std::move(task)
將 std::packaged_task
物件轉讓給 std::thread
的建構式。接著,我們呼叫 t.detach()
以避免 std::thread
的解構式呼叫 std::terminate
。另一方面,std::thread
的建構式會在另一個執行緒為我們呼叫 std::packaged_task::operator()
並執行 compute
函式。當 compute
執行完畢之後,主執行緒能透過 f.get()
接收回傳值。
std::async
最後我要介紹 std::async
函式樣版。使用 std::async
的時候,我們必須傳入一個「函式」或「函式物件」與呼叫該函式所需的參數。std::async
會在某個時間點呼叫我們傳入的函式。std::async
的呼叫者能夠通過 std::async
回傳的 std::future<T>
物件讀取傳入函式的回傳值。
std::async
有兩種不同的執行策略:
std::launch::async
:建立一個執行緒、執行指定工作並回傳一個std::future<T>
物件。std::launch::deferred
:直接回傳一個std::future<T>
物件並將指定工作延遲到std::future<T>::get()
的呼叫點。
舉例來說,前面 std::packaged_task
的範例也可以改寫為:
#include <future>
#include <iostream>
int compute(int a, int b) {
return 42 + a + b;
}
int main() {
std::future<int> f = std::async(std::launch::async, compute, 3, 4);
// ...
std::cout << f.get() << std::endl;
return 0;
}
上述程式碼的 std::async
會建立一個執行緒,執行 compute(3, 4)
。主執行緒能透過 f.get()
取得 compute(3, 4)
的回傳值。
一個簡單的 std::async(std::launch::async, ...)
實作如下(僅供參考,實際的實作更複雜):
#include <cassert>
#include <future>
#include <thread>
#include <type_traits>
template <typename Func, typename... Args>
std::future<typename std::result_of<Func(Args...)>::type>
my_async(std::launch policy, Func&& func, Args&&... args) {
assert(policy == std::launch::async && "only async is supported");
using Result = typename std::result_of<Func(Args...)>::type;
std::packaged_task<Result(Args...)> task(func);
std::future<Result> future = task.get_future();
std::thread t(std::move(task), args...);
t.detach();
return future;
}
另一方面,std::launch::deferred
執行策略不會建立新的執行緒。它的工作原理是在 std::future<T>
物件內部維護一個額外的狀態。當使用者呼叫 std::future<T>::get
的時候,get
成員函式才會執行傳入的函式。如果沒有人呼叫 std::future<T>::get
,std::async(std::launch::deferred, ...)
就不會執行傳入的函式。
舉例來說:在 std::launch::deferred
模式下,下面的程式碼一定會先列印 first line
之後才會列印 second line
。如果刪除 f.get()
那行,整個程式一定不會列印 second line
。
#include <chrono>
#include <future>
#include <iostream>
#include <thread>
int compute(int a, int b) {
std::cout << "this must be the second line\n" << std::flush;
return 42 + a + b;
}
int main() {
std::future<int> f = std::async(std::launch::deferred, compute, 3, 4);
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "this must be the first line\n" << std::flush;
std::cout << f.get() << std::endl;
return 0;
}
參考資料
- cppreference.com, std::promise
- cppreference.com, std::future
- cppreference.com, std::packaged_task
- cppreference.com, std::async
[1] | 嚴格地說,多次呼叫 std::future<T>::get() 或 std::promise<T>::set_value() 是未定義行為(Undefined Behavior)。不過 C++ 標準鼓勵 C++ 實作者拋出 std::future_error 例外。 |