Добавить std :: packaged_task в существующий поток? - PullRequest
0 голосов
/ 26 октября 2018

Есть ли стандартный способ добавить std::packaged_task в существующий поток?Перед запуском задачи должно произойти нетривиальное количество накладных расходов, поэтому я хочу сделать это один раз, а затем продолжить выполнение потока и ожидать выполнения задач.Я хочу иметь возможность использовать фьючерсы, чтобы при желании получить результат задачи и перехватывать исключения.

Моя реализация до C ++ 11 требует, чтобы мои задачи наследовали от абстрактного базового класса с Run() метод (немного больно, я не могу использовать лямбды) и наличие std::deque коллекции тех, которые я добавляю в основной поток и исключаю из него в рабочем потоке.Я должен защитить эту коллекцию от одновременного доступа и дать сигнал рабочему потоку, что нужно что-то делать, чтобы он не вращался и не спал.При вызове чего-либо возвращается объект «результата» с объектом синхронизации, ожидающим завершения задачи, и значение результата.Все работает хорошо, но пришло время для обновления, если есть что-то лучше.

1 Ответ

0 голосов
/ 31 октября 2018

Вот пул потоков игрушек:

template<class T>
struct threaded_queue {
  using lock = std::unique_lock<std::mutex>;
  void push_back( T t ) {
    {
      lock l(m);
      data.push_back(std::move(t));
    }
    cv.notify_one();
  }
  boost::optional<T> pop_front() {
    lock l(m);
    cv.wait(l, [this]{ return abort || !data.empty(); } );
    if (abort) return {};
    auto r = std::move(data.back());
    data.pop_back();
    return std::move(r);
  }
  void terminate() {
    {
      lock l(m);
      abort = true;
      data.clear();
    }
    cv.notify_all();
  }
  ~threaded_queue()
  {
    terminate();
  }
private:
  std::mutex m;
  std::deque<T> data;
  std::condition_variable cv;
  bool abort = false;
};
struct thread_pool {
  thread_pool( std::size_t n = 1 ) { start_thread(n); }
  thread_pool( thread_pool&& ) = delete;
  thread_pool& operator=( thread_pool&& ) = delete;
  ~thread_pool() = default; // or `{ terminate(); }` if you want to abandon some tasks
  template<class F, class R=std::result_of_t<F&()>>
  std::future<R> queue_task( F task ) {
    std::packaged_task<R()> p(std::move(task));
    auto r = p.get_future();
    tasks.push_back( std::move(p) );
    return r;
  }
  template<class F, class R=std::result_of_t<F&()>>
  std::future<R> run_task( F task ) {
    if (threads_active() >= total_threads()) {
      start_thread();
    }
    return queue_task( std::move(task) );
  }
  void terminate() {
    tasks.terminate();
  }
  std::size_t threads_active() const {
    return active;
  }
  std::size_t total_threads() const {
    return threads.size();
  }
  void clear_threads() {
    terminate();
    threads.clear();
  }
  void start_thread( std::size_t n = 1 ) {
    while(n-->0) {
      threads.push_back(
        std::async( std::launch::async,
          [this]{
            while(auto task = tasks.pop_front()) {
              ++active;
              try{
                (*task)();
              } catch(...) {
                --active;
                throw;
              }
              --active;
            }
          }
        )
      );
    }
  }
private:
  std::vector<std::future<void>> threads;
  threaded_queue<std::packaged_task<void()>> tasks;
  std::atomic<std::size_t> active;
};

скопировано с другого ответа моего.

A thread_pool с 1 нитью в значительной степени соответствует вашему описанию.

Выше - только игрушка, реальный пул потоков, я бы заменил std::packaged_task<void()> на move_only_function<void()>, и это все, для чего я его использую. (A packaged_task<void()> может держать packaged_task<R()> забавно, если неэффективно).

Вам придется подумать о закрытии и составить план. Приведенный выше код блокируется, если вы пытаетесь выключить его без предварительной очистки потоков.

...