C++ Thread

SOEUN CHOI·2022년 7월 6일
0

C++_study

목록 보기
14/15

씹어먹는 C++

16장 C++ 쓰레드 719p-825p


운영체제

  • 프로세스
    운영체제에서 실행되는 프로그램의 최소 단위
    CPU 코어에서 실행

  • 스케줄러 scheduler
    여러 프로세스를 돌리기 위해 컨텍스트 스위칭
    어떤 프로그램을 실행시키고, 얼마 동안 실행 시키고, 스위치 시킬지 결정

쓰레드

CPU 코어에서 돌아가는 프로그램 단위
CPU 의 코어 하나에서는 한 번에 한 개의 쓰레드의 명령을 실행
한 개의 프로세스는 최소 한 개 쓰레드로 이루어져 있으며, 여러 개의 쓰레드로 구성도 가능

  • 쓰레드와 프로세스의 가장 큰 차이점
    프로세스는 서로의 메모리를 접근할 수 없음
    그러나
    같은 프로세스 내에 쓰레드 끼리는 메모리를 공유

멀티 쓰레드 (multithread) 프로그램

여러개의 쓰레드로 구성된 프로그램

  • 병렬화(parallelize)
    어떠한 작업을 여러개의 다른 쓰레드를 이용해서 좀 더 빠르게 수행 가능
    불가능 한 경우도 있다 ex) 피보나치 수열
    연산들 간의 의존 관계가 많을 수로 병렬화 힘듦
    다른 연산의 결과와 관계 없이 독립적으로 수행시 병렬화 쉬움
    • A가 B에 의존(dependent)
      어떠한 연산 (연산 A) 을 수행하기 위해
      다른 연산 (연산 B)의 결과가 필요
  • 대기시간이 긴 작업
    CPU 시간을 낭비하지 않고 효율적 작업 처리 가능

C++ Thread

example code

//헤더 파일 <thread>

#include <iostream>
#include <thread>
using std::thread;

void func1() {
  for (int i = 0; i < 10; i++) {
  	std::cout << "쓰레드 1 작동중! \n";
  }
}
void func2() {
  for (int i = 0; i < 10; i++) {
  	std::cout << "쓰레드 2 작동중! \n";
  }
}
void func3() {
  for (int i = 0; i < 10; i++) {
  	std::cout << "쓰레드 3 작동중! \n";
  }
}
int main() {
  //thread 객체 생성
  thread t1(func1);
  thread t2(func2);
  thread t3(func3);
  
  t1.join();
  t2.join();
  t3.join();
  //or t1.detach();
}

랜덤하게 쓰레드 # 작동중!이 출력됨

운영체제 스케줄링에 따라 다름 랜덤하게 각 쓰레드 실행

운영체제가 쓰레드들을 어떤코어에 할당하고,
또 어떤 순서로 스케쥴 할지는 그 때 그 때 마다 상황에 맞게 바뀌기 때문에 그 결과를 정확히 예측할 수 없음.

  • join()
    해당하는 쓰레드들이 실행을 종료하면 리턴하는 함수
    t1 이 종료하기 전 까지 리턴하지 않음.

    사용하지 않으면
    쓰레드들의 내용이 실행되기전에
    main 함수가 종료되어서
    쓰레드 객체들 (t1, t2, t3)의 소멸자가 호출됨

    join 되거나 detach 되지 않는 쓰레드들의 소멸자가 호출시엔 예외 발생

  • detach()
    main은 종료되고
    쓰레드는 알아서 백그라운드에서 돌아감.

쓰레드에 인자 전달

포인터의 형태로 전달
리턴 값이 없기 때문.

#include <cstdio>
#include <iostream>
#include <thread>
#include <vector>
using std::thread;
using std::vector;

