Boost asio thread_pool join не ждет завершения задач - PullRequest
3 голосов
/ 20 апреля 2020

Рассмотрим функции

#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>

void foo(const uint64_t begin, uint64_t *result)
{
    uint64_t prev[] = {begin, 0};
    for (uint64_t i = 0; i < 1000000000; ++i)
    {
        const auto tmp = (prev[0] + prev[1]) % 1000;
        prev[1] = prev[0];
        prev[0] = tmp;
    }
    *result = prev[0];
}

void batch(boost::asio::thread_pool &pool, const uint64_t a[])
{
    uint64_t r[] = {0, 0};
    boost::asio::post(pool, boost::bind(foo, a[0], &r[0]));
    boost::asio::post(pool, boost::bind(foo, a[1], &r[1]));

    pool.join();
    std::cerr << "foo(" << a[0] << "): " << r[0] << " foo(" << a[1] << "): " << r[1] << std::endl;
}

, где foo - простая «чистая» функция, которая выполняет вычисление для begin и записывает результат в указатель *result. Эта функция вызывается с разных входов от batch. Здесь может быть полезна отправка каждого вызова другому ядру ЦП.

Теперь предположим, что пакетная функция вызывается несколько 10 000 раз. Поэтому пул потоков был бы хорош, который распределяется между всеми последовательными пакетными вызовами.

Попытка сделать это (для простоты только 3 вызова)

int main(int argn, char **)
{
    boost::asio::thread_pool pool(2);

    const uint64_t a[] = {2, 4};
    batch(pool, a);

    const uint64_t b[] = {3, 5};
    batch(pool, b);

    const uint64_t c[] = {7, 9};
    batch(pool, c);
}

приводит к результату

foo (2): 2 foo (4): 4
foo (3): 0 foo (5): 0
foo (7): 0 foo (9): 0

Где все три строки появляются одновременно, тогда как вычисление foo занимает ~ 3 с. Я предполагаю, что только первый join действительно ждет, пока пул завершит все задания. Другие имеют недействительные результаты. (Не инициализированные значения) Каков наилучший способ повторного использования пула потоков?

1 Ответ

1 голос
/ 23 апреля 2020

Лучшая практика - не использовать пул повторно (какой будет пул, если вы продолжаете создавать новые пулы?).

Если вы хотите быть уверены, что вы "синхронизируете" партии вместе, Я бы предложил использовать when_all на фьючерсах:

Live On Coliru

#define BOOST_THREAD_PROVIDES_FUTURE_WHEN_ALL_WHEN_ANY
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

uint64_t foo(uint64_t begin) {
    uint64_t prev[] = {begin, 0};
    for (uint64_t i = 0; i < 1000000000; ++i) {
        const auto tmp = (prev[0] + prev[1]) % 1000;
        prev[1] = prev[0];
        prev[0] = tmp;
    }
    return prev[0];
}

void batch(boost::asio::thread_pool &pool, const uint64_t a[2])
{
    using T = boost::packaged_task<uint64_t>;

    T tasks[] {
        T(boost::bind(foo, a[0])),
        T(boost::bind(foo, a[1])),
    };

    auto all = boost::when_all(
        tasks[0].get_future(),
        tasks[1].get_future());

    for (auto& t : tasks)
        post(pool, std::move(t));

    auto [r0, r1] = all.get();
    std::cerr << "foo(" << a[0] << "): " << r0.get() << " foo(" << a[1] << "): " << r1.get() << std::endl;
}

int main() {
    boost::asio::thread_pool pool(2);

    const uint64_t a[] = {2, 4};
    batch(pool, a);

    const uint64_t b[] = {3, 5};
    batch(pool, b);

    const uint64_t c[] = {7, 9};
    batch(pool, c);
}

Печать

foo(2): 2 foo(4): 4
foo(3): 503 foo(5): 505
foo(7): 507 foo(9): 509

Я бы рассмотрел

  • обобщение
  • очереди сообщений

Обобщение

Сделайте его несколько более гибким, не жестко кодируя размеры пакетов. В конце концов, размер пула уже фиксирован, нам не нужно «убедиться, что пакеты подходят» или что-то в этом роде:

Live On Coliru

#define BOOST_THREAD_PROVIDES_FUTURE_WHEN_ALL_WHEN_ANY
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/thread/future.hpp>

struct Result { uint64_t begin, result; };

