9 분 소요

  • [MEC++#35] thread 기반 프로그래밍 보다 async() 기반 프로그래밍을 사용하라.(async() 참고)
    • 코드가 간결하다.
  • [MEC++#36] 비동기성이 필수일 때에는 launch::async를 지정하라.(async() 참고)
    • 그렇지 않은 경우는 쓰레드가 고갈될때 wait_for()로 쓰레드 지연이 일어났는지 검사하고, wait(), get() 등으로 쓰레드를 대기해야 한다.
  • [MEC++#38] 쓰레드 핸들 소멸자들의 다양한 행동 방식을 주의하라.
  • [MEC++#39] 단발성 사건 통신에는 void future 개체를 고려하라.
  • (C++11~) promise가 추가되어 비동기 처리를 위한 데이터를 저장할 수 있습니다.
  • (C++11~) future가 추가되어 비동기 함수가 완료될때까지 대기하고, 데이터를 추출할 수 있습니다.
  • (C++11~) shared_future가 추가되었습니다. 여러곳에서 공유할 수 있는 future 입니다.
  • (C++11~) packaged_task가 추가되었습니다. promise캡슐화하여 비동기 함수 설정만 하면 되는 유틸리티 개체 입니다.
  • (C++11~) async()가 추가되었습니다. packaged_task를 쉽게 사용할 수 있도록 만든 유틸리티 함수입니다.
  • (C++20~) counting_semaphore, binary_semaphore가 추가되었습니다. 주어진 count만큼 자원을 동시 접근할 수 있는 동기화 개체입니다.
  • (C++20~) latch가 추가되었습니다. 주어진 count0이 될때까지 대기하는 동기화 개체입니다.
  • (C++20~) barrier가 추가되었습니다. 주어진 단계가 끝날때까지 대기하는 동기화 개체입니다.
  • (C++20~) binary_semaphore를 이용하여 쓰레드를 동기화할 수 있습니다.
  • (C++20~) atomic_flag를 이용하여 쓰레드를 동기화할 수 있습니다.

개요

일반적으로 함수 호출의 흐름은 함수가 종료하고 난뒤 다음 함수를 실행합니다. 동기적(Synchronous)으로 호출되는데요,

쓰레드를 사용하면, 쓰레드에서 호출하는 함수가 동시에 실행되다 보니 비동기적(Asynchonous)으로 호출됩니다.

그래서 쓰레드에서 호출하는 함수를 동기적으로 호출하기 위해 condition_variable을 사용했었습니다.

다음 예에서 동기 함수인 Sync()는 비동기 함수 Async() 를 호출한 뒤 Async() 함수가 종료될때까지 대기함으로서 동기화를 구현합니다.

  1. condition_variable m_CV; 로 조건 변수를 생성합니다.
  2. Async()함수에서 데이터를 설정하기 전에 mutexlock을 겁니다. 작업이 끝나면, m_IsAsyncCompleted = true를 설정하고 notify_all()을 호출하여, wait()중인 곳에 알립니다.
  3. Sync() 함수에서는 Async()thread로 호출하여 비동기로 실행시키고, wait()를 실행하여 m_IsAsyncCompletedtrue가 될때까지 대기합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class A {
    int m_Data{0};
    std::condition_variable m_CV;
    bool m_IsAsyncCompleted{false}; // true면 Sync가 wait를 멈춥니다.
public:
    void Async(std::mutex& mutex) {

        { // lock
            std::unique_lock<std::mutex> lock{mutex};
            m_Data = 1; // 데이터를 설정합니다.
            m_IsAsyncCompleted = true; 
        } // unlock
        m_CV.notify_all(); 
    }

    void Sync() {
        std::mutex mutex; 

        std::thread worker{std::mem_fn(&A::Async), std::ref(*this), std::ref(mutex)};

        std::unique_lock<std::mutex> lock{mutex};
        m_CV.wait(lock, [&]() -> bool {return m_IsAsyncCompleted;}); // Async가 끝날때까지 기다립니다.
        lock.unlock();

        worker.join(); 

        EXPECT_TRUE(m_Data == 1);
    }           
};

A a{};
a.Sync();

condition_variablemutex를 이용해서 비동기 처리를 할수 있습니다만, future 개체를 이용하면 좀더 단순하게 코드를 구현할 수 있습니다.

다음은 promisefuture를 이용하여 비동기 함수 구현을 한 예입니다.

  1. promise 개체로부터 비동기 함수로 미래에 얻을 데이터인 future 개체를 생성합니다.
  2. wait() 함수로 데이터가 설정될때까지 기다립니다.
  3. 쓰레드로 실행되는 비동기 함수인 Async() 에서 promise.set_value() 함수로 데이터를 설정합니다.
  4. 그러면 대기를 중단합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void Async(std::promise<int>& promise) {
    promise.set_value(1); // 데이터를 설정합니다.
}

int Sync() {

    std::promise<int> promise;
    std::future<int> data{promise.get_future()}; // 미래에 data를 주겠다고 약속합니다.

    std::thread worker{&Async, std::ref(promise)};

    data.wait(); // 데이터를 줄때까지 기다립니다.
    worker.join(); 

    return data.get(); // get() 하면 future 개체에서 관리하던 값은 사라집니다.
} 

EXPECT_TRUE(Sync() == 1); 

(C++20~) binary_semaphore를 이용하여 쓰레드를 동기화할 수 있습니다.
(C++20~) atomic_flag를 이용하여 쓰레드를 동기화할 수 있습니다.

다음은 비동기 처리를 위한 유틸리티 개체와 함수들입니다.

항목 내용
promise (C++11~) 비동기 처리를 위한 데이터를 저장합니다.
future (C++11~) 비동기 함수가 완료될때까지 대기하고, 데이터를 추출합니다.
shared_future (C++11~) 여러곳에서 공유할 수 있는 future 입니다.
packaged_task (C++11~) promise캡슐화하여 비동기 함수 설정만 하면 되는 유틸리티 개체 입니다.
async() (C++11~) packaged_task를 쉽게 사용할 수 있도록 만든 유틸리티 함수입니다.
launch (C++11~) async() 함수에서 사용하며, 비동기 작업에 대한 정책입니다.
* async : 새 쓰레드를 생성한뒤 실행합니다.
* deferred : 호출된 쓰레드에서 실행합니다.
future_status future개체 wait_for()시 사용하는 상태입니다.
future_error (C++11~) futurepromise의 오류를 보고합니다.
future_category (C++11~) future의 에러 카테고리입니다.
future_errc (C++11~) future의 에러 코드입니다.

promise

항목 내용
operator = (C++11~) 이동 대입합니다.
복사 대입은 delete되었습니다.
swap() (C++11~) 바꿔치기 합니다.
get_future() (C++11~) future 개체를 생성합니다.
set_value() (C++11~) 데이터를 설정합니다.
set_value_at_thread_exit() (C++11~) (작성중)
set_exception() (C++11~) 예외를 설정합니다. 설정한 예외는 throw로 방출됩니다.
set_exception_at_thread_exit() (C++11~) (작성중)

future

항목 내용
operator = (C++11~) 이동 대입합니다.
복사 대입은 delete되었습니다.
share() (C++11~) shared_future로 데이터를 전송합니다.
get() (C++11~) wait() 한뒤 데이터를 추출합니다. 추출한 뒤에는 데이터가 무효화 됩니다. 따라서 여러번 호출하려면 shared_future를 이용해야 합니다.
valid() (C++11~) 아직 get()하지 않아 데이터가 유효한지 확인합니다.
wait() (C++11~) promise에서 데이터를 설정할때까지 대기합니다.
wait_for() (C++11~) 주어진 기간이 지나면 대기를 해제합니다.
wait_until() (C++11~) 주어진 시간이 되면 대기를 해제합니다.

shared_future

future의 경우 get()을 하고 나면 데이터가 추출되어 무효화됩니다만, shared_future는 계속 get()을 할 수 있습니다.

항목 내용
operator = (C++11~) 복사 대입이동 대입합니다.
get() (C++11~) wait() 한뒤 데이터를 추출합니다. 이때 데이터는 future처럼 무효화되지 않습니다.
valid() (C++11~) 데이터가 유효한지 확인합니다.
wait() (C++11~) promise에서 데이터를 설정할때까지 대기합니다.
wait_for() (C++11~) 주어진 기간이 지나면 대기를 해제합니다.
wait_until() (C++11~) 주어진 시간이 되면 대기를 해제합니다.

packaged_task

promise캡슐화하여 비동기 함수 설정만 하면 되는 유틸리티 개체 입니다.

다음 예제를 보면,

  1. Async() 함수는 promise에 설정될 데이터를 리턴합니다.
  2. thread 생성시 Async() 함수의 인자를 전달합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int Async(int val) {
    return val + 1; // promise에 설정될 데이터를 리턴합니다.
}

int Sync() {

    std::packaged_task<int(int)> task(&Async);

    std::future<int> data{task.get_future()}; // 미래에 data를 주겠다고 약속합니다.

    std::thread worker{std::move(task), 10}; // thread에 대입시 task는 복사가 안되므로, 이동 생성합니다. Async 함수의 인자를 전달합니다.

    worker.join(); 

    return data.get(); // get() 하면 future 개체에서 관리하던 값은 사라집니다.
} 
EXPECT_TRUE(Sync() == 11); 

만약 Async() 함수에서 예외를 발생시키면, 다음처럼 예외 핸들링이 가능합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int Async(int val) {
    // return val + 1; // promise에 설정될 데이터를 리턴합니다.
    throw "MyError"; // 에외 발생
}

int Sync() {

    std::packaged_task<int(int)> task(&Async);

    std::future<int> data{task.get_future()}; // 미래에 data를 주겠다고 약속합니다.

    std::thread worker{std::move(task), 10}; // thread에 대입시 task는 복사가 안되므로, 이동 생성합니다. Async 함수의 인자를 전달합니다.

    worker.join(); 

    return data.get(); // get() 하면 future 개체에서 관리하던 값은 사라집니다.
} 

try {
    EXPECT_TRUE(Sync() == 11); 
}
catch(const char* err) {
    std::cout << "Task Exception : " << err << std::endl;
    EXPECT_TRUE(err == "MyError");
}

항목 내용
= (C++11~) 이동 대입합니다.
복사 대입delete되었습니다.
swap() (C++11~) 바꿔치기 합니다.
valid() (C++11~) 데이터가 유효한지 확인합니다.
get_future() (C++11~) future 개체를 생성합니다.
operator () (C++11~) 함수를 실행합니다.
make_ready_at_thread_exit() (C++11~) (작성중)
reset() (C++11~) (작성중)

async()

async()packaged_task를 쉽게 사용할 수 있도록 만든 유틸리티 함수입니다.

다음 예제와 같이 packaged_task, thread 정의 없이 사용할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
int Async(int val) {
    return val + 1; // promise에 설정될 데이터를 리턴합니다.
}
int Sync() {

    std::future<int> data{std::async(std::launch::async, &Async, 10)}; // 미래에 data를 주겠다고 약속합니다. 내부적으로 쓰레드를 가동하고 join() 합니다.

    return data.get(); // get() 하면 future 개체에서 관리하던 값은 사라집니다.
} 

EXPECT_TRUE(Sync() == 11); 

(C++20~) counting_semaphore, binary_semaphore

counting_semaphore는 카운트를 이용하여 동기화하는 개체입니다. 카운트가 0이면 대기하고 그렇지 않으면 진행합니다. 따라서, 초기 카운트 갯수에 따라 동시에 여러개의 쓰레드가 자원에 접근할 수 있습니다.

항목 내용
counting_semaphore<maxCount>(initCount) (C++20~) 최대 maxCount이고 초기값이 initCountcounting_semaphore를 생성합니다.
release() (C++20~) 카운트를 증가시키고 자원을 사용할 수 있게 합니다.
acquire() (C++20~) 카운트를 감소시킵니다. 0이면 쓰레드는 대기합니다.
try_acquire() (C++20~) (작성중)
try_acquire_for() (C++20~) (작성중)
try_acquire_until() (C++20~) (작성중)
max (C++20~) 카운트할 수 있는 최대값입니다.

binary_semaphorecounting_semaphore<1>의 별칭입니다. 자원 사용시 1개의 쓰레드만 사용할 수 있습니다.

(C++20~) binary_semaphore를 이용한 쓰레드 동기화

binary_semaphore를 이용하면 쓰레드 동기화를 구현할 수 있습니다.

다음은 binary_semaphore을 이용하여 쓰레드 동기화를 구현한 예입니다. 초기에 count0으로 설정하고, m_IsCompleted.acquire()에서 대기하고, Async()함수에서 m_IsCompleted.release();를 하면 대기를 멈추고 진행합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
std::binary_semaphore g_IsCompleted{0}; // 0입니다. acquire()에서 대기합니다.

void Async(int& data) {
    data = 1; // 데이터를 설정합니다.

    g_IsCompleted.release(); // 1로 증가시킵니다.
}

int Sync() {

    int data{0};
    std::thread worker{&Async, std::ref(data)};

    g_IsCompleted.acquire(); // 카운트가 0보다 커질때까지 대기합니다.
    worker.join(); 

    return data; 
}  

EXPECT_TRUE(Sync() == 1);

(C++20~) latch

latchcounting_semaphore처럼 카운트를 이용하여 동기화하는 개체입니다. counting_semaphore처럼 카운트가 0이면 대기하고 그렇지 않으면 진행합니다.

항목 내용
latch(initCount) (C++20~) initCountlatch를 생성합니다.
wait() (C++20~) 카운트가 0이 될때까지 대기합니다.
count_down() (C++20~) 카운트를 감소시킵니다.
try_wait() (C++20~) (작성중)
arrive_and_wait() (C++20~) 카운트를 감소시키고 0이 될때까지 대기합니다.
max (C++20~) 카운트할 수 있는 최대값입니다.

다음 예는 latch의 사용 예입니다.

  1. #1 : 모든 Async()작업에서 숫자를 감소시키고 0이 되기를 기다립니다.
  2. #2 : 카운트가 0이 되면 End메시지를 출력합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
std::latch g_IsAsyncComplated{5}; // 최대 5개가 동시에 진행됩니다.
std::mutex messageMutex;

// std::cout 시 쓰레드 경쟁에 출력이 뒤섞이지 않도록 뮤텍스를 사용합니다.
void SyncMessage(int i, const char* str) {
    std::lock_guard<std::mutex> lock(messageMutex); // 유효 범위를 벗어나면 unlock을 호출합니다.

    std::cout << std::format("Work {} : {}", i, str) << std::endl;
}
void Async(int i) {
    SyncMessage(i, "First");

    g_IsAsyncComplated.arrive_and_wait(); // #1. 숫자를 감소시키고 0이 될때까지 대기합니다.

    SyncMessage(i, "Last"); // #2
}


std::vector<std::thread> v;

for (int i{0}; i < 5; ++i) {
    v.emplace_back(&Async, i);
}

for (auto& work : v) {
    work.join();
}

실행 결과는 다음과 같습니다. 각 쓰레드가 뒤섞여서 실행되지만, First - Last는 뒤섞이지 않은 것을 알 수 있습니다.

1
2
3
4
5
6
7
8
9
10
Work 0 : First
Work 1 : Start
Work 3 : First
Work 4 : First
Work 2 : First
Work 2 : Last
Work 4 : Last
Work 1 : Last
Work 0 : Last
Work 3 : Last

(C++20~) barrier

barrierlatch와 유사하게 카운트를 이용하여 동기화하는 개체이며, latch와 다르게 여러번 사용할 수 있습니다.

항목 내용
barrier(initCount, CompletionFunction) (C++20~) initCountbarrier를 생성합니다.
wait() (C++20~) 단계가 끝날때까지 대기합니다.
arrive() (C++20~) 카운트를 감소시킵니다.
try_wait() (C++20~) (작성중)
arrive_and_wait() (C++20~) 카운트를 감소시키고 단계가 완료 될때까지 대기합니다.
arrive_and_drop() (C++20~) 다음 단계까지 감소시킵니다.
max (C++20~) 카운트할 수 있는 최대값입니다.

다음 예에서 보면 모든 Async()에서 각 단계가 끝날때까지 대기하는 것을 확인할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
std::barrier g_IsAsyncComplated{5}; // 최대 5개가 동시에 진행됩니다.
std::mutex messageMutex;   

// std::cout 시 쓰레드 경쟁에 출력이 뒤섞이지 않도록 뮤텍스를 사용합니다.
void SyncMessage(int i, const char* str) {
    std::lock_guard<std::mutex> lock(messageMutex); // 유효 범위를 벗어나면 unlock을 호출합니다.

    std::cout << std::format("Work {} : {}", i, str) << std::endl;
}
void Async(int i) {
    SyncMessage(i, "First");
    g_IsAsyncComplated.arrive_and_wait(); // #1. 숫자를 감소시키고 현 단계가 끝날때까지 대기합니다.

    SyncMessage(i, "Second");
    g_IsAsyncComplated.arrive_and_wait(); // #2. 숫자를 감소시키고 현 단계가 끝날때까지 대기합니다.

    SyncMessage(i, "Third");
    g_IsAsyncComplated.arrive_and_wait(); // #3. 숫자를 감소시키고 현 단계가 끝날때까지 대기합니다.

    SyncMessage(i, "Last"); // #4
}

std::vector<std::thread> v;

for (int i{0}; i < 5; ++i) {
    v.emplace_back(&Async, i);
}

for (auto& work : v) {
    work.join();
}

실행 결과는 다음과 같습니다. 각 쓰레드가 뒤섞여서 실행되지만, First - Second - Third - Last는 뒤섞이지 않은 것을 알 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Work 0 : First
Work 4 : First
Work 2 : First
Work 3 : First
Work 1 : First
Work 1 : Second
Work 3 : Second
Work 4 : Second
Work 2 : Second
Work 0 : Second
Work 0 : Third
Work 4 : Third
Work 2 : Third
Work 3 : Third
Work 1 : Third
Work 1 : Last
Work 3 : Last
Work 0 : Last
Work 4 : Last
Work 2 : Last

(C++20~) atomic_flag를 이용한 쓰레드 동기화

쓰레드 동기화를 구현하기 위해

C++20 부터 atomicnotify_one(), notify_all(), wait()가 추가되어 atomic_flag를 이용한 쓰레드 동기화 구현할 수 있으며, 성능도 가장 좋습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
std::atomic_flag g_IsCompleted{}; // 기본 생성자는 false로 세팅됩니다.

void Async(int& data) {
    data = 1; // 데이터를 설정합니다.

    g_IsCompleted.test_and_set(); // true로 설정합니다.
    g_IsCompleted.notify_all(); // 값이 수정되었음을 통지합니다.
}

int Sync() {

    int data{0};
    std::thread worker{&Async, std::ref(data)};

    g_IsCompleted.wait(false); // 값이 false 이면 대기합니다.
    worker.join(); 

    return data; 
} 

EXPECT_TRUE(Sync() == 1);

태그:

카테고리:

업데이트:

댓글남기기