Thread, Mutex, 생성자 & 소비자 패턴

하루공부·2024년 1월 20일
0

C++

목록 보기
13/25
post-thumbnail

C++ 아이콘 제작자: Darius Dan - Flaticon


  • 프로그램의 최소 단위는 프로세스 - 보통 1개의 프로그램을 1개의 프로세스로 의미하는 경우가 많다.
  • 이 프로세스는 cpu의 코어에서 실행된다.

    코어에는 Context switching이라는 기술이 있다.
    프로그램이 1개 실행하다가 다른 프로그램으로 스위칭되어 실행되고 또 스위칭하고 이걸 반복한다.

    • cpu 코어에서 돌아가는 프로그램 단위를 쓰레드라고 한다.
      cpu의 코어 1개는 1번에 1개의 쓰레드에게 명령한다.

      cpu에서 프로세스(프로그램)이 1개 실행될 때 쓰레드를 1개 아니면 여래개로 구성될 수 있다. ==> 여러개로 구성된다면 멀티 쓰레드 프로그램이다.

  • 쓰레드와 프로세스의 가장 큰 차이점은 프로세스들은 서로 메모리를 공유하지 않는다.

    프로세스 , 프로그램은 서로 메모리를 공유하지 않지만 같은 프로세스 안에 있는 쓰레드 끼리는 공유가 가능하다.


  • 왜 멀티 쓰레드를 사용하냐??

    1. 병렬화
      • 여러개의 쓰레드로 작업을 빠르게 수행한다.
        10000번을 수행하는 작업이 있을데 10개의 쓰레드가 1000개씩 나누어 작업을 하면 더 빠르다.

        하지만 병렬화를 적용 못 하는 작업이 있다.

        • 바로 연산들 간의 의존 관계가 있을 때.
          ==> 어떠한 A연산이 이루어지기 위해선 B연산이 이루어져야함( 그 값을 사용하기 때문)
          그렇다면 B연산이 끝날 때 까지 A연산은 못하고 기다려야함
          ==> 병렬화 하지 않는 것과 차이가 없음.

          이것을 A가 B에 의존한다라고 한다.
          연산간 의존 관계가 많을수록 병렬화가 어렵다.

    1. 오래 걸리는 작업이 있을 때

      A작업을 하는데 많은 시간이 걸린다.
      그 시간이면 많은 연산을 할 수 있을 것이다.

      멀티 쓰레드를 사용한다면 기다리는 시간없이 cpu를 최대 활용이 가능하다.


  • cpu 연산

    cpu는 데이터를 보통 메모리가 아닌 레지스터에서 가지고 온다.
    메모리에서 가져오면 시간이 걸리기 때문에 자주 사용하는 데이터를 가까운 레지스터에 저장하여 연산.
    만약 레지스터에 필요한 데이터가 없을 때는 메모리에서 가져온뒤 레지스터에 저장하여 연산.



std::thread

  • C++11 부터 추가된 쓰레드 thread 헤더 파일 필요함
  • thread 객체를 생성하는 순간 끝 ex) thread t1(func1);

    생성된 t1( = 쓰레드) 객체에 인자로 함수 func1을 전달해 쓰레드에서 실행한다.

     thread t1(func1);
     thread t2(func2);
     thread t3(func3); // 각각의 함수가 다른 쓰레드에서 실행됨.
  • 한 가지 중요한 사실은 이 쓰레드 들이 CPU 코어에 어떻게 할당되고 또 언제 컨텍스트 스위치를 할 지는 전적으로 운영체제의 마음에 달려있다
    • 쓰레드 3개를 만들었다고 각기 다른 코어에 할당될지 1개의 코어에 할당되어 컨텍스트 스위칭을 할지 모른다
    • 운영체제가 쓰레드들을 어떤 코어에 할당하고, 또 어떤 순서로 스케쥴 할지는
      그 때 그 때 마다 상황에 맞게 바뀌기 때문에 그결과를 정확히 예측할 수 없다.

  • 자원이 할당된 쓰레드는 작업이 끝난후 자원을 해제해야한다
    ==>그렇지 않으면 자원이 낭비한체 존재한다
    ==> join함수로 할당된 자원을 해제함.

