Я работаю над базой данных, а не над RocksDB. У меня есть функция find
, которая принимает запрос в параметре, перебирает все документы в базе данных и возвращает документы, соответствующие запросу. Я хочу распараллелить эту функцию, чтобы работа распределялась по нескольким потокам.
Чтобы достичь этого, я попытался использовать ThreadPool : я переместил код цикла в лямбду и добавил задачу в пул потоков для каждого документа. После цикла каждый результат обрабатывается основным потоком.
Текущая версия (одиночная тема):
void
EmbeDB::find(const bson_t& query,
DocumentPtrCallback callback,
int32_t limit,
const bson_t* projection)
{
int32_t count = 0;
bson_error_t error;
uint32_t num_query_keys = bson_count_keys(&query);
mongoc_matcher_t* matcher = num_query_keys != 0
? mongoc_matcher_new(&query, &error)
: nullptr;
if (num_query_keys != 0 && matcher == nullptr)
{
callback(&error, nullptr);
return;
}
bson_t document;
rocksdb::Iterator* it = _db->NewIterator(rocksdb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next())
{
const char* bson_data = (const char*)it->value().data();
int bson_length = it->value().size();
std::vector<char> decrypted_data;
if (encryptionEnabled())
{
decrypted_data.resize(bson_length);
bson_length = decrypt_data(bson_data, bson_length, decrypted_data.data(), _encryption_method, _encryption_key, _encryption_iv);
bson_data = decrypted_data.data();
}
bson_init_static(&document, (const uint8_t*)bson_data, bson_length);
if (num_query_keys == 0 || mongoc_matcher_match(matcher, &document))
{
++count;
if (projection != nullptr)
{
bson_error_t error;
bson_t projected;
bson_init(&projected);
mongoc_matcher_projection_execute_noop(
&document,
projection,
&projected,
&error,
NULL
);
callback(nullptr, &projected);
}
else
{
callback(nullptr, &document);
}
if (limit >= 0 && count >= limit)
{
break;
}
}
}
delete it;
if (matcher)
{
mongoc_matcher_destroy(matcher);
}
}
Новая версия (многопоточность):
void
EmbeDB::find(const bson_t& query,
DocumentPtrCallback callback,
int32_t limit,
const bson_t* projection)
{
int32_t count = 0;
bool limit_reached = limit == 0;
bson_error_t error;
uint32_t num_query_keys = bson_count_keys(&query);
mongoc_matcher_t* matcher = num_query_keys != 0
? mongoc_matcher_new(&query, &error)
: nullptr;
if (num_query_keys != 0 && matcher == nullptr)
{
callback(&error, nullptr);
return;
}
auto process_document = [this, projection, num_query_keys, matcher](const char* bson_data, int bson_length) -> bson_t*
{
std::vector<char> decrypted_data;
if (encryptionEnabled())
{
decrypted_data.resize(bson_length);
bson_length = decrypt_data(bson_data, bson_length, decrypted_data.data(), _encryption_method, _encryption_key, _encryption_iv);
bson_data = decrypted_data.data();
}
bson_t* document = new bson_t();
bson_init_static(document, (const uint8_t*)bson_data, bson_length);
if (num_query_keys == 0 || mongoc_matcher_match(matcher, document))
{
if (projection != nullptr)
{
bson_error_t error;
bson_t* projected = new bson_t();
bson_init(projected);
mongoc_matcher_projection_execute_noop(
document,
projection,
projected,
&error,
NULL
);
delete document;
return projected;
}
else
{
return document;
}
}
else
{
delete document;
return nullptr;
}
};
const int WORKER_COUNT = std::max(1u, std::thread::hardware_concurrency());
ThreadPool pool(WORKER_COUNT);
std::vector<std::future<bson_t*>> futures;
bson_t document;
rocksdb::Iterator* db_it = _db->NewIterator(rocksdb::ReadOptions());
for (db_it->SeekToFirst(); db_it->Valid(); db_it->Next())
{
const char* bson_data = (const char*)db_it->value().data();
int bson_length = db_it->value().size();
futures.push_back(pool.enqueue(process_document, bson_data, bson_length));
}
delete db_it;
for (auto it = futures.begin(); it != futures.end(); ++it)
{
bson_t* result = it->get();
if (result)
{
count += 1;
if (limit < 0 || count < limit)
{
callback(nullptr, result);
}
delete result;
}
}
if (matcher)
{
mongoc_matcher_destroy(matcher);
}
}
- С простыми документами и запросами однопотоковая версия обрабатывает 1 миллион документов за 0,5 секунды на моем компьютере.
- С теми же документами и запросами, многопоточная версия обрабатывает 1 миллион документов за 3,3 секунды .
Удивительно, но многопоточная версия намного медленнее. Более того, я измерил время выполнения и 75% времени провел в цикле for . Так что в основном линия futures.push_back(pool.enqueue(process_document, bson_data, bson_length));
занимает 75% времени.
Я сделал следующее:
- Я проверил значение
WORKER_COUNT
, на моей машине оно равно 6.
- Я пытался добавить
futures.reserve(1000000)
, думая, что, возможно, ошибка была связана с перераспределением вектора, но это ничего не изменило.
- Я попытался удалить динамическое распределение памяти (
bson_t* document = new bson_t();
), это не изменило результат значительно.
Итак, мой вопрос: есть ли что-то, что я сделал неправильно, чтобы многопоточная версия была медленнее однопоточной версии?
В настоящее время я понимаю, что операции синхронизации пула потоков (когда задачи ставятся в очередь и удаляются из очереди) просто занимают большую часть времени, и решением будет изменение структуры данных. Мысли?