[Modern C++] 15.5. 쓰레드 풀(ThreadPool)

윤정민·2023년 8월 13일
0

C++

목록 보기
40/46

1. 쓰레드풀

  • 여러 개의 쓰레드들이 대기하고 있다가, 할 일이 들어오면, 대기하고 있던 쓰레드들 중 하나가 이를 받아서 실행하는 디자인 패턴
  • 보통 처리해야 될 작업들을 큐(queue)에 추가하는 방식으로 사용
    • 가장 먼저 추가된 작업을 가장 먼저 시작하기 위함

2. ThreadPool 설계

  • 쓰레드들을 보관할 컨테이너

    • worker_threads_: 쓰레드들을 보관할 벡터
    • num_threads: 쓰레드의 개수
    size_t num_threads; // 총 Worker 쓰레드의 개수.
    std::vector<std::thread> worker_threads_; // Worker 쓰레드를 보관하는 벡터.
  • 작업을 보관할 컨테이너

    • 함수 포인터를 저장할 수 있는 컨테이너는 c++에 없으니 void 형의 인자를 받지않는 함수를 전달한다고 가정
    std::queue<std::function<void()>> jobs_;
  • queue의 보호

    • queue는 멀티 쓰레드 환경에서 안전하지 않기 때문에 race condition에서 보호할 장치들이 필요
    • 생성자 소비자 패턴을 생각하면 됨
    std::condition_variable cv_job_q_;
     std::mutex m_job_q_;
  • 쓰레드들 종료

    • 쓰레드들을 종료시킬 조건을 나타내는 멤버변수를 선언
      bool stop_all;

3. ThreadPool 구현

  • 생성자:worker_threads_에 쓰레드를 시작시켜줌

    • num_threads개의 쓰레드를 생성
      • 각 쓰레드들은 ThreadPool에 정의된 WorkerThread 함수를 실행
    ThreadPool::ThreadPool(size_t num_threads) : num_threads_(num_threads), stop_all(false) 
    {
    	worker_threads_.reserve(num_threads_);
      for (size_t i = 0; i < num_threads_; ++i) {
        worker_threads_.emplace_back([this]() { this->WorkerThread(); });
      }
    }
  • WorkerThread(): 쓰레드로 작업을 처리

    • cv_job_q_에서 jobs_에 원소가 있거나, stop_all이 설정될때 까지 기다림
      • 처리할 일이 있다면 jobs_.front()를 통해 가장 오래전에 추가된 작업을 얻어 해당 작업을 수행
    • 모든 작업들이 설정되어 있고 jobs_에 대기중인 작업이 없다면 쓰레드를 종료
    void ThreadPool::WorkerThread() {
      while (true) {
        std::unique_lock<std::mutex> lock(m_job_q_);
        cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
        if (stop_all && this->jobs_.empty()) {
          return;
        }
    
        // 맨 앞의 job 을 뺀다.
        std::function<void()> job = std::move(jobs_.front());
        jobs_.pop();
        lock.unlock();
    
        // 해당 job 을 수행한다 :)
        job();
      }
    }
  • EnqueueJob(std::function<void()> job) : 작업을 추가

    • stop_all이 설정된 상태라면 더이상 작업을 추가하면 안되기 때문에 예외를 던짐
    • stop_all이 아니라면 작업을 추가한 뒤 자고있는 쓰레드를 깨워줌
    void ThreadPool::EnqueueJob(std::function<void()> job) {
      if (stop_all) {
        throw std::runtime_error("ThreadPool 사용 중지됨");
      }
      {
        std::lock_guard<std::mutex> lock(m_job_q_);
        jobs_.push(std::move(job));
      }
      cv_job_q_.notify_one();
    }
  • 소멸자: stop_all을 설정한 뒤, 모든 Worker쓰레드에 알려줌

    • 모든 쓰레드들은 join
    ThreadPool::~ThreadPool() 
    {
      stop_all = true;
      cv_job_q_.notify_all();
    
      for (auto& t : worker_threads_) {
        t.join();
      }
    }

3.1.전체 코드

#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

namespace ThreadPool {
class ThreadPool {
 public:
  ThreadPool(size_t num_threads);
  ~ThreadPool();

  // job 을 추가한다.
  void EnqueueJob(std::function<void()> job);

 private:
  // 총 Worker 쓰레드의 개수.
  size_t num_threads_;
  // Worker 쓰레드를 보관하는 벡터.
  std::vector<std::thread> worker_threads_;
  // 할일들을 보관하는 job 큐.
  std::queue<std::function<void()>> jobs_;
  // 위의 job 큐를 위한 cv 와 m.
  std::condition_variable cv_job_q_;
  std::mutex m_job_q_;

  // 모든 쓰레드 종료
  bool stop_all;

