모두의 코드
씹어먹는 C++ - <15 - 5. C++ 쓰레드풀(ThreadPool) 만들기>

작성일 : 2019-05-19 이 글은 56815 번 읽혔습니다.

이번 강좌에서는

  • ThreadPool 을 만들기

에 대해 다룹니다.

안녕하세요 여러분! 이번 강좌에서는 여태까지 배운 내용들을 총 활용해서 쓰레드풀(ThreadPool)을 만들어보겠습니다. 이 쓰레드풀 구현은 여기를 기반으로 작성하였습니다.

쓰레드풀이란, 쓰레드들을 위한 직업 소개소라고 보시면 됩니다. 여러 개의 쓰레드들이 대기하고 있다가, 할 일이 들어오게 되면, 대기하고 있던 쓰레드들 중 하나가 이를 받아서 실행하게 됩니다.

예를 들어서 서버의 경우, 클라이언트 (사용자) 에서 요청이 들어오면 해당 요청에 대한 처리를 쓰레드풀에 추가만 하면 됩니다. 그러면 나중에 쓰레드들 중 하나가 처리를 하게 되겠지요. 물론 모든 쓰레드들이 다 다른 것들을 처리하고 있어도 괜찮습니다.

보통 이를 구현하는 아이디어는 간단합니다. 처리해야 될 작업들을 큐(queue) 에 추가하는 것입니다. 큐는 그냥 링크드리스트 라고 생각하면 편한데, push 를 하게 되면 큐 맨 뒤에 작업을 추가하게 됩니다. 그 다음에 pop 을 하면 맨 앞에 있는 작업을 빼버리게 됩니다.

참고로 C++ 에서 제공하는 queue 의 경우 pop 을 하면 맨 앞의 원소를 제거하지만 해당 원소를 리턴하지 않습니다. 해당 원소에 접근하기 위해서는 front 를 호출해야 합니다.

큐를 사용하면 가장 먼저 추가된 작업을 가장 먼저 처리를 시작할 수 있습니다. 다시 말에 가장 오래된 작업 요청을 먼저 처리하는 방식이라고 보면 됩니다. 가장 상식적인 방식이기도 한데, 때때론 가장 최근에 추가된 작업 요청을 먼저 처리해야 할 때도 있습니다. 이 경우 queue 대신에 다른 자료 구조를 이용하는 것이 좋습니다.

클래스 설계 하기

그렇다면 먼저 이 ThreadPool 클래스에 무엇이 필요할지 생각해봅시다.

먼저 당연하게도 쓰레드들을 잔뜩 보관할 컨테이너가 필요 합니다.

// 총 Worker 쓰레드의 개수.
size_t num_threads_;
// Worker 쓰레드를 보관하는 벡터.
std::vector<std::thread> worker_threads_;

위와 같이 쓰레드들을 보관하는 worker_threads_ 라는 벡터를 만듭시다. 참고로 우리의 쓰레드풀에서 돌아가는 쓰레드들을 편의상 Worker 쓰레드 라고 부르도록 하겠습다. num_threads_ 는 전체 쓰레드의 개수를 보관하는 멤버 변수 입니다. 물론 해당 값은 worker_threads_.size() 와 같겠지요.

그렇다면 이제 작업 들을 어떻게 저장할지 생각해야 합니다. 쓰레드플 사용자는 실행을 원하는 함수들을 쓰레드풀에 전달할 것입니다. 하지만 C++ 에는 안타깝게도 일반적인 타입의 함수 포인터를 저장할 수 있는 컨테이너는 없습니다.

따라서 일단은 void 형의 인자를 받지 않는 함수를 전달한다고 가정하겠습니다. 강좌 뒷부분에서 어떻게 하면 임의의 타입을 받는 함수들도 처리할 수 있을지 다룰 것입니다.

작업을 보관할 컨테이너는 아래와 같습니다.

// 할일들을 보관하는 job 큐.
std::queue<std::function<void()>> jobs_;

앞서 말했듯이 작업을 보관하는 컨테이너는 큐를 사용한다고 하였습니다. 큐를 사용해서 가장 오래 전에 추가된 작업 을 쉽게 알아낼 수 있습니다.