void worker(vector<int>::iterator start, vector<int>::iterator end, int* result) {
  int sum = 0;
  for (auto itr = start; itr < end; ++itr) {
  	sum += *itr;
  }
  *result = sum;
  
  // 쓰레드의 id 를 구한다.
  thread::id this_id = std::this_thread::get_id();
  printf("쓰레드 %x 에서 %d 부터 %d 까지 계산한 결과 : %d \n", this_id, *start,*(end - 1), sum);
}
int main() {
  vector<int> data(10000);
  for (int i = 0; i < 10000; i++) {
  	data[i] = i;
  }
  
  // 각 쓰레드에서 계산된 부분 합들을 저장하는 벡터
  vector<int> partial_sums(4);
  
  vector<thread> workers;
  for (int i = 0; i < 4; i++) {
  workers.push_back(thread(worker, data.begin() + i * 2500, data.begin() + (i + 1) * 2500, &partial_sums[i]));
  }
  
  for (int i = 0; i < 4; i++) {
  	workers[i].join();
  }
  
  int total = 0;
  for (int i = 0; i < 4; i++) {
  	total += partial_sums[i];
  }
  std::cout << "전체 합 : " << total << std::endl;
}

쓰레드 a754700 에서 0 부터 2499 까지 계산한 결과 : 3123750
쓰레드 9752700 에서 5000 부터 7499 까지 계산한 결과 : 15623750
쓰레드 9f53700 에서 2500 부터 4999 까지 계산한 결과 : 9373750
쓰레드 8f51700 에서 7500 부터 9999 까지 계산한 결과 : 21873750
전체 합 : 49995000

printf / cout

  • printf
    "..." 안에 있는 문자열을 출력할 때, 컨텍스트 스위치가 되더라도 다른 쓰레드들이 그 사이에 메세지를 집어넣지 못하게 막음

하지만

  • cout
    << 사이 제외하고 전체 한줄이 출력되는 동안 중간에 다른 쓰레드가 출력하지 못하게 막는 것이 보장 안됨

쓰레드로 메모리 동시 접근 시 주의

  • 경쟁 상태 (race condtion)
    서로 다른 쓰레드에서 같은 메모리를 공유할 때 발생할 수 있는 문제

  • CPU 연산 처리
    CPU 의 레지스터(register)에 데이터를 기록한 다음에 연산 수행

    모든 데이터들은 메모리에 저장되어 있고, 연산 할 때 할 때 마다 메모리에서 레지스터로 값을가져온 뒤에, 빠르게 연산을 하고, 다시 메모리에 가져다 놓는 식으로 작동

스케줄러에 따라 다르게 쓰레드가 실행되므로
멀티 쓰레드 프로그램의 경우 프로그램 실행 마다 그 결과가 달라질 수 있습니다.
이게 무슨 말일까요?
제대로 프로그램을 만들지 않았을 경우 디버깅이 겁나 어렵다는 뜻입니다.
ㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋ

뮤텍스 (metex)

mutex
영어의 상호 배제 (mutual exclusion)

한번에 한 쓰레드만 해당 메모리 접근을 시키기 위해 사용

한번에 한 쓰레드에서만 m 의 사용 권한 갖음
m.lock() 시 해당 뮤텍스 m 사용 권한을 갖는다
m.unlock() 시 m을 반환
만약 사용시 m.lock을 하면 무한정 대기

  • 임계 영역(critical section)
    m.lock() 과 m.unlock() 사이에 한 쓰레드만이 유일하게 실행할 수 있는 코드 부분
  • lock_guard
    뮤텍스를 인자로 받아서 생성하게 되는데,
    이 때 생성자에서 뮤텍스를 lock
    그리고 lock_guard 가 소멸될 때 알아서 lock 했던 뮤텍스를 unlock

example code

#include <iostream>
#include <mutex> // mutex 를 사용하기 위해 필요
#include <thread>
#include <vector>

void worker(int& result, std::mutex& m) {
  for (int i = 0; i < 10000; i++) {
    m.lock();
    result += 1;
    m.unlock();
  }
}
int main() {
  int counter = 0;
  std::mutex m; // 우리의 mutex 객체
  std::vector<std::thread> workers;
  for (int i = 0; i < 4; i++) {
    workers.push_back(std::thread(worker, std::ref(counter), std::ref(m)));
  }
  for (int i = 0; i < 4; i++) {
  	workers[i].join();
  }
  std::cout << "Counter 최종 값 : " << counter << std::endl;
}

