queue
)에 추가하는 방식으로 사용쓰레드들을 보관할 컨테이너
worker_threads_
: 쓰레드들을 보관할 벡터num_threads
: 쓰레드의 개수size_t num_threads; // 총 Worker 쓰레드의 개수.
std::vector<std::thread> worker_threads_; // Worker 쓰레드를 보관하는 벡터.
작업을 보관할 컨테이너
std::queue<std::function<void()>> jobs_;
queue
의 보호
queue
는 멀티 쓰레드 환경에서 안전하지 않기 때문에 race condition
에서 보호할 장치들이 필요std::condition_variable cv_job_q_;
std::mutex m_job_q_;
쓰레드들 종료
bool stop_all;
생성자:worker_threads_
에 쓰레드를 시작시켜줌
num_threads
개의 쓰레드를 생성ThreadPool
에 정의된 WorkerThread
함수를 실행ThreadPool::ThreadPool(size_t num_threads) : num_threads_(num_threads), stop_all(false)
{
worker_threads_.reserve(num_threads_);
for (size_t i = 0; i < num_threads_; ++i) {
worker_threads_.emplace_back([this]() { this->WorkerThread(); });
}
}
WorkerThread()
: 쓰레드로 작업을 처리
cv_job_q_
에서 jobs_
에 원소가 있거나, stop_all
이 설정될때 까지 기다림jobs_.front()
를 통해 가장 오래전에 추가된 작업을 얻어 해당 작업을 수행jobs_
에 대기중인 작업이 없다면 쓰레드를 종료void ThreadPool::WorkerThread() {
while (true) {
std::unique_lock<std::mutex> lock(m_job_q_);
cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
if (stop_all && this->jobs_.empty()) {
return;
}
// 맨 앞의 job 을 뺀다.
std::function<void()> job = std::move(jobs_.front());
jobs_.pop();
lock.unlock();
// 해당 job 을 수행한다 :)
job();
}
}
EnqueueJob(std::function<void()> job)
: 작업을 추가
stop_all
이 설정된 상태라면 더이상 작업을 추가하면 안되기 때문에 예외를 던짐stop_all
이 아니라면 작업을 추가한 뒤 자고있는 쓰레드를 깨워줌void ThreadPool::EnqueueJob(std::function<void()> job) {
if (stop_all) {
throw std::runtime_error("ThreadPool 사용 중지됨");
}
{
std::lock_guard<std::mutex> lock(m_job_q_);
jobs_.push(std::move(job));
}
cv_job_q_.notify_one();
}
소멸자: stop_all
을 설정한 뒤, 모든 Worker
쓰레드에 알려줌
join
ThreadPool::~ThreadPool()
{
stop_all = true;
cv_job_q_.notify_all();
for (auto& t : worker_threads_) {
t.join();
}
}
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
namespace ThreadPool {
class ThreadPool {
public:
ThreadPool(size_t num_threads);
~ThreadPool();
// job 을 추가한다.
void EnqueueJob(std::function<void()> job);
private:
// 총 Worker 쓰레드의 개수.
size_t num_threads_;
// Worker 쓰레드를 보관하는 벡터.
std::vector<std::thread> worker_threads_;
// 할일들을 보관하는 job 큐.
std::queue<std::function<void()>> jobs_;
// 위의 job 큐를 위한 cv 와 m.
std::condition_variable cv_job_q_;
std::mutex m_job_q_;
// 모든 쓰레드 종료
bool stop_all;
// Worker 쓰레드
void WorkerThread();
};
ThreadPool::ThreadPool(size_t num_threads)
: num_threads_(num_threads), stop_all(false) {
worker_threads_.reserve(num_threads_);
for (size_t i = 0; i < num_threads_; ++i) {
worker_threads_.emplace_back([this]() { this->WorkerThread(); });
}
}
void ThreadPool::WorkerThread() {
while (true) {
std::unique_lock<std::mutex> lock(m_job_q_);
cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
if (stop_all && this->jobs_.empty()) {
return;
}
// 맨 앞의 job 을 뺀다.
std::function<void()> job = std::move(jobs_.front());
jobs_.pop();
lock.unlock();
// 해당 job 을 수행한다 :)
job();
}
}
ThreadPool::~ThreadPool() {
stop_all = true;
cv_job_q_.notify_all();
for (auto& t : worker_threads_) {
t.join();
}
}
void ThreadPool::EnqueueJob(std::function<void()> job) {
if (stop_all) {
throw std::runtime_error("ThreadPool 사용 중지됨");
}
{
std::lock_guard<std::mutex> lock(m_job_q_);
jobs_.push(std::move(job));
}
cv_job_q_.notify_one();
}
} // namespace ThreadPool
void work(int t, int id) {
printf("%d start \n", id);
std::this_thread::sleep_for(std::chrono::seconds(t));
printf("%d end after %ds\n", id, t);
}
int main() {
ThreadPool::ThreadPool pool(3);
for (int i = 0; i < 10; i++) {
pool.EnqueueJob([i]() { work(i % 3 + 1, i); });
}
}
EnqueueJob
함수가 임의의 형태의 함수를 받고, 그 함수의 리턴값을 보관하는 future
를 리턴하도록 변경EnqueueJob
함수 변경f
의 리턴값을 가지는 future
를 리턴하도록 구현F
의 리턴값은 std::result_of
를 사용해 알 수 있음// job 을 추가한다.
template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(F f, Args... args);
void
를 저장하는 컨테이너 jobs_
에 함수 넣기packaged_task
를 이용해 리턴값 받기return_type
이라는 f
의 리턴타입을 보관하는 타입을 정의f
의 실행결과를 저장하는 packaged_task
인 job
객체를 정의packaged_task
의 생성자는 함수 만을 받기 때문에, 실제 job
을 수행하기 위해 인자들을 f
에 bind
시켜줌using return_type = typename std::result_of<F(Args...)>::type;
std::packaged_task<return_type()> job(std::bind(f, args...));
job
의 실행 결과를 보관하는 job_result_future
를 정의jobs_
에 job
을 실행하는 람다 함수를 추가job
이 실행되면, f
의 리턴값이 job_result_future
에 들어가게 되고, 이를 쓰레드풀 사용자가 접근 가능한 구조std::future<return_type> job_result_future = job.get_future();
{
std::lock_guard<std::mutex> lock(m_job_q_);
jobs_.push([&job]() { job(); });
}
Broken promise
예외 해결promise
에 set_value
를 하기 전에 이미 promise
의 future
객체가 파괴되면 발생하는 예외EnqueueJob
함수에 정의된 job
객체는 지역 변수기 때문에 EnqueueJob
함수가 리턴하면 파괴되어 예외가 발생shared_ptr
에 packaged_task
를 보관auto job =
std::make_shared<std::packaged_task<return_type()>>(std::bind(f, args...));
std::future<return_type> job_result_future = job->get_future();
{
std::lock_guard<std::mutex> lock(m_job_q_);
jobs_.push([job]() { (*job)(); });
}
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
namespace ThreadPool {
class ThreadPool {
public:
ThreadPool(size_t num_threads);
~ThreadPool();
// job 을 추가한다.
template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(
F f, Args... args);
private:
// 총 Worker 쓰레드의 개수.
size_t num_threads_;
// Worker 쓰레드를 보관하는 벡터.
std::vector<std::thread> worker_threads_;
// 할일들을 보관하는 job 큐.
std::queue<std::function<void()>> jobs_;
// 위의 job 큐를 위한 cv 와 m.
std::condition_variable cv_job_q_;
std::mutex m_job_q_;
// 모든 쓰레드 종료
bool stop_all;
// Worker 쓰레드
void WorkerThread();
};
ThreadPool::ThreadPool(size_t num_threads)
: num_threads_(num_threads), stop_all(false) {
worker_threads_.reserve(num_threads_);
for (size_t i = 0; i < num_threads_; ++i) {
worker_threads_.emplace_back([this]() { this->WorkerThread(); });
}
}
void ThreadPool::WorkerThread() {
while (true) {
std::unique_lock<std::mutex> lock(m_job_q_);
cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
if (stop_all && this->jobs_.empty()) {
return;
}
// 맨 앞의 job 을 뺀다.
std::function<void()> job = std::move(jobs_.front());
jobs_.pop();
lock.unlock();
// 해당 job 을 수행한다 :)
job();
}
}
ThreadPool::~ThreadPool() {
stop_all = true;
cv_job_q_.notify_all();
for (auto& t : worker_threads_) {
t.join();
}
}
template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> ThreadPool::EnqueueJob(
F f, Args... args) {
if (stop_all) {
throw std::runtime_error("ThreadPool 사용 중지됨");
}
using return_type = typename std::result_of<F(Args...)>::type;
auto job =
std::make_shared<std::packaged_task<return_type()>>(std::bind(f, args...));
std::future<return_type> job_result_future = job->get_future();
{
std::lock_guard<std::mutex> lock(m_job_q_);
jobs_.push([job]() { (*job)(); });
}
cv_job_q_.notify_one();
return job_result_future;
}
} // namespace ThreadPool
int work(int t, int id) {
printf("%d start \n", id);
std::this_thread::sleep_for(std::chrono::seconds(t));
printf("%d end after %ds\n", id, t);
return t + id;
}
int main() {
ThreadPool::ThreadPool pool(3);
std::vector<std::future<int>> futures;
for (int i = 0; i < 10; i++) {
futures.emplace_back(pool.EnqueueJob(work, i % 3 + 1, i));
}
for (auto& f : futures) {
printf("result : %d \n", f.get());
}
}
EnqueueJob
의 인자를 완벽한 전달로 변경EnqueueJob
함수의 인자들을 우측값 레퍼런스로 바꿈template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(
F&& f, Args&&... args);
bind
함수에 forward
로 인자를 전달auto job = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
namespace ThreadPool {
class ThreadPool {
public:
ThreadPool(size_t num_threads);
~ThreadPool();
// job 을 추가한다.
template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(
F&& f, Args&&... args);
private:
// 총 Worker 쓰레드의 개수.
size_t num_threads_;
// Worker 쓰레드를 보관하는 벡터.
std::vector<std::thread> worker_threads_;
// 할일들을 보관하는 job 큐.
std::queue<std::function<void()>> jobs_;
// 위의 job 큐를 위한 cv 와 m.
std::condition_variable cv_job_q_;
std::mutex m_job_q_;
// 모든 쓰레드 종료
bool stop_all;
// Worker 쓰레드
void WorkerThread();
};
ThreadPool::ThreadPool(size_t num_threads)
: num_threads_(num_threads), stop_all(false) {
worker_threads_.reserve(num_threads_);
for (size_t i = 0; i < num_threads_; ++i) {
worker_threads_.emplace_back([this]() { this->WorkerThread(); });
}
}
void ThreadPool::WorkerThread() {
while (true) {
std::unique_lock<std::mutex> lock(m_job_q_);
cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
if (stop_all && this->jobs_.empty()) {
return;
}
// 맨 앞의 job 을 뺀다.
std::function<void()> job = std::move(jobs_.front());
jobs_.pop();
lock.unlock();
// 해당 job 을 수행한다 :)
job();
}
}
ThreadPool::~ThreadPool() {
stop_all = true;
cv_job_q_.notify_all();
for (auto& t : worker_threads_) {
t.join();
}
}
template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> ThreadPool::EnqueueJob(
F&& f, Args&&... args) {
if (stop_all) {
throw std::runtime_error("ThreadPool 사용 중지됨");
}
using return_type = typename std::result_of<F(Args...)>::type;
auto job = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> job_result_future = job->get_future();
{
std::lock_guard<std::mutex> lock(m_job_q_);
jobs_.push([job]() { (*job)(); });
}
cv_job_q_.notify_one();
return job_result_future;
}
} // namespace ThreadPool
// 사용 예시
int work(int t, int id) {
printf("%d start \n", id);
std::this_thread::sleep_for(std::chrono::seconds(t));
printf("%d end after %ds\n", id, t);
return t + id;
}
int main() {
ThreadPool::ThreadPool pool(3);
std::vector<std::future<int>> futures;
for (int i = 0; i < 10; i++) {
futures.emplace_back(pool.EnqueueJob(work, i % 3 + 1, i));
}
for (auto& f : futures) {
printf("result : %d \n", f.get());
}
}