해당 큐는 모든 작업 쓰레드들에서 접근 가능한 큐 입니다. 또한, 쓰레드풀 사용자들도 작업들을 각기 다른 쓰레드들에서 쓰레드풀에 추가할 수 도 있습니다. 하지만 queue 는 멀티 쓰레드 환경에서 안전하지 않기 때문에 이 queuerace condition 에서 보호할 장치들이 필요합니다.

std::condition_variable cv_job_q_;
std::mutex m_job_q_;

cv_job_q_m_job_q_ 는 생산자-소비자 패턴을 구현할 때 사용됩니다. 여기서 생산자 역할은 쓰레드풀을 사용하는 사용자들이고 (jobs_ 에 작업을 추가하는 사람들), 소비자들은 Worker 쓰레드들이겠지요.

마지막으로 Worker 쓰레드들을 종료시킬 조건을 나타내는 멤버 변수인

// 모든 쓰레드 종료
bool stop_all;

가 필요 합니다. Worker 쓰레드들은 기본적으로 jobs_ 들을 처리하는 동안 무한 루프를 돌고 있는데, 위 stop_all 이 설정 된다면 무한 루프를 빠져나가게 됩니다.

ThreadPool 첫 번째 버전

그렇다면 ThreadPool 의 구현을 먼저 살펴보도록 하겠습니다. 먼저 생성자는 간단합니다. worker_threads_ 에 쓰레드를 시작시켜주기만 하면 됩니다.

ThreadPool::ThreadPool(size_t num_threads)
    : num_threads_(num_threads), stop_all(false) {
  worker_threads_.reserve(num_threads_);
  for (size_t i = 0; i < num_threads_; ++i) {
    worker_threads_.emplace_back([this]() { this->WorkerThread(); });
  }
}

위와 같이 num_threads_ 개의 쓰레드를 생성하게 됩니다. 이 때 각 쓰레드들은 ThreadPool 에 정의된 WorkerThread 함수를 실행하게 됩니다. 참고로, 외부에서 멤버 함수에 접근하기 위해서는 이전에 이야기 하였듯이 mem_fn 으로 감싸거나, 람다 함수를 이용하면 되는데 여기서는 간단히 멤버 함수를 사용하였습니다.

물론 람다 안에서 멤버 함수에 접근하기 위해서는 this 를 전달해줘야 합니다. 그리고 람다 함수 안에서 this->WorkerThread() 를 통해 멤버 함수를 실행할 수 있습니다.

그렇다면 WorkerThread 에서는 무슨 일을 해야 할까요? 간단합니다. jobs_ 에 작업이 추가될 때 까지 대기하고 있다가, 작업이 추가되면 받아서 처리하면 됩니다. 따라서 아래와 같이 구현할 수 있습니다.

void ThreadPool::WorkerThread() {
  while (true) {
    std::unique_lock<std::mutex> lock(m_job_q_);
    cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
    if (stop_all && this->jobs_.empty()) {
      return;
    }

    // 맨 앞의 job 을 뺀다.
    std::function<void()> job = std::move(jobs_.front());
    jobs_.pop();
    lock.unlock();

    // 해당 job 을 수행한다 :)
    job();
  }
}

조건 변수 cv_job_q_ 에서 jobs_ 에 원소가 있거나, stop_all 이 설정될때 까지 기다립니다. 만약에 모든 작업들이 설정되어 있고 jobs_ 에 대기중인 작업이 없을 때 비로소 쓰레드를 종료하게 됩니다 (일이 없을 때 까지 퇴근을 못하는 슬픈 현실을 감안한 구현입니다.)

처리할 일이 있다면 간단히 jobs_.front() 를 통해 가장 오래전에 추가된 작업을 얻은 뒤에 해당 작업을 실행하면 됩니다.

그렇다면 작업을 추가하는 함수를 어떨까요?

void ThreadPool::EnqueueJob(std::function<void()> job) {
  if (stop_all) {
    throw std::runtime_error("ThreadPool 사용 중지됨");
  }
  {
    std::lock_guard<std::mutex> lock(m_job_q_);
    jobs_.push(std::move(job));
  }
  cv_job_q_.notify_one();
}