Counter 최종 값 : 40000

데드락(deadlock)

결국 아무 쓰레드도 연산을 진행하지 못하게 되는 상황
방지 하기 위해 취득한 뮤텍스는 사용이 끝나면 반드시 반환 할 것.
소멸자에서 처리 가능(lock_guard 나 unique_lock 등을 이용)

void worker1(std::mutex& m1, std::mutex& m2) {
  for (int i = 0; i < 10000; i++) {
    std::lock_guard<std::mutex> lock1(m1);
    std::lock_guard<std::mutex> lock2(m2);
    // Do something
  }
}
void worker2(std::mutex& m1, std::mutex& m2) {
  for (int i = 0; i < 10000; i++) {
    std::lock_guard<std::mutex> lock2(m2);
    std::lock_guard<std::mutex> lock1(m1);
    // Do something
  }
}

worker1 에서 m2 를 lock 하기 위해서는 worker2 에서 m2 를 unlock 해야함.
하지만 그러기 위해서는 worker2 에서 m1 을 lock 해야 함.
그런데 이 역시 불가능.
왜냐하면 worker1 에 m1 을 lock 하고 있기 때문.
worker1 과 worker2 모두 이러지도 저러지도 못하는 데드락 상황.

해결 방법

  • 우선권
    한 쓰레드에게 우선권을 갖도록 할 수 있으나 기아 상태 발생 할 수 있음

    - 기아 상태(starvation)
    한 쓰레드가 다른 쓰레드에 비해 우위를 갖게 된다면, 한 쓰레드만 열심히 일하고 다른 쓰레드는 일할 수 없는 상태

    - try_lock
    이 함수는 만일 m1 을 lock 할 수 있다면 lock 을 하고 true 를 리턴
    lock 을 할 수 없다면 기다리지 않고 그냥 false 를 리턴

  • 중첩된 Lock 을 사용하는 것을 피해라

  • Lock 을 소유하고 있을 때 유저 코드를 호출하는 것을 피해라

  • Lock 들을 언제나 정해진 순서로 획득해라

생산자(Producer) 와 소비자(Consumer) 패턴

  • 생산자
    무언가 처리할 일을 받아오는 쓰레드를 의미

  • 소비자
    받은 일을 처리하는 쓰레드를 의미

example code

#include <chrono>  // std::chrono::miliseconds
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

void producer(std::queue<std::string>* downloaded_pages, std::mutex* m,
              int index) {
  for (int i = 0; i < 5; i++) {
    // 웹사이트를 다운로드 하는데 걸리는 시간이라 생각하면 된다.
    // 각 쓰레드 별로 다운로드 하는데 걸리는 시간이 다르다.
    std::this_thread::sleep_for(std::chrono::milliseconds(100 * index));
    std::string content = "웹사이트 : " + std::to_string(i) + " from thread(" +
                          std::to_string(index) + ")\n";

    // data 는 쓰레드 사이에서 공유되므로 critical section 에 넣어야 한다.
    m->lock();
    downloaded_pages->push(content);
    m->unlock();
  }
}

void consumer(std::queue<std::string>* downloaded_pages, std::mutex* m,
              int* num_processed) {
  // 전체 처리하는 페이지 개수가 5 * 5 = 25 개.
  while (*num_processed < 25) {
    m->lock();
    // 만일 현재 다운로드한 페이지가 없다면 다시 대기.
    if (downloaded_pages->empty()) {
      m->unlock();  // (Quiz) 여기서 unlock 을 안한다면 어떻게 될까요?

      // 10 밀리초 뒤에 다시 확인한다.
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      continue;
    }

    // 맨 앞의 페이지를 읽고 대기 목록에서 제거한다.
    std::string content = downloaded_pages->front();
    downloaded_pages->pop();

    (*num_processed)++;
    m->unlock();

    // content 를 처리한다.
    std::cout << content;
    std::this_thread::sleep_for(std::chrono::milliseconds(80));
  }
}