쓰레드의 상태에는 detached 상태와 joinable 상태가 있다.

기본적으로 joinable로 설정된다.


join

  • join은 해당하는 쓰레드들(joinable)이 실행을 종료하면 리턴하는 함수 ex) t1.join();
t1.join();
t2.join();
t3.join();

t2가 이미 먼저 종료 되었다면 t1.join이 끝나고 t2.join 할 때 이미 종료된 상태라면 바로 함수가 리턴됨

  • 만약에 join 을 하지 않는다면 컴파일 에러가 뜬다
  • C++ 표준에 따르면, join 또는 detach 되지 않는 쓰레드들의 소멸자가 호출된다면 예외를 발생시키도록 명시되어 있다.

detach

  • detach()함수로 쓰레드 상태를 바꾼다. ==> detached 상태
  • detached의 쓰레드는 join을 안해도 끝날 때 자원이 알아서 해제된다.
    ==> 사실 해당 쓰레드는 백그라운드에서 돌아간다.

그러면 모든 쓰레드를 detached로 두면 안됨??

그런데 작업이 알아서 진행해서 알아서 혼자 종료해서 해당 작업 결과물을 얻고 싶어도
이미 종료되었다면 그 값을 얻지 못한다.

메인 쓰레드에서 분리되어 백그라운드에서 작업을하는데 이게 언제 종료될지 모른다.

joinable 상태 쓰레드는 종료될 때 까지 기다려서 그 자원을 우리가 활용하고 우리가 직접
join함수로 자원을 해제하기 때문에 원하는 작업을 하고 쓰레드를 종료 시킴



  • 기본적으로 프로세스가 종료될 때, 해당 프로세스 안에 있는 모든 쓰레드들은 종료 여부와 상관없이 자동으로 종료된다.
  • 다음 예시 1 ~ 10000까지 합 구하기

    #include <cstdio>
    #include <iostream>
    #include <thread>
    #include <vector>
    using std::thread;
    using std::vector;
    void worker(vector<int>::iterator start, vector<int>::iterator end, int* result) {
    int sum = 0;
    for (auto itr = start; itr < end; ++itr) {
    sum += *itr; }
    *result = sum;
    thread::id this_id = std::this_thread::get_id(); // 쓰레드의 id 를 구한다.
    printf("쓰레드 %x 에서 %d 부터 %d 까지 계산한 결과 : %d \n", this_id, *start, *(end - 1), sum); }
    int main() {
    vector<int> data(10000);
    for (int i = 0; i < 10000; i++) {
    data[i] = i; }
    vector<int> partial_sums(4); // 각 쓰레드에서 계산된 부분 합들을 저장하는 벡터
    vector<thread> workers;
    for (int i = 0; i < 4; i++) {
    workers.push_back(thread(worker, data.begin() + i * 2500, data.begin() + (i + 1) * 2500, &partial_sums[i])); }
    for (int i = 0; i < 4; i++) {
    workers[i].join(); }
    int total = 0;
    for (int i = 0; i < 4; i++) {
    total += partial_sums[i]; }
    std::cout << "전체 합 : " << total << std::endl; }
  • 쓰레드는 리턴값 이란것이 없기 때문에 만일 어떠한 결과를 반환하고 싶다면 포인터의 형태로 전달하면 됨


  • 쓰레들을 함수에 전달하여 사용하고 싶으면 bind처럼 사용하면 된다.
    workers.push_back(thread(worker, data.begin() + i * 2500, data.begin() + (i + 1) *  2500, &partial_sums[i])); }
    ==> thread(worker, data.begin() + i * 2500, data.begin() + (i + 1) * 2500, &partial_sums[i])
    ==> thread의 생성자의 첫 번째 인자로 함수를 전달하고 해당 함수의 인자를 이어서 전달하면 끝.

  • 중간에 std::cout가 아닌 printf함수를 사용했는데 왜????

    std::cout 를 사용해보면 값이 이상하게 나온다
    <<를 실행하는 과정에서 계속 실행되는 쓰레드들이 바뀌면서 결과도 같이 섞여서 출력된 것

    반면 printf의 경우 " " 안에 있는 것을 출력할 때 컨텍스트 스위칭이 일어나도 다른 쓰레드가 그 사이에 결과를 집어넣지 못하게 막는다.


  • 앞서 쓰레들 끼리 서로 메모리를 공유한다고 했다. 그럼 같은 메모리를 서로 접근한다면 어떻게 될까???
    ==> 실제로 1~ 1000 을 4개 쓰레드에서 나누어 저장하고 4개의 값을 합쳐 출력하면 이상한 값이 나온다.

    이는 race condition(경쟁 상태)가 나타나서 그렇다.

    왜 경쟁 상태가 일어나는가?
    쓰레드1 에서 작업을 하는데 그 도중에 쓰레드2에서 작업을 해버려 결과값이 저장된다.
    근데 쓰레드2가 끝나고 나서 쓰레드1이 작업을 끝내 쓰레드2의 값위에 덮어버린다.

