#28. [모던 C++ STL] future, promise, async, counting_semaphore, binary_semaphore, latch, barrier(C++11, C++20)
- [MEC++#35] thread 기반 프로그래밍 보다 async() 기반 프로그래밍을 사용하라.(async() 참고)
- 코드가 간결하다.
- [MEC++#36] 비동기성이 필수일 때에는 launch::async를 지정하라.(async() 참고)
- 그렇지 않은 경우는 쓰레드가 고갈될때
wait_for()
로 쓰레드 지연이 일어났는지 검사하고,wait()
,get()
등으로 쓰레드를 대기해야 한다.- [MEC++#38] 쓰레드 핸들 소멸자들의 다양한 행동 방식을 주의하라.
- [MEC++#39] 단발성 사건 통신에는 void future 개체를 고려하라.
- (C++11~) promise가 추가되어 비동기 처리를 위한 데이터를 저장할 수 있습니다.
- (C++11~) future가 추가되어 비동기 함수가 완료될때까지 대기하고, 데이터를 추출할 수 있습니다.
- (C++11~) shared_future가 추가되었습니다. 여러곳에서 공유할 수 있는 future 입니다.
- (C++11~) packaged_task가 추가되었습니다. promise를 캡슐화하여 비동기 함수 설정만 하면 되는 유틸리티 개체 입니다.
- (C++11~) async()가 추가되었습니다. packaged_task를 쉽게 사용할 수 있도록 만든 유틸리티 함수입니다.
- (C++20~) counting_semaphore, binary_semaphore가 추가되었습니다. 주어진
count
만큼 자원을 동시 접근할 수 있는 동기화 개체입니다.- (C++20~) latch가 추가되었습니다. 주어진
count
가0
이 될때까지 대기하는 동기화 개체입니다.- (C++20~) barrier가 추가되었습니다. 주어진 단계가 끝날때까지 대기하는 동기화 개체입니다.
- (C++20~) binary_semaphore를 이용하여 쓰레드를 동기화할 수 있습니다.
- (C++20~) atomic_flag를 이용하여 쓰레드를 동기화할 수 있습니다.
개요
일반적으로 함수 호출의 흐름은 함수가 종료하고 난뒤 다음 함수를 실행합니다. 동기적(Synchronous)으로 호출되는데요,
쓰레드를 사용하면, 쓰레드에서 호출하는 함수가 동시에 실행되다 보니 비동기적(Asynchonous)으로 호출됩니다.
그래서 쓰레드에서 호출하는 함수를 동기적으로 호출하기 위해 condition_variable을 사용했었습니다.
다음 예에서 동기 함수인 Sync()
는 비동기 함수 Async()
를 호출한 뒤 Async()
함수가 종료될때까지 대기함으로서 동기화를 구현합니다.
condition_variable m_CV;
로 조건 변수를 생성합니다.Async()
함수에서 데이터를 설정하기 전에 mutex의lock
을 겁니다. 작업이 끝나면,m_IsAsyncCompleted = true
를 설정하고notify_all()
을 호출하여,wait()
중인 곳에 알립니다.Sync()
함수에서는Async()
를 thread로 호출하여 비동기로 실행시키고,wait()
를 실행하여m_IsAsyncCompleted
가true
가 될때까지 대기합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class A {
int m_Data{0};
std::condition_variable m_CV;
bool m_IsAsyncCompleted{false}; // true면 Sync가 wait를 멈춥니다.
public:
void Async(std::mutex& mutex) {
{ // lock
std::unique_lock<std::mutex> lock{mutex};
m_Data = 1; // 데이터를 설정합니다.
m_IsAsyncCompleted = true;
} // unlock
m_CV.notify_all();
}
void Sync() {
std::mutex mutex;
std::thread worker{std::mem_fn(&A::Async), std::ref(*this), std::ref(mutex)};
std::unique_lock<std::mutex> lock{mutex};
m_CV.wait(lock, [&]() -> bool {return m_IsAsyncCompleted;}); // Async가 끝날때까지 기다립니다.
lock.unlock();
worker.join();
EXPECT_TRUE(m_Data == 1);
}
};
A a{};
a.Sync();
condition_variable과 mutex를 이용해서 비동기 처리를 할수 있습니다만, future 개체를 이용하면 좀더 단순하게 코드를 구현할 수 있습니다.
다음은 promise와 future를 이용하여 비동기 함수 구현을 한 예입니다.
- promise 개체로부터 비동기 함수로 미래에 얻을 데이터인 future 개체를 생성합니다.
wait()
함수로 데이터가 설정될때까지 기다립니다.- 쓰레드로 실행되는 비동기 함수인
Async()
에서promise.set_value()
함수로 데이터를 설정합니다. - 그러면 대기를 중단합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void Async(std::promise<int>& promise) {
promise.set_value(1); // 데이터를 설정합니다.
}
int Sync() {
std::promise<int> promise;
std::future<int> data{promise.get_future()}; // 미래에 data를 주겠다고 약속합니다.
std::thread worker{&Async, std::ref(promise)};
data.wait(); // 데이터를 줄때까지 기다립니다.
worker.join();
return data.get(); // get() 하면 future 개체에서 관리하던 값은 사라집니다.
}
EXPECT_TRUE(Sync() == 1);
(C++20~) binary_semaphore를 이용하여 쓰레드를 동기화할 수 있습니다.
(C++20~) atomic_flag를 이용하여 쓰레드를 동기화할 수 있습니다.
다음은 비동기 처리를 위한 유틸리티 개체와 함수들입니다.
항목 | 내용 |
---|---|
promise (C++11~) | 비동기 처리를 위한 데이터를 저장합니다. |
future (C++11~) | 비동기 함수가 완료될때까지 대기하고, 데이터를 추출합니다. |
shared_future (C++11~) | 여러곳에서 공유할 수 있는 future 입니다. |
packaged_task (C++11~) | promise를 캡슐화하여 비동기 함수 설정만 하면 되는 유틸리티 개체 입니다. |
async() (C++11~) | packaged_task를 쉽게 사용할 수 있도록 만든 유틸리티 함수입니다. |
launch (C++11~) |
async() 함수에서 사용하며, 비동기 작업에 대한 정책입니다. * async : 새 쓰레드를 생성한뒤 실행합니다.* deferred : 호출된 쓰레드에서 실행합니다. |
future_status |
future개체 wait_for() 시 사용하는 상태입니다. |
future_error (C++11~) |
future나 promise의 오류를 보고합니다. |
future_category (C++11~) |
future의 에러 카테고리입니다. |
future_errc (C++11~) |
future의 에러 코드입니다. |
promise
항목 | 내용 |
---|---|
operator = (C++11~) |
이동 대입합니다. 복사 대입은 delete 되었습니다. |
swap() (C++11~) |
바꿔치기 합니다. |
get_future() (C++11~) |
future 개체를 생성합니다. |
set_value() (C++11~) |
데이터를 설정합니다. |
set_value_at_thread_exit() (C++11~) |
(작성중) |
set_exception() (C++11~) |
예외를 설정합니다. 설정한 예외는 throw로 방출됩니다. |
set_exception_at_thread_exit() (C++11~) |
(작성중) |
future
항목 | 내용 |
---|---|
operator = (C++11~) |
이동 대입합니다. 복사 대입은 delete 되었습니다. |
share() (C++11~) |
shared_future로 데이터를 전송합니다. |
get() (C++11~) |
wait() 한뒤 데이터를 추출합니다. 추출한 뒤에는 데이터가 무효화 됩니다. 따라서 여러번 호출하려면 shared_future를 이용해야 합니다. |
valid() (C++11~) |
아직 get() 하지 않아 데이터가 유효한지 확인합니다. |
wait() (C++11~) |
promise에서 데이터를 설정할때까지 대기합니다. |
wait_for() (C++11~) |
주어진 기간이 지나면 대기를 해제합니다. |
wait_until() (C++11~) |
주어진 시간이 되면 대기를 해제합니다. |
shared_future
future의 경우 get()
을 하고 나면 데이터가 추출되어 무효화됩니다만, shared_future는 계속 get()
을 할 수 있습니다.
항목 | 내용 |
---|---|
operator = (C++11~) |
복사 대입과 이동 대입합니다. |
get() (C++11~) |
wait() 한뒤 데이터를 추출합니다. 이때 데이터는 future처럼 무효화되지 않습니다. |
valid() (C++11~) |
데이터가 유효한지 확인합니다. |
wait() (C++11~) |
promise에서 데이터를 설정할때까지 대기합니다. |
wait_for() (C++11~) |
주어진 기간이 지나면 대기를 해제합니다. |
wait_until() (C++11~) |
주어진 시간이 되면 대기를 해제합니다. |
packaged_task
promise를 캡슐화하여 비동기 함수 설정만 하면 되는 유틸리티 개체 입니다.
다음 예제를 보면,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int Async(int val) {
return val + 1; // promise에 설정될 데이터를 리턴합니다.
}
int Sync() {
std::packaged_task<int(int)> task(&Async);
std::future<int> data{task.get_future()}; // 미래에 data를 주겠다고 약속합니다.
std::thread worker{std::move(task), 10}; // thread에 대입시 task는 복사가 안되므로, 이동 생성합니다. Async 함수의 인자를 전달합니다.
worker.join();
return data.get(); // get() 하면 future 개체에서 관리하던 값은 사라집니다.
}
EXPECT_TRUE(Sync() == 11);
만약 Async()
함수에서 예외를 발생시키면, 다음처럼 예외 핸들링이 가능합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int Async(int val) {
// return val + 1; // promise에 설정될 데이터를 리턴합니다.
throw "MyError"; // 에외 발생
}
int Sync() {
std::packaged_task<int(int)> task(&Async);
std::future<int> data{task.get_future()}; // 미래에 data를 주겠다고 약속합니다.
std::thread worker{std::move(task), 10}; // thread에 대입시 task는 복사가 안되므로, 이동 생성합니다. Async 함수의 인자를 전달합니다.
worker.join();
return data.get(); // get() 하면 future 개체에서 관리하던 값은 사라집니다.
}
try {
EXPECT_TRUE(Sync() == 11);
}
catch(const char* err) {
std::cout << "Task Exception : " << err << std::endl;
EXPECT_TRUE(err == "MyError");
}
항목 | 내용 |
---|---|
= (C++11~) |
이동 대입합니다. 복사 대입은 delete 되었습니다. |
swap() (C++11~) |
바꿔치기 합니다. |
valid() (C++11~) |
데이터가 유효한지 확인합니다. |
get_future() (C++11~) |
future 개체를 생성합니다. |
operator () (C++11~) |
함수를 실행합니다. |
make_ready_at_thread_exit() (C++11~) |
(작성중) |
reset() (C++11~) |
(작성중) |
async()
async()는 packaged_task를 쉽게 사용할 수 있도록 만든 유틸리티 함수입니다.
다음 예제와 같이 packaged_task, thread 정의 없이 사용할 수 있습니다.
1
2
3
4
5
6
7
8
9
10
11
int Async(int val) {
return val + 1; // promise에 설정될 데이터를 리턴합니다.
}
int Sync() {
std::future<int> data{std::async(std::launch::async, &Async, 10)}; // 미래에 data를 주겠다고 약속합니다. 내부적으로 쓰레드를 가동하고 join() 합니다.
return data.get(); // get() 하면 future 개체에서 관리하던 값은 사라집니다.
}
EXPECT_TRUE(Sync() == 11);
(C++20~) counting_semaphore, binary_semaphore
counting_semaphore는 카운트를 이용하여 동기화하는 개체입니다. 카운트가 0
이면 대기하고 그렇지 않으면 진행합니다. 따라서, 초기 카운트 갯수에 따라 동시에 여러개의 쓰레드가 자원에 접근할 수 있습니다.
항목 | 내용 |
---|---|
counting_semaphore<maxCount>(initCount) (C++20~) |
최대 maxCount 이고 초기값이 initCount 인 counting_semaphore를 생성합니다. |
release() (C++20~) |
카운트를 증가시키고 자원을 사용할 수 있게 합니다. |
acquire() (C++20~) |
카운트를 감소시킵니다. 0이면 쓰레드는 대기합니다. |
try_acquire() (C++20~) |
(작성중) |
try_acquire_for() (C++20~) |
(작성중) |
try_acquire_until() (C++20~) |
(작성중) |
max (C++20~) |
카운트할 수 있는 최대값입니다. |
binary_semaphore는 counting_semaphore<1>
의 별칭입니다. 자원 사용시 1개의 쓰레드만 사용할 수 있습니다.
(C++20~) binary_semaphore를 이용한 쓰레드 동기화
binary_semaphore를 이용하면 쓰레드 동기화를 구현할 수 있습니다.
다음은 binary_semaphore을 이용하여 쓰레드 동기화를 구현한 예입니다. 초기에 count
를 0
으로 설정하고, m_IsCompleted.acquire()
에서 대기하고, Async()
함수에서 m_IsCompleted.release();
를 하면 대기를 멈추고 진행합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
std::binary_semaphore g_IsCompleted{0}; // 0입니다. acquire()에서 대기합니다.
void Async(int& data) {
data = 1; // 데이터를 설정합니다.
g_IsCompleted.release(); // 1로 증가시킵니다.
}
int Sync() {
int data{0};
std::thread worker{&Async, std::ref(data)};
g_IsCompleted.acquire(); // 카운트가 0보다 커질때까지 대기합니다.
worker.join();
return data;
}
EXPECT_TRUE(Sync() == 1);
(C++20~) latch
latch도 counting_semaphore처럼 카운트를 이용하여 동기화하는 개체입니다. counting_semaphore처럼 카운트가 0
이면 대기하고 그렇지 않으면 진행합니다.
항목 | 내용 |
---|---|
latch(initCount) (C++20~) |
initCount 인 latch를 생성합니다. |
wait() (C++20~) |
카운트가 0 이 될때까지 대기합니다. |
count_down() (C++20~) |
카운트를 감소시킵니다. |
try_wait() (C++20~) |
(작성중) |
arrive_and_wait() (C++20~) |
카운트를 감소시키고 0이 될때까지 대기합니다. |
max (C++20~) |
카운트할 수 있는 최대값입니다. |
다음 예는 latch의 사용 예입니다.
- #1 : 모든
Async()
작업에서 숫자를 감소시키고0
이 되기를 기다립니다. - #2 : 카운트가
0
이 되면End
메시지를 출력합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
std::latch g_IsAsyncComplated{5}; // 최대 5개가 동시에 진행됩니다.
std::mutex messageMutex;
// std::cout 시 쓰레드 경쟁에 출력이 뒤섞이지 않도록 뮤텍스를 사용합니다.
void SyncMessage(int i, const char* str) {
std::lock_guard<std::mutex> lock(messageMutex); // 유효 범위를 벗어나면 unlock을 호출합니다.
std::cout << std::format("Work {} : {}", i, str) << std::endl;
}
void Async(int i) {
SyncMessage(i, "First");
g_IsAsyncComplated.arrive_and_wait(); // #1. 숫자를 감소시키고 0이 될때까지 대기합니다.
SyncMessage(i, "Last"); // #2
}
std::vector<std::thread> v;
for (int i{0}; i < 5; ++i) {
v.emplace_back(&Async, i);
}
for (auto& work : v) {
work.join();
}
실행 결과는 다음과 같습니다. 각 쓰레드가 뒤섞여서 실행되지만, First - Last는 뒤섞이지 않은 것을 알 수 있습니다.
1
2
3
4
5
6
7
8
9
10
Work 0 : First
Work 1 : Start
Work 3 : First
Work 4 : First
Work 2 : First
Work 2 : Last
Work 4 : Last
Work 1 : Last
Work 0 : Last
Work 3 : Last
(C++20~) barrier
barrier는 latch와 유사하게 카운트를 이용하여 동기화하는 개체이며, latch와 다르게 여러번 사용할 수 있습니다.
항목 | 내용 |
---|---|
barrier(initCount, CompletionFunction) (C++20~) |
initCount 인 barrier를 생성합니다. |
wait() (C++20~) |
단계가 끝날때까지 대기합니다. |
arrive() (C++20~) |
카운트를 감소시킵니다. |
try_wait() (C++20~) |
(작성중) |
arrive_and_wait() (C++20~) |
카운트를 감소시키고 단계가 완료 될때까지 대기합니다. |
arrive_and_drop() (C++20~) |
다음 단계까지 감소시킵니다. |
max (C++20~) |
카운트할 수 있는 최대값입니다. |
다음 예에서 보면 모든 Async()
에서 각 단계가 끝날때까지 대기하는 것을 확인할 수 있습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
std::barrier g_IsAsyncComplated{5}; // 최대 5개가 동시에 진행됩니다.
std::mutex messageMutex;
// std::cout 시 쓰레드 경쟁에 출력이 뒤섞이지 않도록 뮤텍스를 사용합니다.
void SyncMessage(int i, const char* str) {
std::lock_guard<std::mutex> lock(messageMutex); // 유효 범위를 벗어나면 unlock을 호출합니다.
std::cout << std::format("Work {} : {}", i, str) << std::endl;
}
void Async(int i) {
SyncMessage(i, "First");
g_IsAsyncComplated.arrive_and_wait(); // #1. 숫자를 감소시키고 현 단계가 끝날때까지 대기합니다.
SyncMessage(i, "Second");
g_IsAsyncComplated.arrive_and_wait(); // #2. 숫자를 감소시키고 현 단계가 끝날때까지 대기합니다.
SyncMessage(i, "Third");
g_IsAsyncComplated.arrive_and_wait(); // #3. 숫자를 감소시키고 현 단계가 끝날때까지 대기합니다.
SyncMessage(i, "Last"); // #4
}
std::vector<std::thread> v;
for (int i{0}; i < 5; ++i) {
v.emplace_back(&Async, i);
}
for (auto& work : v) {
work.join();
}
실행 결과는 다음과 같습니다. 각 쓰레드가 뒤섞여서 실행되지만, First - Second - Third - Last는 뒤섞이지 않은 것을 알 수 있습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Work 0 : First
Work 4 : First
Work 2 : First
Work 3 : First
Work 1 : First
Work 1 : Second
Work 3 : Second
Work 4 : Second
Work 2 : Second
Work 0 : Second
Work 0 : Third
Work 4 : Third
Work 2 : Third
Work 3 : Third
Work 1 : Third
Work 1 : Last
Work 3 : Last
Work 0 : Last
Work 4 : Last
Work 2 : Last
(C++20~) atomic_flag를 이용한 쓰레드 동기화
쓰레드 동기화를 구현하기 위해
- condition_variable을 이용하거나(동기/비동기의 개요 참고),
- atomic을 이용하거나(atomic 쓰레드 동기화 참고),
- future-promise를 이용하거나(동기/비동기의 개요 참고),
- packaged_task를 이용하거나(packaged_task 참고),
- async()를 이용하거나(async() 참고),
- binary_semaphore를 이용하거나,(binary_semaphore를 이용한 쓰레드 동기화 참고),
- latch와 barrier를 이용하는 방법이 있는데요,
C++20 부터 atomic에 notify_one(), notify_all(), wait()가 추가되어 atomic_flag를 이용한 쓰레드 동기화 구현할 수 있으며, 성능도 가장 좋습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
std::atomic_flag g_IsCompleted{}; // 기본 생성자는 false로 세팅됩니다.
void Async(int& data) {
data = 1; // 데이터를 설정합니다.
g_IsCompleted.test_and_set(); // true로 설정합니다.
g_IsCompleted.notify_all(); // 값이 수정되었음을 통지합니다.
}
int Sync() {
int data{0};
std::thread worker{&Async, std::ref(data)};
g_IsCompleted.wait(false); // 값이 false 이면 대기합니다.
worker.join();
return data;
}
EXPECT_TRUE(Sync() == 1);
댓글남기기