Самый быстрый способ хранить и извлекать большой поток небольших неструктурированных сообщений - PullRequest
4 голосов
/ 05 мая 2020

Я разрабатываю приложение IOT, которое требует от меня обработки множества небольших неструктурированных сообщений (это означает, что их поля могут изменяться со временем - одни могут появляться, а другие исчезать). Эти сообщения обычно имеют от 2 до 15 полей, значения которых относятся к базовым c типам данных (целые / длинные, строки, логические). Эти сообщения очень хорошо вписываются в формат данных JSON (или msgpack).

Очень важно, чтобы сообщения обрабатывались в порядке их поступления (поймите: они должны обрабатываться одним потоком - там невозможно распараллелить эту часть). У меня есть собственный logi c для обработки этих сообщений в реальном времени (пропускная способность относительно невелика, максимум несколько сотен тысяч сообщений в секунду), но возрастает потребность в том, чтобы движок мог имитировать / воспроизводить предыдущие периоды. путем воспроизведения истории сообщений. Хотя изначально он не был написан для этой цели, мой механизм обработки событий (написанный на Go) вполне мог обрабатывать десятки (может быть, несколько сотен) миллионов сообщений в секунду, если бы я мог кормить его историческими данными. на достаточной скорости.

В этом как раз и проблема. Я хранил много (сотни миллиардов) этих сообщений в течение длительного периода времени (несколько лет), пока что в формате msgpack с разделителями (https://github.com/msgpack/msgpack-python#streaming -unpacking ). В этом и других параметрах (см. Ниже) я смог измерить пиковую скорость синтаксического анализа ~ 2 млн сообщений в секунду (на Macbook Pro 2019 года только синтаксический анализ), что далеко от насыщения дискового ввода-вывода.

Даже не говоря о вводе-выводе, выполнение следующих действий:

import json
message = {
    'meta1': "measurement",
    'location': "NYC",
    'time': "20200101",
    'value1': 1.0,
    'value2': 2.0,
    'value3': 3.0,
    'value4': 4.0
}
json_message = json.dumps(message)

%%timeit
json.loads(json_message)

дает мне время синтаксического анализа 3 микросекунды / сообщение, что немного выше 300 тыс. Сообщений / секунду. Сравнивая с u json, quick json и / или json вместо модуля стандартной библиотеки json, я смог получить пиковые скорости 1 микросекунды / сообщение (с u json), то есть около 1M сообщений в секунду.

Msgpack немного лучше:

import msgpack
message = {
    'meta1': "measurement",
    'location': "NYC",
    'time': "20200101",
    'value1': 1.0,
    'value2': 2.0,
    'value3': 3.0,
    'value4': 4.0
}
msgpack_message = msgpack.packb(message)

%%timeit
msgpack.unpackb(msgpack_message)

Дает мне время обработки ~ 750 нс / сообщение (около 100 нс / поле), то есть около 1,3 млн сообщений в секунду. Сначала я думал, что C ++ может быть намного быстрее. Вот пример использования nlohmann / json, хотя это напрямую не сопоставимо с msgpack:

#include <iostream>
#include "json.hpp"

using json = nlohmann::json;

const std::string message = "{\"value\": \"hello\"}";

int main() {
  auto jsonMessage = json::parse(message);
  for(size_t i=0; i<1000000; ++i) {
    jsonMessage = json::parse(message);
  }
  std::cout << jsonMessage["value"] << std::endl; // To avoid having the compiler optimize the loop away. 
};

Компиляция с помощью clang 11.0.3 (std = c ++ 17, -O3 ), это выполняется за ~ 1,4 с на том же Macbook, то есть скорость анализа ~ 700 тыс. сообщений в секунду с сообщениями еще меньшего размера, чем в примере Python. Я знаю, что nlohmann / json может быть довольно медленным и смог получить скорость синтаксического анализа около 2M сообщений в секунду, используя DOM API simd json.

Это все еще слишком медленно для моего случая использования. Я открыт для всех предложений по повышению скорости анализа сообщений с потенциальными приложениями на Python, C ++, Java (или на любом другом языке JVM) или Go.

Примечания:

  • Меня не обязательно заботит размер сообщений на диске (считайте это плюсом, если метод хранения, который вы предлагаете, эффективен с точки зрения памяти).
  • Все, что мне нужно, это модель ключ-значение для базовых c типы данных - мне не нужны вложенные словари или списки.
  • Преобразование существующих данных вообще не проблема. Я просто ищу что-нибудь оптимизированное для чтения.
  • Мне не обязательно разбирать всю вещь в структуру или настраиваемый объект, только для доступа к некоторым полям, когда они мне нужны (обычно мне нужна небольшая часть полей каждого сообщения) - это нормально, если это сопровождается штрафом, если штраф не снижает пропускную способность всего приложения.
  • Я открыт для нестандартных / слегка небезопасных решений.
  • Любой формат, который я выберу использование должно быть естественно разделено, в том смысле, что сообщения будут последовательно записываться в файл (в настоящее время я использую один файл в день, что достаточно для моего варианта использования). В прошлом у меня были проблемы с сообщениями с неправильными разделителями (см. WriteDelimitedTo в Java Protobuf API - потеря одного байта, и весь файл испорчен).

Вещи, которые я уже изучал:

  • JSON: экспериментировал с Rapid json, simd json, nlohmann / json, et c ...)
  • Плоские файлы с разделителями msgpack (см. этот API: https://github.com/msgpack/msgpack-python#streaming -распаковка ): то, что я в настоящее время использую для хранения сообщений.
  • Буферы протокола: немного быстрее, но не совсем подходят для неструктурированной природы данные.

Спасибо !!

1 Ответ

2 голосов
/ 09 мая 2020

Я предполагаю, что сообщения содержат только несколько именованных атрибутов базовых c типов (определенных во время выполнения) и что эти базовые c типы представляют собой, например, строки, целые числа и числа с плавающей запятой.

Для Чтобы реализация была быстрой, лучше:

  • избегать синтаксического анализа текста (медленного, потому что он последовательный и заполнен условными выражениями);
  • избегать проверки неправильного формата сообщений (не требуется здесь, так как все они должны быть правильно сформированы);
  • максимально избегать выделения памяти;
  • работать с блоками сообщений.

Таким образом, нам сначала нужно разработать простой и быстрый протокол двоичных сообщений :

Двоичное сообщение содержит количество своих атрибутов (закодированных в 1 байт), за которым следует список атрибутов. Каждый атрибут содержит строку с префиксом его размера (закодированного в 1 байт), за которым следует тип атрибута (индекс типа в std :: variant, закодированный 1 байтом), а также значение атрибута (размер- строка с префиксом, 64-битное целое или 64-битное число с плавающей запятой).

Каждое закодированное сообщение представляет собой поток байтов, который может уместиться в большом буфере (выделяется один раз и повторно используется для нескольких входящих сообщений) .

Вот код для декодирования сообщения из необработанного двоичного буфера:

#include <unordered_map>
#include <variant>
#include <climits>

// Define the possible types here
using AttrType = std::variant<std::string_view, int64_t, double>;

// Decode the `msgData` buffer and write the decoded message into `result`.
// Assume the message is not ill-formed!
// msgData must not be freed or modified while the resulting map is being used.
void decode(const char* msgData, std::unordered_map<std::string_view, AttrType>& result)
{
    static_assert(CHAR_BIT == 8);

    const size_t attrCount = msgData[0];
    size_t cur = 1;

    result.clear();

    for(size_t i=0 ; i<attrCount ; ++i)
    {
        const size_t keyLen = msgData[cur];
        std::string_view key(msgData+cur+1, keyLen);
        cur += 1 + keyLen;
        const size_t attrType = msgData[cur];
        cur++;

        // A switch could be better if there is more types
        if(attrType == 0) // std::string_view
        {
            const size_t valueLen = msgData[cur];
            std::string_view value(msgData+cur+1, valueLen);
            cur += 1 + valueLen;

            result[key] = std::move(AttrType(value));
        }
        else if(attrType == 1) // Native-endian 64-bit integer
        {
            int64_t value;

            // Required to not break the strict aliasing rule
            std::memcpy(&value, msgData+cur, sizeof(int64_t));
            cur += sizeof(int64_t);

            result[key] = std::move(AttrType(value));
        }
        else // IEEE-754 double
        {
            double value;

            // Required to not break the strict aliasing rule
            std::memcpy(&value, msgData+cur, sizeof(double));
            cur += sizeof(double);

            result[key] = std::move(AttrType(value));
        }
    }
}

Вероятно, вам также потребуется написать функцию кодирования (на основе той же идеи).

Вот пример использования (на основе вашего кода, связанного с json):

const char* message = "\x01\x05value\x00\x05hello";

void bench()
{
    std::unordered_map<std::string_view, AttrType> decodedMsg;
    decodedMsg.reserve(16);

    decode(message, decodedMsg);

    for(size_t i=0; i<1000*1000; ++i)
    {
        decode(message, decodedMsg);
    }

    visit([](const auto& v) { cout << "Result: " << v << endl; }, decodedMsg["value"]);
}

На моей машине (с процессором Intel i7-9700KF) и на основе вашего теста я получаю 2,7 млн ​​сообщений / с с кодом с использованием библиотеки nlohmann json и 35,4 млн сообщений / с с новым кодом.

Обратите внимание, что этот код может быть намного быстрее . Действительно, большая часть времени уходит на эффективное хеширование и распределение. Вы можете смягчить проблему, используя более быструю реализацию ha sh -map (например, boost :: container :: flat_map или ska :: bytell_hash_map) и / или используя специальный распределитель. Альтернативой является создание собственной тщательно настроенной реализации ha sh -map. Другой альтернативой является использование вектора пар ключ-значение и линейный поиск для выполнения поиска (это должно быть быстро, потому что ваши сообщения не должны иметь много атрибутов и потому, что вы сказали, что вам нужна небольшая часть атрибутов для каждого сообщения. ). Однако чем больше сообщения, тем медленнее декодирование. Таким образом, вам может потребоваться использовать параллелизм для более быстрого декодирования блоков сообщений. При всем этом можно достичь более 100 млн сообщений в секунду.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...