int main() {
  // 현재 다운로드한 페이지들 리스트로, 아직 처리되지 않은 것들이다.
  std::queue<std::string> downloaded_pages;
  std::mutex m;

  std::vector<std::thread> producers;
  for (int i = 0; i < 5; i++) {
    producers.push_back(std::thread(producer, &downloaded_pages, &m, i + 1));
  }

  int num_processed = 0;
  std::vector<std::thread> consumers;
  for (int i = 0; i < 3; i++) {
    consumers.push_back(
        std::thread(consumer, &downloaded_pages, &m, &num_processed));
  }

  for (int i = 0; i < 5; i++) {
    producers[i].join();
  }
  for (int i = 0; i < 3; i++) {
    consumers[i].join();
  }
}
  • producer 쓰레드
    웹사이트에서 페이지를 계속 다운로드 하는 역할
    다운로드한 페이지들을 downloaded_pages 라는 큐에 저장
    ->FIFO 특성을 이용하기 위해서

  • consumer 쓰레드
    downloaded_pages 가 비어있지 않을 때 까지 계속 while 루프
    sleep 시켜서 10 밀리초 뒤에 다시 확인 하도록 -> 계속 확인하면 비효율content 를 처리
    front 를 통해서 맨 앞의 원소를 얻은 뒤에,
    pop 을 호출하면 맨 앞의 원소를 큐에서 제거

condition_variable

condition_variable wait 함수에 어떤 조건이 참이 될때 까지 기다릴지 해당 조건을 인자로 전달

-notify_one
모든 쓰레드를 깨워서 조건을 검사

example code

#include <chrono>              // std::chrono::miliseconds
#include <condition_variable>  // std::condition_variable
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

void producer(std::queue<std::string>* downloaded_pages, std::mutex* m,
              int index, std::condition_variable* cv) {
  for (int i = 0; i < 5; i++) {
    // 웹사이트를 다운로드 하는데 걸리는 시간이라 생각하면 된다.
    // 각 쓰레드 별로 다운로드 하는데 걸리는 시간이 다르다.
    std::this_thread::sleep_for(std::chrono::milliseconds(100 * index));
    std::string content = "웹사이트 : " + std::to_string(i) + " from thread(" +
                          std::to_string(index) + ")\n";

    // data 는 쓰레드 사이에서 공유되므로 critical section 에 넣어야 한다.
    m->lock();
    downloaded_pages->push(content);
    m->unlock();

    // consumer 에게 content 가 준비되었음을 알린다.
    cv->notify_one();
  }
}

void consumer(std::queue<std::string>* downloaded_pages, std::mutex* m,
              int* num_processed, std::condition_variable* cv) {
  while (*num_processed < 25) {
    std::unique_lock<std::mutex> lk(*m);

    cv->wait(
        lk, [&] { return !downloaded_pages->empty() || *num_processed == 25; });

    if (*num_processed == 25) {
      lk.unlock();
      return;
    }

    // 맨 앞의 페이지를 읽고 대기 목록에서 제거한다.
    std::string content = downloaded_pages->front();
    downloaded_pages->pop();

    (*num_processed)++;
    lk.unlock();

    // content 를 처리한다.
    std::cout << content;
    std::this_thread::sleep_for(std::chrono::milliseconds(80));
  }
}

int main() {
  // 현재 다운로드한 페이지들 리스트로, 아직 처리되지 않은 것들이다.
  std::queue<std::string> downloaded_pages;
  std::mutex m;
  std::condition_variable cv;

  std::vector<std::thread> producers;
  for (int i = 0; i < 5; i++) {
    producers.push_back(
        std::thread(producer, &downloaded_pages, &m, i + 1, &cv));
  }

  int num_processed = 0;
  std::vector<std::thread> consumers;
  for (int i = 0; i < 3; i++) {
    consumers.push_back(
        std::thread(consumer, &downloaded_pages, &m, &num_processed, &cv));
  }

  for (int i = 0; i < 5; i++) {
    producers[i].join();
  }

  // 나머지 자고 있는 쓰레드들을 모두 깨운다.
  cv.notify_all();

  for (int i = 0; i < 3; i++) {
    consumers[i].join();
  }
}
profile
soeun choi

0개의 댓글