크게 복잡하지 않습니다. 일단 이미 stop_all 이 설정된 상태라면 더이상 작업을 추가하면 안되기에 예외를 던지도록 하였습니다. 그렇지 않을 경우 간단히 작업을 추가한 뒤에 자고 있는 쓰레드 하나만 깨워주면 됩니다.

마지막으로 소멸자는 아래와 같습니다.

ThreadPool::~ThreadPool() {
  stop_all = true;
  cv_job_q_.notify_all();

  for (auto& t : worker_threads_) {
    t.join();
  }
}

stop_all 을 설정한 뒤에, 모든 Worker 쓰레드들에 알려줍니다. 그 후 모든 쓰레드들을 join 하면 됩니다.

전체 코드를 보면 아래와 같습니다.

전체 구현 (1)

#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

namespace ThreadPool {
class ThreadPool {
 public:
  ThreadPool(size_t num_threads);
  ~ThreadPool();

  // job 을 추가한다.
  void EnqueueJob(std::function<void()> job);

 private:
  // 총 Worker 쓰레드의 개수.
  size_t num_threads_;
  // Worker 쓰레드를 보관하는 벡터.
  std::vector<std::thread> worker_threads_;
  // 할일들을 보관하는 job 큐.
  std::queue<std::function<void()>> jobs_;
  // 위의 job 큐를 위한 cv 와 m.
  std::condition_variable cv_job_q_;
  std::mutex m_job_q_;

  // 모든 쓰레드 종료
  bool stop_all;

  // Worker 쓰레드
  void WorkerThread();
};

ThreadPool::ThreadPool(size_t num_threads)
    : num_threads_(num_threads), stop_all(false) {
  worker_threads_.reserve(num_threads_);
  for (size_t i = 0; i < num_threads_; ++i) {
    worker_threads_.emplace_back([this]() { this->WorkerThread(); });
  }
}

void ThreadPool::WorkerThread() {
  while (true) {
    std::unique_lock<std::mutex> lock(m_job_q_);
    cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
    if (stop_all && this->jobs_.empty()) {
      return;
    }

    // 맨 앞의 job 을 뺀다.
    std::function<void()> job = std::move(jobs_.front());
    jobs_.pop();
    lock.unlock();

    // 해당 job 을 수행한다 :)
    job();
  }
}

ThreadPool::~ThreadPool() {
  stop_all = true;
  cv_job_q_.notify_all();

  for (auto& t : worker_threads_) {
    t.join();
  }
}

void ThreadPool::EnqueueJob(std::function<void()> job) {
  if (stop_all) {
    throw std::runtime_error("ThreadPool 사용 중지됨");
  }
  {
    std::lock_guard<std::mutex> lock(m_job_q_);
    jobs_.push(std::move(job));
  }
  cv_job_q_.notify_one();
}

}  // namespace ThreadPool

void work(int t, int id) {
  printf("%d start \n", id);
  std::this_thread::sleep_for(std::chrono::seconds(t));
  printf("%d end after %ds\n", id, t);
}

int main() {
  ThreadPool::ThreadPool pool(3);

  for (int i = 0; i < 10; i++) {
    pool.EnqueueJob([i]() { work(i % 3 + 1, i); });
  }
}

성공적으로 컴파일 하였다면

와 같이 잘 실행됨을 알 수 있습니다.

쓰레드풀에 작업을 추가하는 것은 아래와 같습니다.

pool.EnqueueJob([i]() { work(i % 3 + 1, i); });

앞서 쓰레드풀이 받는 함수의 형태가 리턴 타입이 void 이고 인자를 받지 않는다고 하였습니다. 따라서 work 함수를 그대로 전달할 수 는 없습니다. 왜냐하면 int 타입 인자 두 개를 받기 때문이지요. 하지만 크게 문제될 것은 없습니다. 위와 같이 void() 형태의 람다 함수로 감싸서 전달하면 되기 때문이지요.

임의의 함수 받기

안타깝게도 현재 구현한 ThreadPool 의 경우 부족한 점이 하나 있습니다. 바로 우리가 전달한 함수가 어떠한 값을 리턴할 때 입니다. 물론 그 함수에 포인터로 리턴값을 저장할 변수를 전달하면 되기는 합니다. 하지만, 기존의 future 처럼 그 값이 설정될 때 까지 기다리는 것은 불가능 합니다.

