Оказывается, я неверно истолковал фрагмент кода boost для планирования волокон. Мне кажется, это работает так:
#include <boost/fiber/barrier.hpp>
#include <boost/fiber/buffered_channel.hpp>
#include <boost/fiber/fiber.hpp>
#include <boost/fiber/operations.hpp>
#include <boost/fiber/algo/work_stealing.hpp>
#include <optional>
static void process(int item) {
std::cout << "processing " << item << std::endl;
auto wasteOfTime = 0.;
for (auto s = 0.; s < 1; s += 3e-9) {
wasteOfTime += sin(s);
}
if (wasteOfTime != 42) {
std::cout << "processed " << item << std::endl;
}
}
static const std::uint32_t N_CONSUMING_FIBERS = 3;
void task() {
boost::fibers::buffered_channel<int> channel{ 2 };
boost::fibers::fiber fibers[1 + N_CONSUMING_FIBERS];
fibers[0] = boost::fibers::fiber([&channel]() {
std::cout << "producer starting" << std::endl;
channel.push(1);
channel.push(2);
channel.push(3);
channel.push(4);
channel.push(5);
channel.close();
std::cout << "producer ending" << std::endl;
});
auto start = boost::fibers::barrier(N_CONSUMING_FIBERS);
for (int i = 1; i <= N_CONSUMING_FIBERS; ++i) {
fibers[i] = boost::fibers::fiber([&start, &channel]() {
start.wait();
for (auto item : channel) {
process(item);
}
});
}
for (int i = 0; i <= N_CONSUMING_FIBERS; ++i) {
fibers[i].join();
}
}
class FiberExecutor {
static std::vector<std::thread > extra_threads;
static std::optional<boost::fibers::barrier> barrier;
public:
static void init() {
auto const N_THREADS = std::thread::hardware_concurrency();
barrier.emplace(N_THREADS);
extra_threads.reserve(N_THREADS - 1);
for (auto i = decltype(N_THREADS){0}; i < N_THREADS - 1; ++i) {
extra_threads.emplace_back([N_THREADS]() {
boost::fibers::use_scheduling_algorithm<boost::fibers::algo::work_stealing>(N_THREADS);
barrier->wait(); // start only after everyone has scheduled
barrier->wait(); // end after main thread says to
});
}
boost::fibers::use_scheduling_algorithm<boost::fibers::algo::work_stealing>(N_THREADS);
barrier->wait(); // start only after everyone has scheduled
}
static void exit() {
barrier->wait();
for (auto& thread : extra_threads) {
thread.join();
}
}
};
std::vector<std::thread> FiberExecutor::extra_threads;
std::optional<boost::fibers::barrier> FiberExecutor::barrier;
int Main() {
FiberExecutor::init();
std::cout << "Going once\n";
task();
std::cout << "Going twice\n";
task();
FiberExecutor::exit();
return 0;
}
Остерегайтесь, это может быть неверно. Он также не планирует надежное планирование для всех потоков (по крайней мере, для Windows).
На самом деле это почти то же самое, что и код в вопрос об обратной задаче , но что нужно обновление. Кроме того, я считаю, что использование условной переменной вместо барьеров не является надежным, потому что основной поток может работать впереди других потоков.
Я не моделировал инициализацию и очистку как класс, чтобы избежать иллюзия, которую вы могли бы создать несколько раз (последовательно или параллельно). Конструктор work_stealing
использует глобальные переменные . Вы не можете завершить задачу, основанную на оптоволокне, а затем запустить другую задачу, основанную на оптоволокне, даже если вы все кодируете во временных потоках и не портите основной поток.