  // Worker 쓰레드
  void WorkerThread();
};

ThreadPool::ThreadPool(size_t num_threads)
    : num_threads_(num_threads), stop_all(false) {
  worker_threads_.reserve(num_threads_);
  for (size_t i = 0; i < num_threads_; ++i) {
    worker_threads_.emplace_back([this]() { this->WorkerThread(); });
  }
}

void ThreadPool::WorkerThread() {
  while (true) {
    std::unique_lock<std::mutex> lock(m_job_q_);
    cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
    if (stop_all && this->jobs_.empty()) {
      return;
    }

    // 맨 앞의 job 을 뺀다.
    std::function<void()> job = std::move(jobs_.front());
    jobs_.pop();
    lock.unlock();

    // 해당 job 을 수행한다 :)
    job();
  }
}

ThreadPool::~ThreadPool() {
  stop_all = true;
  cv_job_q_.notify_all();

  for (auto& t : worker_threads_) {
    t.join();
  }
}

void ThreadPool::EnqueueJob(std::function<void()> job) {
  if (stop_all) {
    throw std::runtime_error("ThreadPool 사용 중지됨");
  }
  {
    std::lock_guard<std::mutex> lock(m_job_q_);
    jobs_.push(std::move(job));
  }
  cv_job_q_.notify_one();
}

}  // namespace ThreadPool

void work(int t, int id) {
  printf("%d start \n", id);
  std::this_thread::sleep_for(std::chrono::seconds(t));
  printf("%d end after %ds\n", id, t);
}

int main() {
  ThreadPool::ThreadPool pool(3);

  for (int i = 0; i < 10; i++) {
    pool.EnqueueJob([i]() { work(i % 3 + 1, i); });
  }
}

4. 임의 형식의 함수 전달

  • EnqueueJob함수가 임의의 형태의 함수를 받고, 그 함수의 리턴값을 보관하는 future를 리턴하도록 변경
    • void 형식 이외의 함수도 일할 수 있음

  • EnqueueJob함수 변경
    • 가변길이 템플릿으로 임의의 길이의 인자들을 받을 수 있도록 구현
    • 전달받은 함수 f의 리턴값을 가지는 future를 리턴하도록 구현
      • 함수F의 리턴값은 std::result_of를 사용해 알 수 있음
// job 을 추가한다.
template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(F f, Args... args);
  • void를 저장하는 컨테이너 jobs_에 함수 넣기
    • packaged_task를 이용해 리턴값 받기
      • return_type이라는 f의 리턴타입을 보관하는 타입을 정의
      • f의 실행결과를 저장하는 packaged_taskjob객체를 정의
        • packaged_task의 생성자는 함수 만을 받기 때문에, 실제 job을 수행하기 위해 인자들을 fbind 시켜줌
      using return_type = typename std::result_of<F(Args...)>::type;
      std::packaged_task<return_type()> job(std::bind(f, args...));
    • 리턴값 저장
      • job의 실행 결과를 보관하는 job_result_future를 정의
      • jobs_job을 실행하는 람다 함수를 추가
      • job이 실행되면, f의 리턴값이 job_result_future에 들어가게 되고, 이를 쓰레드풀 사용자가 접근 가능한 구조
      std::future<return_type> job_result_future = job.get_future();
      {
        std::lock_guard<std::mutex> lock(m_job_q_);
        jobs_.push([&job]() { job(); });
      }
  • Broken promise 예외 해결
    • promiseset_value를 하기 전에 이미 promisefuture객체가 파괴되면 발생하는 예외
    • EnqueueJob함수에 정의된 job객체는 지역 변수기 때문에 EnqueueJob함수가 리턴하면 파괴되어 예외가 발생
      • shared_ptrpackaged_task를 보관
        auto job =
        std::make_shared<std::packaged_task<return_type()>>(std::bind(f, args...));
        std::future<return_type> job_result_future = job->get_future();
        {
         std::lock_guard<std::mutex> lock(m_job_q_);
         jobs_.push([job]() { (*job)(); });
        }

  • 전체 코드
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

namespace ThreadPool {
class ThreadPool {
 public:
  ThreadPool(size_t num_threads);
  ~ThreadPool();

  // job 을 추가한다.
  template <class F, class... Args>
  std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(
    F f, Args... args);

 private:
  // 총 Worker 쓰레드의 개수.
  size_t num_threads_;
  // Worker 쓰레드를 보관하는 벡터.
  std::vector<std::thread> worker_threads_;
  // 할일들을 보관하는 job 큐.
  std::queue<std::function<void()>> jobs_;
  // 위의 job 큐를 위한 cv 와 m.
  std::condition_variable cv_job_q_;
  std::mutex m_job_q_;

  // 모든 쓰레드 종료
  bool stop_all;