==> mutex로 해결해야 한다.



mutex

  • 같은 변수를 가지고 같은 작업을 동시에 여러 쓰레드가 진행하여 위 같은 문제가 일어났다.
  • 어떠한 작업에 대해서 1번에 1개의 쓰레드만 실행가능하게 만드는 mutex가 있다. (mutex 헤더파일)
#include <iostream>
#include <mutex> // mutex 를 사용하기 위해 필요
#include <thread>
#include <vector>
void worker(int& result, std::mutex& m) {
for (int i = 0; i < 10000; i++) {
m.lock();
result += 1;
m.unlock();}}
int main() {
int counter = 0;
std::mutex m; // 우리의 mutex 객체
std::vector<std::thread> workers;
for (int i = 0; i < 4; i++) {
workers.push_back(std::thread(worker, std::ref(counter), std::ref(m)));}
for (int i = 0; i < 4; i++) {
workers[i].join();}
std::cout << "Counter 최종 값 : " << counter << std::endl;}
  • mutex를 정의하고 쓰레드에서 작업되는 곳에서 lock()과 unlock()으로 1번에 1개의 쓰레드만 접근 가능하게 만든다.

    mutex의 lock()함수는 일종의 권한을 얻어오는 것

    • 1개의 쓰레드가 lock()을 사용하면 해당 쓰레드가 권한을 갖고 작업을 하는 중에는 다른 쓰레드가 해당 작업을 수행하지 못한다.
    • 작업이 끝나면 unlock()함수로 mutex객체를 반환해줘야 한다.

      mutex를 반환해주기 전까지 다른 쓰레드들은 무한정 기다린다.
      그래서 권한을 갖는 쓰레드만이 작업이 가능하다.

      • 그렇게 반환된 mutex로 다시 다른 쓰레드가 lock()으로 권한을 가져 작업을 한다.

==> lock()과 unlock()의 사이에서 오직 1개의 쓰레드만 실행하는 코드를 critical section라고 한다.


lock_guard

  • 데드락은 mutex를 얻은 쓰레드가 unlock()을 하지 않아 생기는 상황이다.
    ==> 작업이 끝나도 모든 쓰레드가 계속 기다려 작업을 하지 못한다.
  • unique_ptr 스마트 포인트도 비슷하게 자원 할당후 해제를 해야할 때 객체가 소멸될 때 스스로 소멸자로 해제를 하도록 한다.

  • 비슷한 역할을 하는것이 std::lock_guard<>이다.

