Один производитель, несколько параллельных потребителей, реализованных с помощью волокон оптоволокна - PullRequest
0 голосов
/ 03 мая 2020

Я пытаюсь написать один производитель, несколько потребительских конвейеров, где потребители работают в параллельных потоках. Или найти или поделиться простым примером такого рода. С относительно простым кодом в Go вывод ясно показывает, что потребители работают параллельно. Я думал, что это может быть похоже с волокнами Boost 1.73, но я не могу выйти за пределы этого кода, который (неудивительно) работает последовательно:

#include <boost/fiber/buffered_channel.hpp>
#include <boost/fiber/fiber.hpp>

static void process(int item) {
    std::cout << "consumer processing " << item << std::endl;
    auto wasteOfTime = 0.;
    for (auto s = 0.; s < item; s += 1e-7) {
        wasteOfTime += sin(s);
    }
    if (wasteOfTime != 42) {
        std::cout << "consumer processed " << item << std::endl;
    }
}

static const std::uint32_t workers = 3;

int main() {
    boost::fibers::buffered_channel<int> channel { 2 };

    boost::fibers::fiber consumer[workers];
    for (int i = 0; i < workers; ++i) {
        consumer[i] = boost::fibers::fiber([&channel]() {
            for (auto item : channel) {
                process(item);
            }
        });
    }

    auto producer = boost::fibers::fiber([&channel]() {
        std::cout << "producer starting" << std::endl;
        channel.push(1);
        channel.push(2);
        channel.push(3);
        channel.close();
        std::cout << "producer ending" << std::endl;
    });

    producer.join();
    for (int i = 0; i < workers; ++i) {
        consumer[i].join();
    }
    return 0;
}

Я попытался вставить много вариантов фрагментов кода, чтобы получить рабочие потоки до планируют волокна, но они всегда выполняются последовательно или не выполняются вообще. Код из вопрос об обратной задаче кажется шагом в правильном направлении, хотя и намного сложнее, чем Go, но (при компиляции с -DM_1_PI = 3.14) эта программа также просто бездействует для меня .

1 Ответ

0 голосов
/ 04 мая 2020

Оказывается, я неверно истолковал фрагмент кода boost для планирования волокон. Мне кажется, это работает так:

#include <boost/fiber/barrier.hpp>
#include <boost/fiber/buffered_channel.hpp>
#include <boost/fiber/fiber.hpp>
#include <boost/fiber/operations.hpp>
#include <boost/fiber/algo/work_stealing.hpp>
#include <optional>

static void process(int item) {
    std::cout << "processing " << item << std::endl;
    auto wasteOfTime = 0.;
    for (auto s = 0.; s < 1; s += 3e-9) {
        wasteOfTime += sin(s);
    }
    if (wasteOfTime != 42) {
        std::cout << "processed " << item << std::endl;
    }
}

static const std::uint32_t N_CONSUMING_FIBERS = 3;

void task() {
    boost::fibers::buffered_channel<int> channel{ 2 };

    boost::fibers::fiber fibers[1 + N_CONSUMING_FIBERS];
    fibers[0] = boost::fibers::fiber([&channel]() {
        std::cout << "producer starting" << std::endl;
        channel.push(1);
        channel.push(2);
        channel.push(3);
        channel.push(4);
        channel.push(5);
        channel.close();
        std::cout << "producer ending" << std::endl;
        });

    auto start = boost::fibers::barrier(N_CONSUMING_FIBERS);
    for (int i = 1; i <= N_CONSUMING_FIBERS; ++i) {
        fibers[i] = boost::fibers::fiber([&start, &channel]() {
            start.wait();
            for (auto item : channel) {
                process(item);
            }
            });
    }

    for (int i = 0; i <= N_CONSUMING_FIBERS; ++i) {
        fibers[i].join();
    }
}

class FiberExecutor {
    static std::vector<std::thread > extra_threads;
    static std::optional<boost::fibers::barrier> barrier;

public:
    static void init() {
        auto const N_THREADS = std::thread::hardware_concurrency();
        barrier.emplace(N_THREADS);
        extra_threads.reserve(N_THREADS - 1);
        for (auto i = decltype(N_THREADS){0}; i < N_THREADS - 1; ++i) {
            extra_threads.emplace_back([N_THREADS]() {
                boost::fibers::use_scheduling_algorithm<boost::fibers::algo::work_stealing>(N_THREADS);
                barrier->wait(); // start only after everyone has scheduled
                barrier->wait(); // end after main thread says to
                });
        }
        boost::fibers::use_scheduling_algorithm<boost::fibers::algo::work_stealing>(N_THREADS);
        barrier->wait(); // start only after everyone has scheduled
    }

    static void exit() {
        barrier->wait();
        for (auto& thread : extra_threads) {
            thread.join();
        }
    }
};

std::vector<std::thread> FiberExecutor::extra_threads;
std::optional<boost::fibers::barrier> FiberExecutor::barrier;

int Main() {
    FiberExecutor::init();

    std::cout << "Going once\n";
    task();
    std::cout << "Going twice\n";
    task();

    FiberExecutor::exit();
    return 0;
}

Остерегайтесь, это может быть неверно. Он также не планирует надежное планирование для всех потоков (по крайней мере, для Windows).

На самом деле это почти то же самое, что и код в вопрос об обратной задаче , но что нужно обновление. Кроме того, я считаю, что использование условной переменной вместо барьеров не является надежным, потому что основной поток может работать впереди других потоков.

Я не моделировал инициализацию и очистку как класс, чтобы избежать иллюзия, которую вы могли бы создать несколько раз (последовательно или параллельно). Конструктор work_stealing использует глобальные переменные . Вы не можете завершить задачу, основанную на оптоволокне, а затем запустить другую задачу, основанную на оптоволокне, даже если вы все кодируете во временных потоках и не портите основной поток.

...