Соотношение XY этого вопроса кажется довольно высоким. Я бы прочитал об организации очереди и блокировке задач и перефразирую ваше решение с учетом этого.
В частности, вы найдете это близко к шаблону производителя / потребителя.
Queue
Начнем с минимальной общей c очереди блокировки:
template <typename T>
struct Queue {
Queue(size_t max = 50) : _max(max) {}
size_t enqueue(T v) {
std::unique_lock lk(_mx);
_cond.wait(lk, [this] { return (_max == 0) || (_storage.size() < _max); });
_storage.push_back(std::move(v));
_cond.notify_one();
return _storage.size(); // NOTE: very racy load indicator
}
template <typename Duration>
std::optional<T> dequeue(Duration d) {
std::unique_lock lk(_mx);
if (_cond.wait_for(lk, d, [this] { return !_storage.empty(); })) {
auto top = std::move(_storage.front());
_storage.pop_front();
_cond.notify_one();
return top;
}
return std::nullopt;
}
private:
size_t _max;
mutable std::mutex _mx;
mutable std::condition_variable _cond;
std::deque<T> _storage;
};
Это никогда не блокируется при удалении из очереди (так что вы можете обнаруживать и обрабатывать пустую очередь). В принципе, он не будет блокироваться при постановке в очередь, если не будет достигнут определенный предел. Установите предел 0, чтобы иметь неограниченную очередь.
Теперь вы можете иметь любое количество очередей с любым количеством производителей / потребителей, с которыми вы sh. Например:
Program Logi c
struct DataClass {
int id;
unsigned int state{ 0 };
DataClass(int id) : id(id) {}
void Generate() { sleep_for(3s); state = 1; }
void Clean() { sleep_for(1s); state = 2; }
};
Теперь давайте создадим программу с 4 потоками генератора и 2 потоками очистки, контролирующими 2 очереди (genTasks и cleanTasks).
struct Program {
Program() {
auto worker_id = 1;
_workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
_workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
_workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
_workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
_workers.emplace_back([this, id=worker_id++] { clean_worker(id); });
_workers.emplace_back([this, id=worker_id++] { clean_worker(id); });
}
size_t createWork(DataClass task) {
return genTasks.enqueue(std::move(task));
}
~Program() {
_shutdown = true;
for (auto& th: _workers)
if (th.joinable()) th.join();
}
private:
Queue<DataClass> genTasks, cleanTasks;
std::atomic_bool _shutdown { false };
std::list<std::thread> _workers;
void generate_worker(int worker_id) {
while (!_shutdown) {
while (auto task = genTasks.dequeue(1s)) {
std::cout << "Worker #" << worker_id << " Generate: " << task->id << std::endl;
task->Generate();
cleanTasks.enqueue(std::move(*task));
}
}
std::cout << "Worker #" << worker_id << " Exit generate_worker" << std::endl;
}
void clean_worker(int worker_id) {
while (!_shutdown) {
while (auto task = cleanTasks.dequeue(1s)) {
std::cout << "Worker #" << worker_id << " Clean: " << task->id << std::endl;
task->Clean();
std::cout << "Worker #" << worker_id << " Done: " << task->id << std::endl;
}
}
std::cout << "Worker #" << worker_id << " Exit clean_worker" << std::endl;
}
};
Я добавил флаг _shutdown
для хорошей меры, хотя он не очень силен (он ждет, пока рабочие не будут бездействовать хотя бы одну секунду (dequeue(1s)
). Если вы хотите более навязчивое отключение, добавьте немного if (_shutdown) break;
операторы через рабочие циклы.
Полная демонстрация
Давайте поработаем над этим:
int main() {
Program p;
for (auto i : {1,2,3,4,5,6,7,8,9,10}) {
sleep_for((rand()%100) * 1ms);
p.createWork(i);
}
sleep_for(2.5s);
std::cout << "Load at createWork(42) is ~" << p.createWork(42) << std::endl;
sleep_for(2.5s);
std::cout << "Load at createWork(43) is ~" << p.createWork(43) << std::endl;
sleep_for(4s);
std::cout << "Initiating shutdown\n";
// Program destructor performs shutdown
}
Выводит
Live On Coliru
Worker #2 Generate: 1
Worker #1 Generate: 2
Worker #3 Generate: 3
Worker #4 Generate: 4
Worker #2 Generate: 5
Worker #5 Clean: 1
Load at createWork(42) is ~6
Worker #1 Generate: 6
Worker #6 Clean: 2
Worker #3 Generate: 7
Worker #4 Generate: 8
Worker #5 Done: 1
Worker #5 Clean: 3
Worker #6 Done: 2
Worker #6 Clean: 4
Worker #5 Done: 3
Worker #6 Done: 4
Load at createWork(43) is ~4
Worker #2 Generate: 9
Worker #5 Clean: 5
Worker #1 Generate: 10
Worker #6 Clean: 6
Worker #3 Generate: 42
Worker #4 Generate: 43
Worker #5 Done: 5
Worker #5 Clean: 7
Worker #6 Done: 6
Worker #6 Clean: 8
Worker #5 Done: 7
Worker #6 Done: 8
Worker #5 Clean: 9
Worker #6 Clean: 10
Initiating shutdown
Worker #2 Exit generate_worker
Worker #5 Done: 9
Worker #5 Clean: 42
Worker #1 Exit generate_worker
Worker #6 Done: 10
Worker #6 Clean: 43
Worker #3 Exit generate_worker
Worker #4 Exit generate_worker
Worker #5 Done: 42
Worker #6 Done: 43
Worker #5 Exit clean_worker
Worker #6 Exit clean_worker
Unfinished generate/clean tasks: 0/0
Полный листинг
Live On Coliru
#include <mutex>
#include <condition_variable>
#include <deque>
#include <optional>
template <typename T>
struct Queue {
Queue(size_t max = 50) : _max(max) {}
size_t enqueue(T v) {
std::unique_lock lk(_mx);
_cond.wait(lk, [this] { return (_max == 0) || (_storage.size() < _max); });
_storage.push_back(std::move(v));
_cond.notify_one();
return _storage.size(); // NOTE: very racy load indicator
}
template <typename Duration>
std::optional<T> dequeue(Duration d) {
std::unique_lock lk(_mx);
if (_cond.wait_for(lk, d, [this] { return !_storage.empty(); })) {
auto top = std::move(_storage.front());
_storage.pop_front();
_cond.notify_one();
return top;
}
return std::nullopt;
}
size_t size() const { // racy in multi-thread situations
std::unique_lock lk(_mx);
return _storage.size();
}
private:
size_t _max;
mutable std::mutex _mx;
mutable std::condition_variable _cond;
std::deque<T> _storage;
};
#include <chrono>
#include <thread>
#include <iostream>
#include <list>
#include <atomic>
using namespace std::chrono_literals;
static inline auto sleep_for = [](auto d) { std::this_thread::sleep_for(d); };
struct DataClass {
int id;
unsigned int state{ 0 };
DataClass(int id) : id(id) {}
//DataClass(DataClass&&) = default;
//DataClass& operator=(DataClass&&) = default;
//DataClass(DataClass const&) = delete;
void Generate() { sleep_for(3s); state = 1; }
void Clean() { sleep_for(1s); state = 2; }
};
struct Program {
Program() {
auto worker_id = 1;
_workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
_workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
_workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
_workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
_workers.emplace_back([this, id=worker_id++] { clean_worker(id); });
_workers.emplace_back([this, id=worker_id++] { clean_worker(id); });
}
size_t createWork(DataClass task) {
return genTasks.enqueue(std::move(task));
}
~Program() {
_shutdown = true;
for (auto& th: _workers)
if (th.joinable()) th.join();
std::cout << "Unfinished generate/clean tasks: " << genTasks.size() << "/" << cleanTasks.size() << "\n";
}
private:
Queue<DataClass> genTasks, cleanTasks;
std::atomic_bool _shutdown { false };
std::list<std::thread> _workers;
void generate_worker(int worker_id) {
while (!_shutdown) {
while (auto task = genTasks.dequeue(1s)) {
std::cout << "Worker #" << worker_id << " Generate: " << task->id << std::endl;
task->Generate();
cleanTasks.enqueue(std::move(*task));
}
}
std::cout << "Worker #" << worker_id << " Exit generate_worker" << std::endl;
}
void clean_worker(int worker_id) {
while (!_shutdown) {
while (auto task = cleanTasks.dequeue(1s)) {
std::cout << "Worker #" << worker_id << " Clean: " << task->id << std::endl;
task->Clean();
std::cout << "Worker #" << worker_id << " Done: " << task->id << std::endl;
}
}
std::cout << "Worker #" << worker_id << " Exit clean_worker" << std::endl;
}
};
int main() {
Program p;
for (auto i : {1,2,3,4,5,6,7,8,9,10}) {
sleep_for((rand()%100) * 1ms);
p.createWork(i);
}
sleep_for(2.5s);
std::cout << "Load at createWork(42) is ~" << p.createWork(42) << std::endl;
sleep_for(2.5s);
std::cout << "Load at createWork(43) is ~" << p.createWork(43) << std::endl;
sleep_for(4s);
std::cout << "Initiating shutdown\n";
// Program destructor performs shutdown
}