따라서 더 나은 구조로는 EnqueueJob 함수가 임의의 형태의 함수를 받고, 그 함수의 리턴값을 보관하는 future 를 리턴하는 꼴이면 더 좋을 것 같습니다.

// job 을 추가한다.
template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(F f,
                                                                  Args... args);

이를 반영한 것이 바로 위 EnqueueJob 함수 입니다. 엄청 복잡해 보이지만 차근차근 뜯어보면 간단합니다.

template <class F, class... Args>

class... 은 가변 길이 템플릿으로 임의의 길이의 인자들을 받을 수 있습니다. 예를 들어서

EnqueueJob(func, 1, 2, 3);

와 같이 함수를 호출하였을 때 첫 번째 인자인 funcf 에 들어가게 되고, 나머지 1, 2, 3args... 부분에 들어가게 됩니다. 그렇다면 이 EnqueueJob 함수는 무엇을 리턴할까요?

간단히 생각해보면 전달받은 함수 f 의 리턴값을 가지는 future 를 리턴해야 할 것입니다. 함수 F 의 리턴값은 std::result_of 를 사용하면 알 수 있습니다.

typename std::result_of<F(Args...)>::type  // f 의 리턴값

따라서 EnqueueJob 의 정의는 그냥

// job 을 추가한다.
template <class F, class... Args>
std::future</* f 의 리턴 타입*/> EnqueueJob(F f, Args... args);

이라고 생각하시면 됩니다.

그런데 임의의 함수와 원소들을 받을 수 있다고 해서, 이를 컨테이너에 추가할 수 있다는 것은 아닙니다. 어떻게 하면 해당 함수의 실행을 void() 꼴의 함수만 저장할 수 있는 컨테이너에 넣을 수 있을까요?

그야 간단합니다. 그냥

jobs_.push([f, args...]() { f(args...); });

을 한다면 Worker 쓰레드 안에서 f(args...) 를 실행 할 수 있습니다. 그런데 이와 같은 형태는 한 가지 문제점이 있습니다. 바로 f(args...) 의 리턴값을 얻을 길이 없어진다는 것입니다.

하지만 우리는 이전 강좌를 통해 비동기적으로 실행되는 함수의 리턴값 (더 나아가 예외 까지) 받아내는 법을 알고 있습니다. 바로 packaged_task 를 이용하는 것입니다!

using return_type = typename std::result_of<F(Args...)>::type;
std::packaged_task<return_type()> job(std::bind(f, args...));

편의상 return_type 라는 f 의 리턴타입을 보관하는 타입을 정의하였고, 그 밑에 f 의 실행 결과를 저장하는 packaged_taskjob 객체를 정의하였습니다.

한 가지 중요한 점은 packaged_task 의 생성자는 함수 만을 받기 때문에, 실제 job 을 수행하기 위해서는 job(args...) 와 같이 호출하거나, 아니면 위 처럼 그냥 인자들을 fbind 시켜주면 됩니다. 우리의 경우 bind 를 하는 것으로 선택하였습니다.

std::future<return_type> job_result_future = job.get_future();
{
  std::lock_guard<std::mutex> lock(m_job_q_);
  jobs_.push([&job]() { job(); });
}

그 후에 job 의 실행 결과를 보관하는 job_result_future 를 정의하였고, 마지막으로 jobs_job 을 실행하는 람다 함수를 추가하였습니다. job 이 실행된다면, f 의 리턴값이 job_result_future 에 들어가게 되고, 이는 쓰레드풀 사용자가 접근할 수 있게 됩니다.

#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

namespace ThreadPool {
class ThreadPool {
 public:
  ThreadPool(size_t num_threads);
  ~ThreadPool();

  // job 을 추가한다.
  template <class F, class... Args>
  std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(
    F f, Args... args);

 private:
  // 총 Worker 쓰레드의 개수.
  size_t num_threads_;
  // Worker 쓰레드를 보관하는 벡터.
  std::vector<std::thread> worker_threads_;
  // 할일들을 보관하는 job 큐.
  std::queue<std::function<void()>> jobs_;
  // 위의 job 큐를 위한 cv 와 m.
  std::condition_variable cv_job_q_;
  std::mutex m_job_q_;

  // 모든 쓰레드 종료
  bool stop_all;

