모두의 코드
씹어먹는 C++ - <15 - 5. C++ 쓰레드풀(ThreadPool) 만들기>
이번 강좌에서는
ThreadPool
을 만들기
에 대해 다룹니다.
안녕하세요 여러분! 이번 강좌에서는 여태까지 배운 내용들을 총 활용해서 쓰레드풀(ThreadPool)을 만들어보겠습니다. 이 쓰레드풀 구현은 여기를 기반으로 작성하였습니다.
쓰레드풀이란, 쓰레드들을 위한 직업 소개소라고 보시면 됩니다. 여러 개의 쓰레드들이 대기하고 있다가, 할 일이 들어오게 되면, 대기하고 있던 쓰레드들 중 하나가 이를 받아서 실행하게 됩니다.
예를 들어서 서버의 경우, 클라이언트 (사용자) 에서 요청이 들어오면 해당 요청에 대한 처리를 쓰레드풀에 추가만 하면 됩니다. 그러면 나중에 쓰레드들 중 하나가 처리를 하게 되겠지요. 물론 모든 쓰레드들이 다 다른 것들을 처리하고 있어도 괜찮습니다.
보통 이를 구현하는 아이디어는 간단합니다. 처리해야 될 작업들을 큐(queue
) 에 추가하는 것입니다. 큐는 그냥 링크드리스트 라고 생각하면 편한데, push 를 하게 되면 큐 맨 뒤에 작업을 추가하게 됩니다. 그 다음에 pop 을 하면 맨 앞에 있는 작업을 빼버리게 됩니다.
참고로 C++ 에서 제공하는 queue
의 경우 pop 을 하면 맨 앞의 원소를 제거하지만 해당 원소를 리턴하지 않습니다. 해당 원소에 접근하기 위해서는 front
를 호출해야 합니다.
큐를 사용하면 가장 먼저 추가된 작업을 가장 먼저 처리를 시작할 수 있습니다. 다시 말에 가장 오래된 작업 요청을 먼저 처리하는 방식이라고 보면 됩니다. 가장 상식적인 방식이기도 한데, 때때론 가장 최근에 추가된 작업 요청을 먼저 처리해야 할 때도 있습니다. 이 경우 queue
대신에 다른 자료 구조를 이용하는 것이 좋습니다.
클래스 설계 하기
그렇다면 먼저 이 ThreadPool
클래스에 무엇이 필요할지 생각해봅시다.
먼저 당연하게도 쓰레드들을 잔뜩 보관할 컨테이너가 필요 합니다.
// 총 Worker 쓰레드의 개수. size_t num_threads_; // Worker 쓰레드를 보관하는 벡터. std::vector<std::thread> worker_threads_;
위와 같이 쓰레드들을 보관하는 worker_threads_
라는 벡터를 만듭시다. 참고로 우리의 쓰레드풀에서 돌아가는 쓰레드들을 편의상 Worker
쓰레드 라고 부르도록 하겠습다. num_threads_
는 전체 쓰레드의 개수를 보관하는 멤버 변수 입니다. 물론 해당 값은 worker_threads_.size()
와 같겠지요.
그렇다면 이제 작업 들을 어떻게 저장할지 생각해야 합니다. 쓰레드플 사용자는 실행을 원하는 함수들을 쓰레드풀에 전달할 것입니다. 하지만 C++ 에는 안타깝게도 일반적인 타입의 함수 포인터를 저장할 수 있는 컨테이너는 없습니다.
따라서 일단은 void
형의 인자를 받지 않는 함수를 전달한다고 가정하겠습니다. 강좌 뒷부분에서 어떻게 하면 임의의 타입을 받는 함수들도 처리할 수 있을지 다룰 것입니다.
작업을 보관할 컨테이너는 아래와 같습니다.
// 할일들을 보관하는 job 큐. std::queue<std::function<void()>> jobs_;
앞서 말했듯이 작업을 보관하는 컨테이너는 큐를 사용한다고 하였습니다. 큐를 사용해서 가장 오래 전에 추가된 작업 을 쉽게 알아낼 수 있습니다.
해당 큐는 모든 작업 쓰레드들에서 접근 가능한 큐 입니다. 또한, 쓰레드풀 사용자들도 작업들을 각기 다른 쓰레드들에서 쓰레드풀에 추가할 수 도 있습니다. 하지만 queue
는 멀티 쓰레드 환경에서 안전하지 않기 때문에 이 queue
를 race condition
에서 보호할 장치들이 필요합니다.
std::condition_variable cv_job_q_; std::mutex m_job_q_;
cv_job_q_
와 m_job_q_
는 생산자-소비자 패턴을 구현할 때 사용됩니다. 여기서 생산자 역할은 쓰레드풀을 사용하는 사용자들이고 (jobs_
에 작업을 추가하는 사람들), 소비자들은 Worker
쓰레드들이겠지요.
마지막으로 Worker
쓰레드들을 종료시킬 조건을 나타내는 멤버 변수인
// 모든 쓰레드 종료 bool stop_all;
가 필요 합니다. Worker
쓰레드들은 기본적으로 jobs_
들을 처리하는 동안 무한 루프를 돌고 있는데, 위 stop_all
이 설정 된다면 무한 루프를 빠져나가게 됩니다.
ThreadPool 첫 번째 버전
그렇다면 ThreadPool
의 구현을 먼저 살펴보도록 하겠습니다. 먼저 생성자는 간단합니다. worker_threads_
에 쓰레드를 시작시켜주기만 하면 됩니다.
ThreadPool::ThreadPool(size_t num_threads) : num_threads_(num_threads), stop_all(false) { worker_threads_.reserve(num_threads_); for (size_t i = 0; i < num_threads_; ++i) { worker_threads_.emplace_back([this]() { this->WorkerThread(); }); } }
위와 같이 num_threads_
개의 쓰레드를 생성하게 됩니다. 이 때 각 쓰레드들은 ThreadPool
에 정의된 WorkerThread
함수를 실행하게 됩니다. 참고로, 외부에서 멤버 함수에 접근하기 위해서는 이전에 이야기 하였듯이 mem_fn
으로 감싸거나, 람다 함수를 이용하면 되는데 여기서는 간단히 멤버 함수를 사용하였습니다.
물론 람다 안에서 멤버 함수에 접근하기 위해서는 this
를 전달해줘야 합니다. 그리고 람다 함수 안에서 this->WorkerThread()
를 통해 멤버 함수를 실행할 수 있습니다.
그렇다면 WorkerThread
에서는 무슨 일을 해야 할까요? 간단합니다. jobs_
에 작업이 추가될 때 까지 대기하고 있다가, 작업이 추가되면 받아서 처리하면 됩니다. 따라서 아래와 같이 구현할 수 있습니다.
void ThreadPool::WorkerThread() { while (true) { std::unique_lock<std::mutex> lock(m_job_q_); cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; }); if (stop_all && this->jobs_.empty()) { return; } // 맨 앞의 job 을 뺀다. std::function<void()> job = std::move(jobs_.front()); jobs_.pop(); lock.unlock(); // 해당 job 을 수행한다 :) job(); } }
조건 변수 cv_job_q_
에서 jobs_
에 원소가 있거나, stop_all
이 설정될때 까지 기다립니다. 만약에 모든 작업들이 설정되어 있고 jobs_
에 대기중인 작업이 없을 때 비로소 쓰레드를 종료하게 됩니다 (일이 없을 때 까지 퇴근을 못하는 슬픈 현실을 감안한 구현입니다.)
처리할 일이 있다면 간단히 jobs_.front()
를 통해 가장 오래전에 추가된 작업을 얻은 뒤에 해당 작업을 실행하면 됩니다.
그렇다면 작업을 추가하는 함수를 어떨까요?
void ThreadPool::EnqueueJob(std::function<void()> job) { if (stop_all) { throw std::runtime_error("ThreadPool 사용 중지됨"); } { std::lock_guard<std::mutex> lock(m_job_q_); jobs_.push(std::move(job)); } cv_job_q_.notify_one(); }
크게 복잡하지 않습니다. 일단 이미 stop_all
이 설정된 상태라면 더이상 작업을 추가하면 안되기에 예외를 던지도록 하였습니다. 그렇지 않을 경우 간단히 작업을 추가한 뒤에 자고 있는 쓰레드 하나만 깨워주면 됩니다.
마지막으로 소멸자는 아래와 같습니다.
ThreadPool::~ThreadPool() { stop_all = true; cv_job_q_.notify_all(); for (auto& t : worker_threads_) { t.join(); } }
stop_all
을 설정한 뒤에, 모든 Worker
쓰레드들에 알려줍니다. 그 후 모든 쓰레드들을 join
하면 됩니다.
전체 코드를 보면 아래와 같습니다.
전체 구현 (1)
#include <chrono> #include <condition_variable> #include <cstdio> #include <functional> #include <mutex> #include <queue> #include <thread> #include <vector> namespace ThreadPool { class ThreadPool { public: ThreadPool(size_t num_threads); ~ThreadPool(); // job 을 추가한다. void EnqueueJob(std::function<void()> job); private: // 총 Worker 쓰레드의 개수. size_t num_threads_; // Worker 쓰레드를 보관하는 벡터. std::vector<std::thread> worker_threads_; // 할일들을 보관하는 job 큐. std::queue<std::function<void()>> jobs_; // 위의 job 큐를 위한 cv 와 m. std::condition_variable cv_job_q_; std::mutex m_job_q_; // 모든 쓰레드 종료 bool stop_all; // Worker 쓰레드 void WorkerThread(); }; ThreadPool::ThreadPool(size_t num_threads) : num_threads_(num_threads), stop_all(false) { worker_threads_.reserve(num_threads_); for (size_t i = 0; i < num_threads_; ++i) { worker_threads_.emplace_back([this]() { this->WorkerThread(); }); } } void ThreadPool::WorkerThread() { while (true) { std::unique_lock<std::mutex> lock(m_job_q_); cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; }); if (stop_all && this->jobs_.empty()) { return; } // 맨 앞의 job 을 뺀다. std::function<void()> job = std::move(jobs_.front()); jobs_.pop(); lock.unlock(); // 해당 job 을 수행한다 :) job(); } } ThreadPool::~ThreadPool() { stop_all = true; cv_job_q_.notify_all(); for (auto& t : worker_threads_) { t.join(); } } void ThreadPool::EnqueueJob(std::function<void()> job) { if (stop_all) { throw std::runtime_error("ThreadPool 사용 중지됨"); } { std::lock_guard<std::mutex> lock(m_job_q_); jobs_.push(std::move(job)); } cv_job_q_.notify_one(); } } // namespace ThreadPool void work(int t, int id) { printf("%d start \n", id); std::this_thread::sleep_for(std::chrono::seconds(t)); printf("%d end after %ds\n", id, t); } int main() { ThreadPool::ThreadPool pool(3); for (int i = 0; i < 10; i++) { pool.EnqueueJob([i]() { work(i % 3 + 1, i); }); } }
성공적으로 컴파일 하였다면
와 같이 잘 실행됨을 알 수 있습니다.
쓰레드풀에 작업을 추가하는 것은 아래와 같습니다.
pool.EnqueueJob([i]() { work(i % 3 + 1, i); });
앞서 쓰레드풀이 받는 함수의 형태가 리턴 타입이 void
이고 인자를 받지 않는다고 하였습니다. 따라서 work
함수를 그대로 전달할 수 는 없습니다. 왜냐하면 int
타입 인자 두 개를 받기 때문이지요. 하지만 크게 문제될 것은 없습니다. 위와 같이 void()
형태의 람다 함수로 감싸서 전달하면 되기 때문이지요.
임의의 함수 받기
안타깝게도 현재 구현한 ThreadPool
의 경우 부족한 점이 하나 있습니다. 바로 우리가 전달한 함수가 어떠한 값을 리턴할 때 입니다. 물론 그 함수에 포인터로 리턴값을 저장할 변수를 전달하면 되기는 합니다. 하지만, 기존의 future
처럼 그 값이 설정될 때 까지 기다리는 것은 불가능 합니다.
따라서 더 나은 구조로는 EnqueueJob
함수가 임의의 형태의 함수를 받고, 그 함수의 리턴값을 보관하는 future
를 리턴하는 꼴이면 더 좋을 것 같습니다.
// job 을 추가한다. template <class F, class... Args> std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(F f, Args... args);
이를 반영한 것이 바로 위 EnqueueJob
함수 입니다. 엄청 복잡해 보이지만 차근차근 뜯어보면 간단합니다.
template <class F, class... Args>
위 class...
은 가변 길이 템플릿으로 임의의 길이의 인자들을 받을 수 있습니다. 예를 들어서
EnqueueJob(func, 1, 2, 3);
와 같이 함수를 호출하였을 때 첫 번째 인자인 func
는 f
에 들어가게 되고, 나머지 1, 2, 3
이 args...
부분에 들어가게 됩니다. 그렇다면 이 EnqueueJob
함수는 무엇을 리턴할까요?
간단히 생각해보면 전달받은 함수 f
의 리턴값을 가지는 future
를 리턴해야 할 것입니다. 함수 F
의 리턴값은 std::result_of
를 사용하면 알 수 있습니다.
typename std::result_of<F(Args...)>::type // f 의 리턴값
따라서 EnqueueJob
의 정의는 그냥
// job 을 추가한다. template <class F, class... Args> std::future</* f 의 리턴 타입*/> EnqueueJob(F f, Args... args);
이라고 생각하시면 됩니다.
그런데 임의의 함수와 원소들을 받을 수 있다고 해서, 이를 컨테이너에 추가할 수 있다는 것은 아닙니다. 어떻게 하면 해당 함수의 실행을 void()
꼴의 함수만 저장할 수 있는 컨테이너에 넣을 수 있을까요?
그야 간단합니다. 그냥
jobs_.push([f, args...]() { f(args...); });
을 한다면 Worker
쓰레드 안에서 f(args...)
를 실행 할 수 있습니다. 그런데 이와 같은 형태는 한 가지 문제점이 있습니다. 바로 f(args...)
의 리턴값을 얻을 길이 없어진다는 것입니다.
하지만 우리는 이전 강좌를 통해 비동기적으로 실행되는 함수의 리턴값 (더 나아가 예외 까지) 받아내는 법을 알고 있습니다. 바로 packaged_task
를 이용하는 것입니다!
using return_type = typename std::result_of<F(Args...)>::type; std::packaged_task<return_type()> job(std::bind(f, args...));
편의상 return_type
라는 f
의 리턴타입을 보관하는 타입을 정의하였고, 그 밑에 f
의 실행 결과를 저장하는 packaged_task
인 job
객체를 정의하였습니다.
한 가지 중요한 점은 packaged_task
의 생성자는 함수 만을 받기 때문에, 실제 job
을 수행하기 위해서는 job(args...)
와 같이 호출하거나, 아니면 위 처럼 그냥 인자들을 f
에 bind
시켜주면 됩니다. 우리의 경우 bind
를 하는 것으로 선택하였습니다.
std::future<return_type> job_result_future = job.get_future(); { std::lock_guard<std::mutex> lock(m_job_q_); jobs_.push([&job]() { job(); }); }
그 후에 job
의 실행 결과를 보관하는 job_result_future
를 정의하였고, 마지막으로 jobs_
에 job
을 실행하는 람다 함수를 추가하였습니다. job
이 실행된다면, f
의 리턴값이 job_result_future
에 들어가게 되고, 이는 쓰레드풀 사용자가 접근할 수 있게 됩니다.
#include <chrono> #include <condition_variable> #include <cstdio> #include <functional> #include <future> #include <mutex> #include <queue> #include <thread> #include <vector> namespace ThreadPool { class ThreadPool { public: ThreadPool(size_t num_threads); ~ThreadPool(); // job 을 추가한다. template <class F, class... Args> std::future<typename std::result_of<F(Args...)>::type> EnqueueJob( F f, Args... args); private: // 총 Worker 쓰레드의 개수. size_t num_threads_; // Worker 쓰레드를 보관하는 벡터. std::vector<std::thread> worker_threads_; // 할일들을 보관하는 job 큐. std::queue<std::function<void()>> jobs_; // 위의 job 큐를 위한 cv 와 m. std::condition_variable cv_job_q_; std::mutex m_job_q_; // 모든 쓰레드 종료 bool stop_all; // Worker 쓰레드 void WorkerThread(); }; ThreadPool::ThreadPool(size_t num_threads) : num_threads_(num_threads), stop_all(false) { worker_threads_.reserve(num_threads_); for (size_t i = 0; i < num_threads_; ++i) { worker_threads_.emplace_back([this]() { this->WorkerThread(); }); } } void ThreadPool::WorkerThread() { while (true) { std::unique_lock<std::mutex> lock(m_job_q_); cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; }); if (stop_all && this->jobs_.empty()) { return; } // 맨 앞의 job 을 뺀다. std::function<void()> job = std::move(jobs_.front()); jobs_.pop(); lock.unlock(); // 해당 job 을 수행한다 :) job(); } } ThreadPool::~ThreadPool() { stop_all = true; cv_job_q_.notify_all(); for (auto& t : worker_threads_) { t.join(); } } template <class F, class... Args> std::future<typename std::result_of<F(Args...)>::type> ThreadPool::EnqueueJob( F f, Args... args) { if (stop_all) { throw std::runtime_error("ThreadPool 사용 중지됨"); } using return_type = typename std::result_of<F(Args...)>::type; std::packaged_task<return_type()> job(std::bind(f, args...)); std::future<return_type> job_result_future = job.get_future(); { std::lock_guard<std::mutex> lock(m_job_q_); jobs_.push([&job]() { job(); }); } cv_job_q_.notify_one(); return job_result_future; } } // namespace ThreadPool int work(int t, int id) { printf("%d start \n", id); std::this_thread::sleep_for(std::chrono::seconds(t)); printf("%d end after %ds\n", id, t); return t + id; } int main() { ThreadPool::ThreadPool pool(3); std::vector<std::future<int>> futures; for (int i = 0; i < 10; i++) { futures.emplace_back(pool.EnqueueJob(work, i % 3 + 1, i)); } for (auto& f : futures) { printf("result : %d \n", f.get()); } }
성공적으로 컴파일 후 실행하였다면 아래와 같이 런타임 오류가 발생합니다.
실행 결과
0 start 2 start terminate called after throwing an instance of '4 start std::future_error' what(): std::future_error: Broken promise [1] 28513 abort (core dumped) ./threadpool
보시다시피 Broken promise 예외가 던져졌습니다. Broken promise
예외는 promise
에 set_value
를 하기 전에 이미 promise
의 future
객체가 파괴되었다면 발생하는 예외 입니다. 그렇다면 왜 future
객체가 파괴되었을까요? 그 이유는 간단합니다.
std::packaged_task<return_type()> job(std::bind(f, args...));
EnqueueJob
함수에 정의된 job
객체는 지역 변수 입니다. 즉, EnqueueJob
함수가 리턴하면 파괴되는 객체입니다. 따라서 [&job]() { job(); }
안에서 job
을 접근할 때 이미 그 객체는 파괴되고 없어져있을 것입니다.
이 문제를 해결하는 방법으로 크게 두 가지를 생각해볼 수 있습니다.
packaged_task
를 따로 컨테이너에 저장해서 보관한다.shared_ptr
에packaged_task
를 보관한다.
(1) 번 방식의 경우 더 이상 packaged_task
를 사용하지 않을 때에도 컨테이너에 남아있다는 문제가 있습니다. 하지만 (2) 의 경우 packaged_task
를 사용하는 것이 없을 때 알아서 shared_ptr
가 객체를 소멸시켜주므로 훨씬 관리하기 편합니다. 따라서 후자를 택하도록 하겠습니다. 이를 구현하면 아래와 같습니다.
auto job = std::make_shared<std::packaged_task<return_type()>>(std::bind(f, args...)); std::future<return_type> job_result_future = job->get_future(); { std::lock_guard<std::mutex> lock(m_job_q_); jobs_.push([job]() { (*job)(); }); }
위와 같이 간단히 make_shared
를 통해서 shared_ptr
을 생성하였고, 대신에 람다 함수에 shared_ptr
의 복사본을 전달해서 람다 함수 안에서도 packaged_task
의 shared_ptr
하나를 붙들고 있게 되었습니다.
따라서 job
을 실행하는 시점에서도 packged_task
객체는 계속 살아있게 됩니다.
#include <chrono> #include <condition_variable> #include <cstdio> #include <functional> #include <future> #include <mutex> #include <queue> #include <thread> #include <vector> namespace ThreadPool { class ThreadPool { public: ThreadPool(size_t num_threads); ~ThreadPool(); // job 을 추가한다. template <class F, class... Args> std::future<typename std::result_of<F(Args...)>::type> EnqueueJob( F f, Args... args); private: // 총 Worker 쓰레드의 개수. size_t num_threads_; // Worker 쓰레드를 보관하는 벡터. std::vector<std::thread> worker_threads_; // 할일들을 보관하는 job 큐. std::queue<std::function<void()>> jobs_; // 위의 job 큐를 위한 cv 와 m. std::condition_variable cv_job_q_; std::mutex m_job_q_; // 모든 쓰레드 종료 bool stop_all; // Worker 쓰레드 void WorkerThread(); }; ThreadPool::ThreadPool(size_t num_threads) : num_threads_(num_threads), stop_all(false) { worker_threads_.reserve(num_threads_); for (size_t i = 0; i < num_threads_; ++i) { worker_threads_.emplace_back([this]() { this->WorkerThread(); }); } } void ThreadPool::WorkerThread() { while (true) { std::unique_lock<std::mutex> lock(m_job_q_); cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; }); if (stop_all && this->jobs_.empty()) { return; } // 맨 앞의 job 을 뺀다. std::function<void()> job = std::move(jobs_.front()); jobs_.pop(); lock.unlock(); // 해당 job 을 수행한다 :) job(); } } ThreadPool::~ThreadPool() { stop_all = true; cv_job_q_.notify_all(); for (auto& t : worker_threads_) { t.join(); } } template <class F, class... Args> std::future<typename std::result_of<F(Args...)>::type> ThreadPool::EnqueueJob( F f, Args... args) { if (stop_all) { throw std::runtime_error("ThreadPool 사용 중지됨"); } using return_type = typename std::result_of<F(Args...)>::type; auto job = std::make_shared<std::packaged_task<return_type()>>(std::bind(f, args...)); std::future<return_type> job_result_future = job->get_future(); { std::lock_guard<std::mutex> lock(m_job_q_); jobs_.push([job]() { (*job)(); }); } cv_job_q_.notify_one(); return job_result_future; } } // namespace ThreadPool int work(int t, int id) { printf("%d start \n", id); std::this_thread::sleep_for(std::chrono::seconds(t)); printf("%d end after %ds\n", id, t); return t + id; } int main() { ThreadPool::ThreadPool pool(3); std::vector<std::future<int>> futures; for (int i = 0; i < 10; i++) { futures.emplace_back(pool.EnqueueJob(work, i % 3 + 1, i)); } for (auto& f : futures) { printf("result : %d \n", f.get()); } }
성공적으로 컴파일 하였다면
와 같이 잘 나옵니다.
완벽한 전달
자 이제 거의 다 왔습니다. 우리의 EnqueueJob
함수의 경우 다 좋지만 한 가지 문제점이 있는데 바로
ThreadPool::EnqueueJob(F f, Args... args);
위와 같이 인자들의 복사본을 받는다는 것입니다. 하지만 이는 불필요한 복사를 야기하므로 완벽한 전달 패턴을 사용하는 것이 좋겠습니다.
이는 크게 어렵지 않습니다. 먼저 EnqueueJob
함수의 인자들을 우측값 레퍼런스로 바꾼 뒤에;
template <class F, class... Args> std::future<typename std::result_of<F(Args...)>::type> EnqueueJob( F&& f, Args&&... args);
bind
함수에 forward 로 인자를 전달해주면 됩니다.
auto job = std::make_shared<std::packaged_task<return_type()>>( std::bind(std::forward<F>(f), std::forward<Args>(args)...));
그렇다면 불필요한 복사 없이 Enqueue
함수에 인자들을 완벽히 전달할 수 있게 됩니다. 따라서 최종 우리의 ThreadPool
은 아래와 같습니다.
최종 ThreadPool 구현 버전
#include <chrono> #include <condition_variable> #include <cstdio> #include <functional> #include <future> #include <mutex> #include <queue> #include <thread> #include <vector> namespace ThreadPool { class ThreadPool { public: ThreadPool(size_t num_threads); ~ThreadPool(); // job 을 추가한다. template <class F, class... Args> std::future<typename std::result_of<F(Args...)>::type> EnqueueJob( F&& f, Args&&... args); private: // 총 Worker 쓰레드의 개수. size_t num_threads_; // Worker 쓰레드를 보관하는 벡터. std::vector<std::thread> worker_threads_; // 할일들을 보관하는 job 큐. std::queue<std::function<void()>> jobs_; // 위의 job 큐를 위한 cv 와 m. std::condition_variable cv_job_q_; std::mutex m_job_q_; // 모든 쓰레드 종료 bool stop_all; // Worker 쓰레드 void WorkerThread(); }; ThreadPool::ThreadPool(size_t num_threads) : num_threads_(num_threads), stop_all(false) { worker_threads_.reserve(num_threads_); for (size_t i = 0; i < num_threads_; ++i) { worker_threads_.emplace_back([this]() { this->WorkerThread(); }); } } void ThreadPool::WorkerThread() { while (true) { std::unique_lock<std::mutex> lock(m_job_q_); cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; }); if (stop_all && this->jobs_.empty()) { return; } // 맨 앞의 job 을 뺀다. std::function<void()> job = std::move(jobs_.front()); jobs_.pop(); lock.unlock(); // 해당 job 을 수행한다 :) job(); } } ThreadPool::~ThreadPool() { stop_all = true; cv_job_q_.notify_all(); for (auto& t : worker_threads_) { t.join(); } } template <class F, class... Args> std::future<typename std::result_of<F(Args...)>::type> ThreadPool::EnqueueJob( F&& f, Args&&... args) { if (stop_all) { throw std::runtime_error("ThreadPool 사용 중지됨"); } using return_type = typename std::result_of<F(Args...)>::type; auto job = std::make_shared<std::packaged_task<return_type()>>( std::bind(std::forward<F>(f), std::forward<Args>(args)...)); std::future<return_type> job_result_future = job->get_future(); { std::lock_guard<std::mutex> lock(m_job_q_); jobs_.push([job]() { (*job)(); }); } cv_job_q_.notify_one(); return job_result_future; } } // namespace ThreadPool // 사용 예시 int work(int t, int id) { printf("%d start \n", id); std::this_thread::sleep_for(std::chrono::seconds(t)); printf("%d end after %ds\n", id, t); return t + id; } int main() { ThreadPool::ThreadPool pool(3); std::vector<std::future<int>> futures; for (int i = 0; i < 10; i++) { futures.emplace_back(pool.EnqueueJob(work, i % 3 + 1, i)); } for (auto& f : futures) { printf("result : %d \n", f.get()); } }
성공적으로 컴파일 하였다면
와 같이 잘 실행됩니다 :)
자 그럼 이것으로 이번 강좌를 마치도록 하겠습니다.
C++ 강좌도 점점 마무리를 향해 가는것 같습니다. 다음 강좌들에서는 이전 강좌들에서 채 다루지 못했던 C++ 11 에서 새로 추가된 문법 요소와, 더 나아가 몇몇 새로운 라이브러리들을 다룰 예정입니다.
댓글을 불러오는 중입니다..