Используйте отдельный поток для выполнения списка команд в c ++ - PullRequest
1 голос
/ 11 июля 2020

У меня есть класс DataClass, в котором есть данные, которые должны быть generated и cleaned. Я не знаю, в какое время во время выполнения программы у меня появятся новые данные, и я пытался использовать несколько потоков, чтобы позволить основному потоку работать, пока данные обрабатываются.

это DataClass:

class DataClass
{
public:
    unsigned int state{0};

    
    void Generate()
    {
        using namespace std::chrono_literals;
        std::this_thread::sleep_for(3s);
        state = 1;
    }

    void Clean()
    {
        using namespace std::chrono_literals;
        std::this_thread::sleep_for(1s);
        state = 2;
    }
};

Я разделяю каждый объект DataClass на два std::deque, один с теми, которые необходимо сгенерировать, а другой с теми, которые необходимо очистить.

std::deque<DataClass*> dataToGenerate;
std::deque<DataClass*> dataToClean;

Я использую две функции CleanerFunction и GeneratorFunction, которые будут обрабатывать содержимое двух списков.

GeneratorFunction:

void GeneratorFunction()
{
    while (!dataToGenerate.empty())
    {
        auto* c = dataToGenerate.front();
        c->Generate();
        dataToGenerate.pop_front();
        dataToClean.push_back(c);
        std::cout << "Generated one Data Piece." << std::endl;
    }
}

( Более чистый похож).

И в основной функции я запускаю два потока для двух функций. Но потоки останавливаются мгновенно, так как два списка пусты, и поэтому мне каждый раз нужно создавать новый. Я изучил condition_variables, но не могу заставить их работать должным образом, поскольку все примеры, которые я нашел в Интернете, отличались от этого сценария. Насколько я понял, если я использую while (! DataToGenerate.empty ()), я буду выполнять эту строку, всегда заполняя поток без какой-либо реальной причины, поэтому я не хотел использовать этот метод.

Есть ли способ, которым я мог бы приостановить каждый поток, пока списки больше не станут пустыми, а затем запустить поток?

1 Ответ

0 голосов
/ 12 июля 2020

Соотношение XY этого вопроса кажется довольно высоким. Я бы прочитал об организации очереди и блокировке задач и перефразирую ваше решение с учетом этого.

В частности, вы найдете это близко к шаблону производителя / потребителя.

Queue

Начнем с минимальной общей c очереди блокировки:

template <typename T>
struct Queue {
    Queue(size_t max = 50) : _max(max) {}

    size_t enqueue(T v) {
        std::unique_lock lk(_mx);
        _cond.wait(lk, [this] { return (_max == 0) || (_storage.size() < _max); });

        _storage.push_back(std::move(v));
        _cond.notify_one();
        return _storage.size(); // NOTE: very racy load indicator
    }

    template <typename Duration>
    std::optional<T> dequeue(Duration d) {
        std::unique_lock lk(_mx);

        if (_cond.wait_for(lk, d, [this] { return !_storage.empty(); })) {
            auto top = std::move(_storage.front());

            _storage.pop_front();
            _cond.notify_one();

            return top;
        }

        return std::nullopt;
    }

  private:
    size_t _max;
    mutable std::mutex _mx;
    mutable std::condition_variable _cond;
    std::deque<T> _storage;
};

Это никогда не блокируется при удалении из очереди (так что вы можете обнаруживать и обрабатывать пустую очередь). В принципе, он не будет блокироваться при постановке в очередь, если не будет достигнут определенный предел. Установите предел 0, чтобы иметь неограниченную очередь.

Теперь вы можете иметь любое количество очередей с любым количеством производителей / потребителей, с которыми вы sh. Например:

Program Logi c

struct DataClass {
    int id;
    unsigned int state{ 0 };

    DataClass(int id) : id(id) {}

    void Generate() { sleep_for(3s); state = 1; }
    void Clean()    { sleep_for(1s); state = 2; }
};