  // Worker 쓰레드
  void WorkerThread();
};

ThreadPool::ThreadPool(size_t num_threads)
    : num_threads_(num_threads), stop_all(false) {
  worker_threads_.reserve(num_threads_);
  for (size_t i = 0; i < num_threads_; ++i) {
    worker_threads_.emplace_back([this]() { this->WorkerThread(); });
  }
}

void ThreadPool::WorkerThread() {
  while (true) {
    std::unique_lock<std::mutex> lock(m_job_q_);
    cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
    if (stop_all && this->jobs_.empty()) {
      return;
    }

    // 맨 앞의 job 을 뺀다.
    std::function<void()> job = std::move(jobs_.front());
    jobs_.pop();
    lock.unlock();

    // 해당 job 을 수행한다 :)
    job();
  }
}

ThreadPool::~ThreadPool() {
  stop_all = true;
  cv_job_q_.notify_all();

  for (auto& t : worker_threads_) {
    t.join();
  }
}

template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> ThreadPool::EnqueueJob(
  F f, Args... args) {
  if (stop_all) {
    throw std::runtime_error("ThreadPool 사용 중지됨");
  }

  using return_type = typename std::result_of<F(Args...)>::type;
  std::packaged_task<return_type()> job(std::bind(f, args...));

  std::future<return_type> job_result_future = job.get_future();
  {
    std::lock_guard<std::mutex> lock(m_job_q_);
    jobs_.push([&job]() { job(); });
  }
  cv_job_q_.notify_one();

  return job_result_future;
}

}  // namespace ThreadPool

int work(int t, int id) {
  printf("%d start \n", id);
  std::this_thread::sleep_for(std::chrono::seconds(t));
  printf("%d end after %ds\n", id, t);
  return t + id;
}

int main() {
  ThreadPool::ThreadPool pool(3);

  std::vector<std::future<int>> futures;
  for (int i = 0; i < 10; i++) {
    futures.emplace_back(pool.EnqueueJob(work, i % 3 + 1, i));
  }
  for (auto& f : futures) {
    printf("result : %d \n", f.get());
  }
}

성공적으로 컴파일 후 실행하였다면 아래와 같이 런타임 오류가 발생합니다.

실행 결과

0 start 
2 start 
terminate called after throwing an instance of '4 start 
std::future_error'
  what():  std::future_error: Broken promise
[1]    28513 abort (core dumped)  ./threadpool

보시다시피 Broken promise 예외가 던져졌습니다. Broken promise 예외는 promiseset_value 를 하기 전에 이미 promisefuture 객체가 파괴되었다면 발생하는 예외 입니다. 그렇다면 왜 future 객체가 파괴되었을까요? 그 이유는 간단합니다.

std::packaged_task<return_type()> job(std::bind(f, args...));

EnqueueJob 함수에 정의된 job 객체는 지역 변수 입니다. 즉, EnqueueJob 함수가 리턴하면 파괴되는 객체입니다. 따라서 [&job]() { job(); } 안에서 job 을 접근할 때 이미 그 객체는 파괴되고 없어져있을 것입니다.

이 문제를 해결하는 방법으로 크게 두 가지를 생각해볼 수 있습니다.

  1. packaged_task 를 따로 컨테이너에 저장해서 보관한다.

  2. shared_ptrpackaged_task 를 보관한다.

(1) 번 방식의 경우 더 이상 packaged_task 를 사용하지 않을 때에도 컨테이너에 남아있다는 문제가 있습니다. 하지만 (2) 의 경우 packaged_task 를 사용하는 것이 없을 때 알아서 shared_ptr 가 객체를 소멸시켜주므로 훨씬 관리하기 편합니다. 따라서 후자를 택하도록 하겠습니다. 이를 구현하면 아래와 같습니다.

auto job =
  std::make_shared<std::packaged_task<return_type()>>(std::bind(f, args...));
std::future<return_type> job_result_future = job->get_future();
{
  std::lock_guard<std::mutex> lock(m_job_q_);
  jobs_.push([job]() { (*job)(); });
}

위와 같이 간단히 make_shared 를 통해서 shared_ptr 을 생성하였고, 대신에 람다 함수에 shared_ptr 의 복사본을 전달해서 람다 함수 안에서도 packaged_taskshared_ptr 하나를 붙들고 있게 되었습니다.