std::lock_guard<std::mutex> lock(m) // m은 mutex 객체
  • lock_guard는 인자로 mutex객체를 받아 생성을 한다.
  • 이때 lock_guard의 생성자가 해당 mutex를 lock을 한다.
  • 작업이 끝나 해당 작업구역을 탈출할 때 lock_guard가 소멸되면서 unlock()을 수행한다.

  • 다음 예시를 보자
void worker1(std::mutex& m1, std::mutex& m2) {
for (int i = 0; i < 10000; i++) {
std::lock_guard<std::mutex> lock1(m1);
std::lock_guard<std::mutex> lock2(m2);// Do something}}
void worker2(std::mutex& m1, std::mutex& m2) {
for (int i = 0; i < 10000; i++) {
std::lock_guard<std::mutex> lock2(m2);
std::lock_guard<std::mutex> lock1(m1);// Do something}}
int main() {
int counter = 0;
std::mutex m1, m2; // 우리의 mutex 객체
std::thread t1(worker1, std::ref(m1), std::ref(m2));
std::thread t2(worker2, std::ref(m1), std::ref(m2));
t1.join();
t2.join();
std::cout << "끝!" << std::endl;}

worker1에서는
std::lock_guard<std::mutex> lock1(m1);
std::lock_guard<std::mutex> lock2(m2);

worker2에서는
std::lock_guard<std::mutex> lock2(m2);
std::lock_guard<std::mutex> lock1(m1);

만약에 worker1 에서 m1 을 lock 하고, worker2에서 m2 를 lock 했다
worker1 에서 m2 를 lock을 하기 위해서는 worker2에서 m2를 unlock해야 한다.
그러기 위해선 worker2는 m1을 lock을 해야 lock_guard가 unlock을 할 것이다.
==> 근데 불가능하다 왜냐면 worker1에서 m1을 lock하고 있기 때문이다.

이렇게 lock_guard를 사용해도 데드락이 발생할 수 있다.


try_lock

  • 위의 상황에서 누군가에게 우선권을 주면 해결된다.
    ==> 그럼 lock상황에서도 우위를 가진 누군가가 권한을 가져가서 작업을 한 후 순위가 낮은 친구가 작업.

  • try_lock()는 전달 받은 mutex객체를 lock() 할 수 있다면 lock() 을 하고 true를 리턴
    반대로 누군가 전달받은 객체를 lock() 하고 있다면 false 리턴

    따라서 위 상황에서 만약 worker2에서

    void worker2(std::mutex& m1, std::mutex& m2) {
    for (int i = 0; i < 10; i++) {
    while (true) {
    if (!m1.try_lock()) { // m1 이 이미 lock 되어 있다면 작업을 그냥 안함
    m2.unlock();
    continue;} ==> 게속 확인함.
    std::cout << "Worker2 Hi! " << i << std::endl;
    m1.unlock();
    m2.unlock();
    break;}}}

  • 이렇게 데드락이 발생하지 않도록 도와주는 함수들이 있다.
    하지만 그냥 처음부터 데드락이 발생되지 않도록 하자.
    • 중첩된 lock사용을 지양
    • 만인 여러개의 lock이 필요하다면 정해진 순서로 lock을 해라
      ==> 위 처럼 worker2에서 m2, m1순서가 아닌 m1, m2 순으로 lock을 했다면 ==> 데드락 발생 x


생성자 & 소비자 패턴

  • 생산자는 무언가 처리할 일을 받아오는 쓰레드
  • 소비자는 받은 일을 처리하는 쓰레드
  • 예시로 페이지를 다운 받고 그 페이지를 처리하는 생산자-소비자 패턴

    void producer(std::queue<std::string>* downloaded_pages, std::mutex* m, int index) {
    	for (int i = 0; i < 5; i++) {
    		// 웹사이트를 다운로드 하는데 걸리는 시간
    		// 각 쓰레드 별로 다운로드 하는데 걸리는 시간이 다르다.
    		std::this_thread::sleep_for(std::chrono::milliseconds(100 * index));
    		std::string content = "웹사이트 : " + std::to_string(i) + " from thread(" + std::to_string(index) + ")\n";
    		m->lock(); // data 는 쓰레드 사이에서 공유되므로 critical section 에 넣어야 한다.
    		downloaded_pages->push(content);
    		m->unlock();
    	}
    }
    void consumer(std::queue<std::string>* downloaded_pages, std::mutex* m, int* num_processed) {
    	while (*num_processed < 25) { // 전체 처리하는 페이지 개수가 5 * 5 = 25 개.
    		m->lock();
    		if (downloaded_pages->empty()) { // 만일 현재 다운로드한 페이지가 없다면 다시 대기.
    			m->unlock();
    			std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 10 밀리초 뒤에 다시 확인한다.
    			continue;
    		}
    		std::string content = downloaded_pages->front(); // 맨 앞의 페이지를 읽고 대기 목록에서 제거한다.
    		downloaded_pages->pop();
    		(*num_processed)++;
    		m->unlock();
    		std::cout << content;  // content 를 처리한다.
    		std::this_thread::sleep_for(std::chrono::milliseconds(80));
    	}
    }
    int main() {
    	std::queue<std::string> downloaded_pages; // 현재 다운로드한 페이지들 리스트로, 아직 처리되지 않은 것들이다.
    	std::mutex m;
    	std::vector<std::thread> producers;
    	for (int i = 0; i < 5; i++) { producers.push_back(std::thread(producer, &downloaded_pages, &m, i + 1)); }
    	int num_processed = 0;
    	std::vector<std::thread> consumers;
    	for (int i = 0; i < 3; i++) { consumers.push_back(std::thread(consumer, &downloaded_pages, &m, &num_processed)); }
    	for (int i = 0; i < 5; i++) { producers[i].join(); }
    	for (int i = 0; i < 3; i++) { consumers[i].join(); }
    }

    std::this_thread::sleep_for()는 chrono 객체를 받아서 그 시간만큼 sleep한다.
    ==> 해당 시간만큼 쓰레드에 대해 로직 실행을 멈추게 만든다.

    위 예제에서 producer에서 100ms 마다 웹사이트 정보를 큐에 추가한다.
    이 때 consumer는 downloaded_pages->empty() 으로 처리할 게 있는지 계속 10ms 마다 확인하는데 100ms에 1번 큐에 정보가 들어오니 최소 10번이나 확인을 하게 된다,.
    ==> 너무 비효율적임 ( 그만큼 cpu가 활용되지 않음)


    또 consumer의 마지막 부분에 content를 처리하는데 시간이 대충 80초로 시뮬레이션 했다.
    이 때 또다른 쓰레드가 작업을 시작할 텐데 10ms마다 unlock이 되었는지 체크함. ==> 비효율


condition_variable

  • condition_variable클래스는 다른 쓰레드가 공유 변수를 수정하고 condition_variable로 통지할 때 까지
    쓰레드를 대기 시키는 동기화 기법이다. 언제나 mutex와 연동되어 쓰레드를 안전하게 동작시킴.

    위 예제에서 조건을 만족 시킬 때 까지 sleep을 시키고 조건이 만족되면 lock을 하여 작업 수행.

    void producer(std::queue<std::string>* downloaded_pages, std::mutex* m, int index, std::condition_variable* cv) {
    	for (int i = 0; i < 5; i++) {
    		std::this_thread::sleep_for(std::chrono::milliseconds(100 * index));
    		std::string content = "웹사이트 : " + std::to_string(i) + " from thread(" + std::to_string(index) + ")\n";
    		m->lock();
    		downloaded_pages->push(content);
    		m->unlock();
    		cv->notify_one();  // consumer 에게 content 가 준비되었음을 알린다. } }
    		void consumer(std::queue<std::string>*downloaded_pages, std::mutex * m
                          , int* num_processed, std::condition_variable * cv) {
    			while (*num_processed < 25) {
    				std::unique_lock<std::mutex> lk(*m);
    				cv->wait(lk, [&] { return !downloaded_pages->empty() || *num_processed == 25; });
    				if (*num_processed == 25) {
    					lk.unlock();
    					return;
    				}
    				std::string content = downloaded_pages->front();
    				downloaded_pages->pop();
    				(*num_processed)++;
    				lk.unlock();
    				std::cout << content;
    				std::this_thread::sleep_for(std::chrono::milliseconds(80));
    			};
    		}
    	}
    };
    int main() {
    	//내용
    	std::condition_variable cv;
    	std::vector<std::thread> producers;
    	for (int i = 0; i < 5; i++) {
    		producers.push_back(std::thread(producer, &downloaded_pages, &m, i + 1, &cv));
    	}
    	int num_processed = 0;
    	std::vector<std::thread> consumers;
    	for (int i = 0; i < 3; i++) { 
              consumers.push_back(std::thread(consumer, &downloaded_pages, &m, &num_processed, &cv)); }
    	for (int i = 0; i < 5; i++) { producers[i].join(); }
    	cv.notify_all();  // 나머지 자고 있는 쓰레드들을 모두 깨운다.
    	for (int i = 0; i < 3; i++) { consumers[i].join(); }
    }

    우선 std::condition_variable cv; 이렇게 단순하게 정의를 한다.

unique_lock

  • lock_guard과 거의 동일하다.
    • lock_guard의 경우 생성자 말고는 따로 lock을 할 수 없는데 unique_lock은 unlock이후에 lock을 다시 할 수 있다.
    • 또한 condition_variable의 함수중 wait함수가 unique_lock을 인자로 받기 때문에 필요하다.
cv->wait(lk, [&] { return !downloaded_pages->empty() || *num_processed == 25; }); 

condition_variabled의 wiat()함수

template <class _Predicate>
void wait(unique_lock<mutex>& _Lck, _Predicate _Pred); 

wait()가 수행되면 내부적으로 스레드에 안전하게 락을 해제하고 현재 수행 중인 스레드를 중단하고
대기 스레드 리스트에 추가

인자로 어떤 조건이 참이 될 때 까지 기다릴지 해당 조건을 넣어준다.
해당 조건이 거짓이라면 넣어준 unique_lock객체를 unlock하고 영원히 sleep하게 된다.
그리고 해당 쓰레드는 누가 깨워줄 때 까지 계속 sleep한다.
==> 1가지 중요한 점은 unlock한다는 점이다.

반면 조건이 참이라면 wait함수를 빠져나가서 다음 로직을 수행한다.


cv->notify_one();

해당 함수는 wait()함수에서 조건이 거짓인 바람에 sleep하고 있던 쓰레드들중 1개를 깨워서 다시 조건을 검사하게 만든다
그 조건이 참이라면 해당 쓰레드는 작업을 한다


cv.notify_all(); // 모든 쓰레드를 깨워서 조건을 다시 검사하게 만드는 함수

만약 아직 sleep하고 있는 쓰레드가 있으면 join 작업이 정상적으로 이루어지지 않는다.


wait_for

wait함수와 기본적인 동작은 동일하며 추가적으로 2번째 인자로 시간을 입력받는다
해당 시간이 지날 때 까지 통지가 없다면 timeout을 발생하는데 이는 쓰레드의 wait을 중지시킨다 -> 깨운다
해당 시간은 std::chrono 타입이다.
ex) auto status = cv.wait_for(ul, 10s, [] {return !con.empty(); });


wait_until

waitfor 함수와 기본적으로 동일하게 작동한다
다른점은 시간이 아닌 특정 시점의 시간을 지정해서 그 시점까지 통지를 못 받으면 timeout이 발생한다.
ex) auto status = cv
.wait_until(ul, now + 10s);


참조
공부한 내용 복습

개인 공부 기록용 블로그입니다.
틀린 부분 있으다면 지적해주시면 감사하겠습니다!!

0개의 댓글