Теперь давайте создадим программу с 4 потоками генератора и 2 потоками очистки, контролирующими 2 очереди (genTasks и cleanTasks).

struct Program {
    Program() {
        auto worker_id = 1;
        _workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
        _workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
        _workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
        _workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
        _workers.emplace_back([this, id=worker_id++] { clean_worker(id); });
        _workers.emplace_back([this, id=worker_id++] { clean_worker(id); });
    }

    size_t createWork(DataClass task) {
        return genTasks.enqueue(std::move(task));
    }

    ~Program() {
        _shutdown = true;
        for (auto& th: _workers) 
            if (th.joinable()) th.join();
    }

  private:
    Queue<DataClass> genTasks, cleanTasks;
    std::atomic_bool _shutdown { false };
    std::list<std::thread> _workers;

    void generate_worker(int worker_id) {
        while (!_shutdown) {
            while (auto task = genTasks.dequeue(1s)) {
                std::cout << "Worker #" << worker_id << " Generate: " << task->id << std::endl;
                task->Generate();
                cleanTasks.enqueue(std::move(*task));
            }
        }
        std::cout << "Worker #" << worker_id << " Exit generate_worker" << std::endl;
    }

    void clean_worker(int worker_id) {
        while (!_shutdown) {
            while (auto task = cleanTasks.dequeue(1s)) {
                std::cout << "Worker #" << worker_id << " Clean: " << task->id << std::endl;
                task->Clean();
                std::cout << "Worker #" << worker_id << " Done: " << task->id << std::endl;
            }
        }
        std::cout << "Worker #" << worker_id << " Exit clean_worker" << std::endl;
    }
};

