C ++ темы и обещания - PullRequest
       12

C ++ темы и обещания

1 голос
/ 01 декабря 2019

У меня есть потокобезопасная очередь, которая заполнена работой основного потока;когда я запускаю свои рабочие потоки, они выталкивают задачу из рабочей очереди, но также могут отправить новую задачу обратно в рабочую очередь. Краткий обзор кода:

auto work_queue = safe_queue{};

static void handle_task(T task) {
  // process the task
  // might push a new task to a work queue using work_queue.push()
}

int main() {
  //some work is done to prepopulate work_queue

  auto handle_work = [](){
    while (!work_queue.empty) {
      T task = work_queue.pop();
      handle_task(task);
    }
  };

  std::vector<std::thread> threads;
  for (int i = 0; i < NUM_OF_THREADS; i++) {
    threads.push_back(std::thread(processing));
  }

  std::for_each(threads.begin(), threads.end(), [](std::thread &t) {
    t.join();
  }
}

Я понимаю, что этот код не будет работать правильно, поскольку в некоторых случаях очередь может быть пустой, в то время как некоторые рабочие потоки обрабатывают работу, когда приходит другой поток, не находитработа и выходы (хотя задачи обработки потоков могут отодвинуть новую работу в очередь). У меня вопрос, как предотвратить преждевременное завершение потоков? Будет ли это работать с использованием std::promise<void>, чтобы позволить потокам общаться с другими потоками, которые они все еще могут работать? Если так, как это будет работать с несколькими потоками (я новичок в c ++ и использовал обещания только с одним потоком)?

Ответы [ 2 ]

1 голос
/ 01 декабря 2019

Я попробовал что-то совершенно другое с std::condition_variable:

#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <optional>
#include <algorithm>

constexpr int NUM_OF_THREADS = 5;
std::condition_variable input_cv;
std::condition_variable callback_cv;
std::optional<int> data {};
std::mutex m;
std::mutex callback_mutex;
int finished_threads = 0;

static void handle_task(int i)
{
}

struct worker 
{
    const int num;
    worker () = delete;
    void operator()() {
        while(true){
            std::unique_lock<std::mutex> lk(m);
            ++finished_threads; // protected by m
            callback_cv.notify_one(); // wake up main thread if it sleeps
            input_cv.wait(lk);
            if(!data)
                return;
            --finished_threads; // protected by m
            int local_data = *data;
            lk.unlock();

            handle_task(local_data);
        }
    }
};


struct safe_queue // dummy
{
    int pop () const { return 0; }
    bool empty() const {return true;}
};

void main_thread ()
{
    std::vector<std::thread> workers;
    safe_queue work_queue;
    for(int i = 0; i < NUM_OF_THREADS; ++i)
    {
        workers.emplace_back(worker{i});
    }

    do
    {
        do
        {
            {
                std::lock_guard<std::mutex> guard(m);
                data.emplace(work_queue.pop());
            }
            input_cv.notify_one();
        } while(!work_queue.empty() && finished_threads > 0)
        // If no thread has finished, we can wait for the next one to finish.

        std::unique_lock<std::mutex> lk(callback_mutex);
        callback_cv.wait(lk); // We wait on some thread to have finished
    }while(finished_threads < NUM_OF_THREADS && !work_queue.empty()); // In either case, there remains something to do.

    data = {};
    input_cv.notify_all();

    std::for_each(begin(workers), end(workers), [](std::thread &t) { t.join();});
}


int main()
{

    std::thread t(main_thread);

    t.join();

}

Не уверен, что это лучше, но это, конечно, сложнее ^^.

1 голос
/ 01 декабря 2019

Я не думаю, что std::promise<void> может быть использовано здесь, поскольку это скорее единичная вещь. После того, как результат установлен в будущем, он не может быть сброшен, и, следовательно, мы не можем дважды ждать одно и то же обещание.

Можно сделать следующее (счетчик должен быть безопасным для потоков, но я был ленивымчтобы сделать это прямо сейчас):

int finished_threads = 0;
auto handle_work = [&finished_threads](){
    bool this_finished = false;
    while (finished_threads < NUM_OF_THREADS) {
      while (!work_queue.empty) {
        if(this_finished) {
          this_finished = false;
          --finished_threads;  // evil
        }
        T task = work_queue.pop();
        handle_task(task);
      }
      if(!this_finished) {
        this_finished = true;
        ++finished_threads; // evil
      }
    }
  };

Это должно завершиться только после того, как все потоки закончили (как в: Они больше не обрабатывают задачу, и ее нет в очереди). Тогда никакое новое задание больше не будет помещено в очередь. Локальная переменная минимизирует доступ к разделяемой памяти.

Обратите внимание, что мой опыт в многопоточном программировании очень ограничен.

...