  // Worker 쓰레드
  void WorkerThread();
};

ThreadPool::ThreadPool(size_t num_threads)
    : num_threads_(num_threads), stop_all(false) {
  worker_threads_.reserve(num_threads_);
  for (size_t i = 0; i < num_threads_; ++i) {
    worker_threads_.emplace_back([this]() { this->WorkerThread(); });
  }
}

void ThreadPool::WorkerThread() {
  while (true) {
    std::unique_lock<std::mutex> lock(m_job_q_);
    cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
    if (stop_all && this->jobs_.empty()) {
      return;
    }

    // 맨 앞의 job 을 뺀다.
    std::function<void()> job = std::move(jobs_.front());
    jobs_.pop();
    lock.unlock();

    // 해당 job 을 수행한다 :)
    job();
  }
}

ThreadPool::~ThreadPool() {
  stop_all = true;
  cv_job_q_.notify_all();

  for (auto& t : worker_threads_) {
    t.join();
  }
}

template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> ThreadPool::EnqueueJob(
  F f, Args... args) {
  if (stop_all) {
    throw std::runtime_error("ThreadPool 사용 중지됨");
  }

  using return_type = typename std::result_of<F(Args...)>::type;
  auto job =
    std::make_shared<std::packaged_task<return_type()>>(std::bind(f, args...));
  std::future<return_type> job_result_future = job->get_future();
  {
    std::lock_guard<std::mutex> lock(m_job_q_);
    jobs_.push([job]() { (*job)(); });
  }
  cv_job_q_.notify_one();

  return job_result_future;
}

}  // namespace ThreadPool

int work(int t, int id) {
  printf("%d start \n", id);
  std::this_thread::sleep_for(std::chrono::seconds(t));
  printf("%d end after %ds\n", id, t);
  return t + id;
}

int main() {
  ThreadPool::ThreadPool pool(3);

  std::vector<std::future<int>> futures;
  for (int i = 0; i < 10; i++) {
    futures.emplace_back(pool.EnqueueJob(work, i % 3 + 1, i));
  }
  for (auto& f : futures) {
    printf("result : %d \n", f.get());
  }
}

5. 완벽한 전달

  • EnqueueJob의 인자를 완벽한 전달로 변경
    • 불필요한 복사를 방지하기 위함

  • EnqueueJob 함수의 인자들을 우측값 레퍼런스로 바꿈
template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(
  F&& f, Args&&... args);
  • bind 함수에 forward로 인자를 전달
auto job = std::make_shared<std::packaged_task<return_type()>>(
  std::bind(std::forward<F>(f), std::forward<Args>(args)...));

  • 전체 코드
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

namespace ThreadPool {
class ThreadPool {
 public:
  ThreadPool(size_t num_threads);
  ~ThreadPool();

  // job 을 추가한다.
  template <class F, class... Args>
  std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(
    F&& f, Args&&... args);

 private:
  // 총 Worker 쓰레드의 개수.
  size_t num_threads_;
  // Worker 쓰레드를 보관하는 벡터.
  std::vector<std::thread> worker_threads_;
  // 할일들을 보관하는 job 큐.
  std::queue<std::function<void()>> jobs_;
  // 위의 job 큐를 위한 cv 와 m.
  std::condition_variable cv_job_q_;
  std::mutex m_job_q_;

  // 모든 쓰레드 종료
  bool stop_all;

  // Worker 쓰레드
  void WorkerThread();
};

ThreadPool::ThreadPool(size_t num_threads)
    : num_threads_(num_threads), stop_all(false) {
  worker_threads_.reserve(num_threads_);
  for (size_t i = 0; i < num_threads_; ++i) {
    worker_threads_.emplace_back([this]() { this->WorkerThread(); });
  }
}

void ThreadPool::WorkerThread() {
  while (true) {
    std::unique_lock<std::mutex> lock(m_job_q_);
    cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
    if (stop_all && this->jobs_.empty()) {
      return;
    }

    // 맨 앞의 job 을 뺀다.
    std::function<void()> job = std::move(jobs_.front());
    jobs_.pop();
    lock.unlock();

    // 해당 job 을 수행한다 :)
    job();
  }
}

ThreadPool::~ThreadPool() {
  stop_all = true;
  cv_job_q_.notify_all();

  for (auto& t : worker_threads_) {
    t.join();
  }
}

template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> ThreadPool::EnqueueJob(
  F&& f, Args&&... args) {
  if (stop_all) {
    throw std::runtime_error("ThreadPool 사용 중지됨");
  }

  using return_type = typename std::result_of<F(Args...)>::type;
  auto job = std::make_shared<std::packaged_task<return_type()>>(
    std::bind(std::forward<F>(f), std::forward<Args>(args)...));
  std::future<return_type> job_result_future = job->get_future();
  {
    std::lock_guard<std::mutex> lock(m_job_q_);
    jobs_.push([job]() { (*job)(); });
  }
  cv_job_q_.notify_one();

  return job_result_future;
}

}  // namespace ThreadPool

// 사용 예시
int work(int t, int id) {
  printf("%d start \n", id);
  std::this_thread::sleep_for(std::chrono::seconds(t));
  printf("%d end after %ds\n", id, t);
  return t + id;
}

int main() {
  ThreadPool::ThreadPool pool(3);

  std::vector<std::future<int>> futures;
  for (int i = 0; i < 10; i++) {
    futures.emplace_back(pool.EnqueueJob(work, i % 3 + 1, i));
  }
  for (auto& f : futures) {
    printf("result : %d \n", f.get());
  }
}
profile
그냥 하자

0개의 댓글