Я добавил флаг _shutdown для хорошей меры, хотя он не очень силен (он ждет, пока рабочие не будут бездействовать хотя бы одну секунду (dequeue(1s)). Если вы хотите более навязчивое отключение, добавьте немного if (_shutdown) break; операторы через рабочие циклы.

Полная демонстрация

Давайте поработаем над этим:

int main() {
    Program p; 

    for (auto i : {1,2,3,4,5,6,7,8,9,10}) {
        sleep_for((rand()%100) * 1ms);
        p.createWork(i);
    }

    sleep_for(2.5s);
    std::cout << "Load at createWork(42) is ~" << p.createWork(42) << std::endl;
    sleep_for(2.5s);
    std::cout << "Load at createWork(43) is ~" << p.createWork(43) << std::endl;

    sleep_for(4s);
    std::cout << "Initiating shutdown\n";
    // Program destructor performs shutdown
}

Выводит

Live On Coliru

Worker #2 Generate: 1
Worker #1 Generate: 2
Worker #3 Generate: 3
Worker #4 Generate: 4
Worker #2 Generate: 5
Worker #5 Clean: 1
Load at createWork(42) is ~6
Worker #1 Generate: 6
Worker #6 Clean: 2
Worker #3 Generate: 7
Worker #4 Generate: 8
Worker #5 Done: 1
Worker #5 Clean: 3
Worker #6 Done: 2
Worker #6 Clean: 4
Worker #5 Done: 3
Worker #6 Done: 4
Load at createWork(43) is ~4
Worker #2 Generate: 9
Worker #5 Clean: 5
Worker #1 Generate: 10
Worker #6 Clean: 6
Worker #3 Generate: 42
Worker #4 Generate: 43
Worker #5 Done: 5
Worker #5 Clean: 7
Worker #6 Done: 6
Worker #6 Clean: 8
Worker #5 Done: 7
Worker #6 Done: 8
Worker #5 Clean: 9
Worker #6 Clean: 10
Initiating shutdown
Worker #2 Exit generate_worker
Worker #5 Done: 9
Worker #5 Clean: 42
Worker #1 Exit generate_worker
Worker #6 Done: 10
Worker #6 Clean: 43
Worker #3 Exit generate_worker
Worker #4 Exit generate_worker
Worker #5 Done: 42
Worker #6 Done: 43
Worker #5 Exit clean_worker
Worker #6 Exit clean_worker
Unfinished generate/clean tasks: 0/0

Полный листинг

Live On Coliru

#include <mutex>
#include <condition_variable>
#include <deque>
#include <optional>

template <typename T>
struct Queue {
    Queue(size_t max = 50) : _max(max) {}

    size_t enqueue(T v) {
        std::unique_lock lk(_mx);
        _cond.wait(lk, [this] { return (_max == 0) || (_storage.size() < _max); });

        _storage.push_back(std::move(v));
        _cond.notify_one();
        return _storage.size(); // NOTE: very racy load indicator
    }

    template <typename Duration>
    std::optional<T> dequeue(Duration d) {
        std::unique_lock lk(_mx);

        if (_cond.wait_for(lk, d, [this] { return !_storage.empty(); })) {
            auto top = std::move(_storage.front());

            _storage.pop_front();
            _cond.notify_one();

            return top;
        }

        return std::nullopt;
    }

    size_t size() const { // racy in multi-thread situations
        std::unique_lock lk(_mx);
        return _storage.size();
    }

  private:
    size_t _max;
    mutable std::mutex _mx;
    mutable std::condition_variable _cond;
    std::deque<T> _storage;
};

#include <chrono>
#include <thread>
#include <iostream>
#include <list>
#include <atomic>
using namespace std::chrono_literals;
static inline auto sleep_for = [](auto d) { std::this_thread::sleep_for(d); };

struct DataClass {
    int id;
    unsigned int state{ 0 };

    DataClass(int id) : id(id) {}
    //DataClass(DataClass&&) = default;
    //DataClass& operator=(DataClass&&) = default;
    //DataClass(DataClass const&) = delete;

    void Generate() { sleep_for(3s); state = 1; }
    void Clean()    { sleep_for(1s); state = 2; }
};

struct Program {
    Program() {
        auto worker_id = 1;
        _workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
        _workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
        _workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
        _workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
        _workers.emplace_back([this, id=worker_id++] { clean_worker(id); });
        _workers.emplace_back([this, id=worker_id++] { clean_worker(id); });
    }

    size_t createWork(DataClass task) {
        return genTasks.enqueue(std::move(task));
    }

    ~Program() {
        _shutdown = true;
        for (auto& th: _workers) 
            if (th.joinable()) th.join();
        std::cout << "Unfinished generate/clean tasks: " << genTasks.size() << "/" << cleanTasks.size() << "\n";
    }

  private:
    Queue<DataClass> genTasks, cleanTasks;
    std::atomic_bool _shutdown { false };
    std::list<std::thread> _workers;

    void generate_worker(int worker_id) {
        while (!_shutdown) {
            while (auto task = genTasks.dequeue(1s)) {
                std::cout << "Worker #" << worker_id << " Generate: " << task->id << std::endl;
                task->Generate();
                cleanTasks.enqueue(std::move(*task));
            }
        }
        std::cout << "Worker #" << worker_id << " Exit generate_worker" << std::endl;
    }

    void clean_worker(int worker_id) {
        while (!_shutdown) {
            while (auto task = cleanTasks.dequeue(1s)) {
                std::cout << "Worker #" << worker_id << " Clean: " << task->id << std::endl;
                task->Clean();
                std::cout << "Worker #" << worker_id << " Done: " << task->id << std::endl;
            }
        }
        std::cout << "Worker #" << worker_id << " Exit clean_worker" << std::endl;
    }
};

int main() {
    Program p; 

    for (auto i : {1,2,3,4,5,6,7,8,9,10}) {
        sleep_for((rand()%100) * 1ms);
        p.createWork(i);
    }

    sleep_for(2.5s);
    std::cout << "Load at createWork(42) is ~" << p.createWork(42) << std::endl;
    sleep_for(2.5s);
    std::cout << "Load at createWork(43) is ~" << p.createWork(43) << std::endl;

    sleep_for(4s);
    std::cout << "Initiating shutdown\n";
    // Program destructor performs shutdown
}
...