Result foo(uint64_t begin) {
    uint64_t prev[] = {begin, 0};
    for (uint64_t i = 0; i < 1000000000; ++i) {
        const auto tmp = (prev[0] + prev[1]) % 1000;
        prev[1] = prev[0];
        prev[0] = tmp;
    }
    return { begin, prev[0] };
}

void batch(boost::asio::thread_pool &pool, std::vector<uint64_t> const a)
{
    using T = boost::packaged_task<Result>;
    std::vector<T> tasks;
    tasks.reserve(a.size());

    for(auto begin : a)
        tasks.emplace_back(boost::bind(foo, begin));

    std::vector<boost::unique_future<T::result_type> > futures;
    for (auto& t : tasks) {
        futures.push_back(t.get_future());
        post(pool, std::move(t));
    }

    for (auto& fut : boost::when_all(futures.begin(), futures.end()).get()) {
        auto r = fut.get();
        std::cerr << "foo(" << r.begin << "): " << r.result << " ";
    }
    std::cout << std::endl;
}

int main() {
    boost::asio::thread_pool pool(2);

    batch(pool, {2});
    batch(pool, {4, 3, 5});
    batch(pool, {7, 9});
}

Отпечатки

foo(2): 2 
foo(4): 4 foo(3): 503 foo(5): 505 
foo(7): 507 foo(9): 509 

Обобщенный2: Вариативное упрощение

Вопреки распространенному мнению (и, честно говоря, то, что обычно происходит), на этот раз мы можем использовать вариады, чтобы избавиться от всех промежуточных векторы (каждый из них):

Live On Coliru

void batch(boost::asio::thread_pool &pool, T... a)
{
    auto launch = [&pool](uint64_t begin) {
        boost::packaged_task<Result> pt(boost::bind(foo, begin));
        auto fut = pt.get_future();
        post(pool, std::move(pt));
        return fut;
    };

    for (auto& r : {launch(a).get()...}) {
        std::cerr << "foo(" << r.begin << "): " << r.result << " ";
    }

    std::cout << std::endl;
}

Если вы настаиваете на выводе результатов во времени, вы все еще можно добавить when_all в микс (требуется немного больше героизма для распаковки кортежа):

Live On Coliru

template <typename...T>
void batch(boost::asio::thread_pool &pool, T... a)
{
    auto launch = [&pool](uint64_t begin) {
        boost::packaged_task<Result> pt(boost::bind(foo, begin));
        auto fut = pt.get_future();
        post(pool, std::move(pt));
        return fut;
    };

    std::apply([](auto&&... rfut) {
        Result results[] {rfut.get()...};
        for (auto& r : results) {
            std::cerr << "foo(" << r.begin << "): " << r.result << " ";
        }
    }, boost::when_all(launch(a)...).get());

    std::cout << std::endl;
}

Оба по-прежнему выводят один и тот же результат

Очередь сообщений

Это очень естественно для повышения, и вроде как пропускает большую сложность. Если вы также хотите отчитываться по группам, вам нужно будет указать:

Live On Coliru

#include <iostream>
#include <boost/asio.hpp>
#include <memory>

struct Result { uint64_t begin, result; };

Result foo(uint64_t begin) {
    uint64_t prev[] = {begin, 0};
    for (uint64_t i = 0; i < 1000000000; ++i) {
        const auto tmp = (prev[0] + prev[1]) % 1000;
        prev[1] = prev[0];
        prev[0] = tmp;
    }
    return { begin, prev[0] };
}

using Group = std::shared_ptr<size_t>;
void batch(boost::asio::thread_pool &pool, std::vector<uint64_t> begins) {
    auto group = std::make_shared<std::vector<Result> >(begins.size());

    for (size_t i=0; i < begins.size(); ++i) {
        post(pool, [i,begin=begins.at(i),group] {
              (*group)[i] = foo(begin);
              if (group.unique()) {
                  for (auto& r : *group) {
                      std::cout << "foo(" << r.begin << "): " << r.result << " ";
                      std::cout << std::endl;
                  }
              }
          });
    }
}

int main() {
    boost::asio::thread_pool pool(2);

    batch(pool, {2});
    batch(pool, {4, 3, 5});
    batch(pool, {7, 9});
    pool.join();
}

Обратите внимание, что у этого есть одновременный доступ к group, который безопасен из-за ограничений на доступ к элементам.

Печать:

foo(2): 2 
foo(4): 4 foo(3): 503 foo(5): 505 
foo(7): 507 foo(9): 509 
...