Использование пула потоков для распараллеливания функции замедляет ее: почему? - PullRequest
2 голосов
/ 23 апреля 2019

Я работаю над базой данных, а не над RocksDB. У меня есть функция find, которая принимает запрос в параметре, перебирает все документы в базе данных и возвращает документы, соответствующие запросу. Я хочу распараллелить эту функцию, чтобы работа распределялась по нескольким потокам.

Чтобы достичь этого, я попытался использовать ThreadPool : я переместил код цикла в лямбду и добавил задачу в пул потоков для каждого документа. После цикла каждый результат обрабатывается основным потоком.

Текущая версия (одиночная тема):

void
EmbeDB::find(const bson_t& query,
             DocumentPtrCallback callback,
             int32_t limit,
             const bson_t* projection)
{
    int32_t count = 0;
    bson_error_t error;
    uint32_t num_query_keys = bson_count_keys(&query);
    mongoc_matcher_t* matcher = num_query_keys != 0
        ? mongoc_matcher_new(&query, &error)
        : nullptr;

    if (num_query_keys != 0 && matcher == nullptr)
    {
        callback(&error, nullptr);
        return;
    }

    bson_t document;
    rocksdb::Iterator* it = _db->NewIterator(rocksdb::ReadOptions());
    for (it->SeekToFirst(); it->Valid(); it->Next())
    {
        const char* bson_data = (const char*)it->value().data();
        int bson_length = it->value().size();
        std::vector<char> decrypted_data;
        if (encryptionEnabled())
        {
            decrypted_data.resize(bson_length);
            bson_length = decrypt_data(bson_data, bson_length, decrypted_data.data(), _encryption_method, _encryption_key, _encryption_iv);
            bson_data = decrypted_data.data();
        }
        bson_init_static(&document, (const uint8_t*)bson_data, bson_length);

        if (num_query_keys == 0 || mongoc_matcher_match(matcher, &document))
        {
            ++count;

            if (projection != nullptr)
            {
                bson_error_t error;
                bson_t projected;
                bson_init(&projected);

                mongoc_matcher_projection_execute_noop(
                    &document,
                    projection,
                    &projected,
                    &error,
                    NULL
                );

                callback(nullptr, &projected);
            }
            else
            {
                callback(nullptr, &document);
            }

            if (limit >= 0 && count >= limit)
            {
                break;
            }
        }
    }
    delete it;

    if (matcher)
    {
        mongoc_matcher_destroy(matcher);
    }
}

Новая версия (многопоточность):

void
EmbeDB::find(const bson_t& query,
             DocumentPtrCallback callback,
             int32_t limit,
             const bson_t* projection)
{
    int32_t count = 0;
    bool limit_reached = limit == 0;
    bson_error_t error;
    uint32_t num_query_keys = bson_count_keys(&query);
    mongoc_matcher_t* matcher = num_query_keys != 0
        ? mongoc_matcher_new(&query, &error)
        : nullptr;

    if (num_query_keys != 0 && matcher == nullptr)
    {
        callback(&error, nullptr);
        return;
    }

    auto process_document = [this, projection, num_query_keys, matcher](const char* bson_data, int bson_length) -> bson_t*
    {
        std::vector<char> decrypted_data;
        if (encryptionEnabled())
        {
            decrypted_data.resize(bson_length);
            bson_length = decrypt_data(bson_data, bson_length, decrypted_data.data(), _encryption_method, _encryption_key, _encryption_iv);
            bson_data = decrypted_data.data();
        }

        bson_t* document = new bson_t();

        bson_init_static(document, (const uint8_t*)bson_data, bson_length);

        if (num_query_keys == 0 || mongoc_matcher_match(matcher, document))
        {
            if (projection != nullptr)
            {
                bson_error_t error;
                bson_t* projected = new bson_t();
                bson_init(projected);

                mongoc_matcher_projection_execute_noop(
                    document,
                    projection,
                    projected,
                    &error,
                    NULL
                );

                delete document;

                return projected;
            }
            else
            {
                return document;
            }
        }
        else
        {
            delete document;

            return nullptr;
        }

    };

    const int WORKER_COUNT = std::max(1u, std::thread::hardware_concurrency());

    ThreadPool pool(WORKER_COUNT);
    std::vector<std::future<bson_t*>> futures;

    bson_t document;
    rocksdb::Iterator* db_it = _db->NewIterator(rocksdb::ReadOptions());
    for (db_it->SeekToFirst(); db_it->Valid(); db_it->Next())
    {
        const char* bson_data = (const char*)db_it->value().data();
        int bson_length = db_it->value().size();

        futures.push_back(pool.enqueue(process_document, bson_data, bson_length));
    }
    delete db_it;

    for (auto it = futures.begin(); it != futures.end(); ++it)
    {
        bson_t* result = it->get();

        if (result)
        {
            count += 1;

            if (limit < 0 || count < limit)
            {
                callback(nullptr, result);
            }

            delete result;
        }
    }

    if (matcher)
    {
        mongoc_matcher_destroy(matcher);
    }
}

  • С простыми документами и запросами однопотоковая версия обрабатывает 1 миллион документов за 0,5 секунды на моем компьютере.
  • С теми же документами и запросами, многопоточная версия обрабатывает 1 миллион документов за 3,3 секунды .

Удивительно, но многопоточная версия намного медленнее. Более того, я измерил время выполнения и 75% времени провел в цикле for . Так что в основном линия futures.push_back(pool.enqueue(process_document, bson_data, bson_length)); занимает 75% времени.

Я сделал следующее:

  • Я проверил значение WORKER_COUNT, на моей машине оно равно 6.
  • Я пытался добавить futures.reserve(1000000), думая, что, возможно, ошибка была связана с перераспределением вектора, но это ничего не изменило.
  • Я попытался удалить динамическое распределение памяти (bson_t* document = new bson_t();), это не изменило результат значительно.

Итак, мой вопрос: есть ли что-то, что я сделал неправильно, чтобы многопоточная версия была медленнее однопоточной версии?

В настоящее время я понимаю, что операции синхронизации пула потоков (когда задачи ставятся в очередь и удаляются из очереди) просто занимают большую часть времени, и решением будет изменение структуры данных. Мысли?

1 Ответ

2 голосов
/ 23 апреля 2019

Распараллеливание имеет накладные расходы.

Для обработки каждого документа в однопоточной версии требуется около 500 наносекунд.Необходимо выполнить большую бухгалтерию, чтобы делегировать работу пулу потоков (как для делегирования работы, так и для ее синхронизации впоследствии), и для всей этой бухгалтерии вполне может потребоваться более 500 наносекунд на задание.

Если ваш код верен, то бухгалтерия занимает около 2800 наносекунд на задание.Чтобы значительно ускорить распараллеливание, вам нужно разбить работу на большие куски.

Я рекомендую пытаться обрабатывать документы партиями по 1000 штук за раз.Каждое будущее, вместо того, чтобы соответствовать только одному документу, будет соответствовать 1000 документов.

Другие оптимизации

Если возможно, избегайте ненужного копирования.Если что-то скопировано, посмотрите, можете ли вы захватить это по ссылке, а не по значению.

...