따라서 job 을 실행하는 시점에서도 packged_task 객체는 계속 살아있게 됩니다.

#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

namespace ThreadPool {
class ThreadPool {
 public:
  ThreadPool(size_t num_threads);
  ~ThreadPool();

  // job 을 추가한다.
  template <class F, class... Args>
  std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(
    F f, Args... args);

 private:
  // 총 Worker 쓰레드의 개수.
  size_t num_threads_;
  // Worker 쓰레드를 보관하는 벡터.
  std::vector<std::thread> worker_threads_;
  // 할일들을 보관하는 job 큐.
  std::queue<std::function<void()>> jobs_;
  // 위의 job 큐를 위한 cv 와 m.
  std::condition_variable cv_job_q_;
  std::mutex m_job_q_;

  // 모든 쓰레드 종료
  bool stop_all;

  // Worker 쓰레드
  void WorkerThread();
};

ThreadPool::ThreadPool(size_t num_threads)
    : num_threads_(num_threads), stop_all(false) {
  worker_threads_.reserve(num_threads_);
  for (size_t i = 0; i < num_threads_; ++i) {
    worker_threads_.emplace_back([this]() { this->WorkerThread(); });
  }
}

void ThreadPool::WorkerThread() {
  while (true) {
    std::unique_lock<std::mutex> lock(m_job_q_);
    cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
    if (stop_all && this->jobs_.empty()) {
      return;
    }

    // 맨 앞의 job 을 뺀다.
    std::function<void()> job = std::move(jobs_.front());
    jobs_.pop();
    lock.unlock();

    // 해당 job 을 수행한다 :)
    job();
  }
}

ThreadPool::~ThreadPool() {
  stop_all = true;
  cv_job_q_.notify_all();

  for (auto& t : worker_threads_) {
    t.join();
  }
}

template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> ThreadPool::EnqueueJob(
  F f, Args... args) {
  if (stop_all) {
    throw std::runtime_error("ThreadPool 사용 중지됨");
  }

  using return_type = typename std::result_of<F(Args...)>::type;
  auto job =
    std::make_shared<std::packaged_task<return_type()>>(std::bind(f, args...));
  std::future<return_type> job_result_future = job->get_future();
  {
    std::lock_guard<std::mutex> lock(m_job_q_);
    jobs_.push([job]() { (*job)(); });
  }
  cv_job_q_.notify_one();

  return job_result_future;
}

}  // namespace ThreadPool

int work(int t, int id) {
  printf("%d start \n", id);
  std::this_thread::sleep_for(std::chrono::seconds(t));
  printf("%d end after %ds\n", id, t);
  return t + id;
}

int main() {
  ThreadPool::ThreadPool pool(3);

  std::vector<std::future<int>> futures;
  for (int i = 0; i < 10; i++) {
    futures.emplace_back(pool.EnqueueJob(work, i % 3 + 1, i));
  }
  for (auto& f : futures) {
    printf("result : %d \n", f.get());
  }
}

성공적으로 컴파일 하였다면

와 같이 잘 나옵니다.

완벽한 전달

자 이제 거의 다 왔습니다. 우리의 EnqueueJob 함수의 경우 다 좋지만 한 가지 문제점이 있는데 바로

ThreadPool::EnqueueJob(F f, Args... args);

위와 같이 인자들의 복사본을 받는다는 것입니다. 하지만 이는 불필요한 복사를 야기하므로 완벽한 전달 패턴을 사용하는 것이 좋겠습니다.

이는 크게 어렵지 않습니다. 먼저 EnqueueJob 함수의 인자들을 우측값 레퍼런스로 바꾼 뒤에;

template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(
  F&& f, Args&&... args);

bind 함수에 forward 로 인자를 전달해주면 됩니다.

auto job = std::make_shared<std::packaged_task<return_type()>>(
  std::bind(std::forward<F>(f), std::forward<Args>(args)...));

그렇다면 불필요한 복사 없이 Enqueue 함수에 인자들을 완벽히 전달할 수 있게 됩니다. 따라서 최종 우리의 ThreadPool 은 아래와 같습니다.

최종 ThreadPool 구현 버전

#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

namespace ThreadPool {
class ThreadPool {
 public:
  ThreadPool(size_t num_threads);
  ~ThreadPool();

  // job 을 추가한다.
  template <class F, class... Args>
  std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(
    F&& f, Args&&... args);

 private:
  // 총 Worker 쓰레드의 개수.
  size_t num_threads_;
  // Worker 쓰레드를 보관하는 벡터.
  std::vector<std::thread> worker_threads_;
  // 할일들을 보관하는 job 큐.
  std::queue<std::function<void()>> jobs_;
  // 위의 job 큐를 위한 cv 와 m.
  std::condition_variable cv_job_q_;
  std::mutex m_job_q_;

  // 모든 쓰레드 종료
  bool stop_all;

  // Worker 쓰레드
  void WorkerThread();
};

ThreadPool::ThreadPool(size_t num_threads)
    : num_threads_(num_threads), stop_all(false) {
  worker_threads_.reserve(num_threads_);
  for (size_t i = 0; i < num_threads_; ++i) {
    worker_threads_.emplace_back([this]() { this->WorkerThread(); });
  }
}

void ThreadPool::WorkerThread() {
  while (true) {
    std::unique_lock<std::mutex> lock(m_job_q_);
    cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
    if (stop_all && this->jobs_.empty()) {
      return;
    }

    // 맨 앞의 job 을 뺀다.
    std::function<void()> job = std::move(jobs_.front());
    jobs_.pop();
    lock.unlock();

    // 해당 job 을 수행한다 :)
    job();
  }
}

ThreadPool::~ThreadPool() {
  stop_all = true;
  cv_job_q_.notify_all();

  for (auto& t : worker_threads_) {
    t.join();
  }
}

template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> ThreadPool::EnqueueJob(
  F&& f, Args&&... args) {
  if (stop_all) {
    throw std::runtime_error("ThreadPool 사용 중지됨");
  }

  using return_type = typename std::result_of<F(Args...)>::type;
  auto job = std::make_shared<std::packaged_task<return_type()>>(
    std::bind(std::forward<F>(f), std::forward<Args>(args)...));
  std::future<return_type> job_result_future = job->get_future();
  {
    std::lock_guard<std::mutex> lock(m_job_q_);
    jobs_.push([job]() { (*job)(); });
  }
  cv_job_q_.notify_one();

  return job_result_future;
}

}  // namespace ThreadPool

// 사용 예시
int work(int t, int id) {
  printf("%d start \n", id);
  std::this_thread::sleep_for(std::chrono::seconds(t));
  printf("%d end after %ds\n", id, t);
  return t + id;
}

int main() {
  ThreadPool::ThreadPool pool(3);

  std::vector<std::future<int>> futures;
  for (int i = 0; i < 10; i++) {
    futures.emplace_back(pool.EnqueueJob(work, i % 3 + 1, i));
  }
  for (auto& f : futures) {
    printf("result : %d \n", f.get());
  }
}

성공적으로 컴파일 하였다면

와 같이 잘 실행됩니다 :)

자 그럼 이것으로 이번 강좌를 마치도록 하겠습니다.

C++ 강좌도 점점 마무리를 향해 가는것 같습니다. 다음 강좌들에서는 이전 강좌들에서 채 다루지 못했던 C++ 11 에서 새로 추가된 문법 요소와, 더 나아가 몇몇 새로운 라이브러리들을 다룰 예정입니다.

강좌를 보다가 조금이라도 궁금한 것이나 이상한 점이 있다면 꼭 댓글을 남겨주시기 바랍니다. 그 외에도 강좌에 관련된 것이라면 어떠한 것도 질문해 주셔도 상관 없습니다. 생각해 볼 문제도 정 모르겠다면 댓글을 달아주세요.

현재 여러분이 보신 강좌는 <씹어먹는 C++ - <15 - 5. C++ 쓰레드풀(ThreadPool) 만들기>> 입니다. 이번 강좌의 모든 예제들의 코드를 보지 않고 짤 수준까지 강좌를 읽어 보시기 전까지 다음 강좌로 넘어가지 말아주세요
댓글이 64 개 있습니다!
프로필 사진 없음
강좌에 관련 없이 궁금한 내용은 여기를 사용해주세요

    댓글을 불러오는 중입니다..