Я разрабатываю приложение IOT, которое требует от меня обработки множества небольших неструктурированных сообщений (это означает, что их поля могут изменяться со временем - одни могут появляться, а другие исчезать). Эти сообщения обычно имеют от 2 до 15 полей, значения которых относятся к базовым c типам данных (целые / длинные, строки, логические). Эти сообщения очень хорошо вписываются в формат данных JSON (или msgpack).
Очень важно, чтобы сообщения обрабатывались в порядке их поступления (поймите: они должны обрабатываться одним потоком - там невозможно распараллелить эту часть). У меня есть собственный logi c для обработки этих сообщений в реальном времени (пропускная способность относительно невелика, максимум несколько сотен тысяч сообщений в секунду), но возрастает потребность в том, чтобы движок мог имитировать / воспроизводить предыдущие периоды. путем воспроизведения истории сообщений. Хотя изначально он не был написан для этой цели, мой механизм обработки событий (написанный на Go) вполне мог обрабатывать десятки (может быть, несколько сотен) миллионов сообщений в секунду, если бы я мог кормить его историческими данными. на достаточной скорости.
В этом как раз и проблема. Я хранил много (сотни миллиардов) этих сообщений в течение длительного периода времени (несколько лет), пока что в формате msgpack с разделителями (https://github.com/msgpack/msgpack-python#streaming -unpacking ). В этом и других параметрах (см. Ниже) я смог измерить пиковую скорость синтаксического анализа ~ 2 млн сообщений в секунду (на Macbook Pro 2019 года только синтаксический анализ), что далеко от насыщения дискового ввода-вывода.
Даже не говоря о вводе-выводе, выполнение следующих действий:
import json
message = {
'meta1': "measurement",
'location': "NYC",
'time': "20200101",
'value1': 1.0,
'value2': 2.0,
'value3': 3.0,
'value4': 4.0
}
json_message = json.dumps(message)
%%timeit
json.loads(json_message)
дает мне время синтаксического анализа 3 микросекунды / сообщение, что немного выше 300 тыс. Сообщений / секунду. Сравнивая с u json, quick json и / или json вместо модуля стандартной библиотеки json
, я смог получить пиковые скорости 1 микросекунды / сообщение (с u json), то есть около 1M сообщений в секунду.
Msgpack немного лучше:
import msgpack
message = {
'meta1': "measurement",
'location': "NYC",
'time': "20200101",
'value1': 1.0,
'value2': 2.0,
'value3': 3.0,
'value4': 4.0
}
msgpack_message = msgpack.packb(message)
%%timeit
msgpack.unpackb(msgpack_message)
Дает мне время обработки ~ 750 нс / сообщение (около 100 нс / поле), то есть около 1,3 млн сообщений в секунду. Сначала я думал, что C ++ может быть намного быстрее. Вот пример использования nlohmann / json, хотя это напрямую не сопоставимо с msgpack:
#include <iostream>
#include "json.hpp"
using json = nlohmann::json;
const std::string message = "{\"value\": \"hello\"}";
int main() {
auto jsonMessage = json::parse(message);
for(size_t i=0; i<1000000; ++i) {
jsonMessage = json::parse(message);
}
std::cout << jsonMessage["value"] << std::endl; // To avoid having the compiler optimize the loop away.
};
Компиляция с помощью clang 11.0.3 (std = c ++ 17, -O3 ), это выполняется за ~ 1,4 с на том же Macbook, то есть скорость анализа ~ 700 тыс. сообщений в секунду с сообщениями еще меньшего размера, чем в примере Python. Я знаю, что nlohmann / json может быть довольно медленным и смог получить скорость синтаксического анализа около 2M сообщений в секунду, используя DOM API simd json.
Это все еще слишком медленно для моего случая использования. Я открыт для всех предложений по повышению скорости анализа сообщений с потенциальными приложениями на Python, C ++, Java (или на любом другом языке JVM) или Go.
Примечания:
- Меня не обязательно заботит размер сообщений на диске (считайте это плюсом, если метод хранения, который вы предлагаете, эффективен с точки зрения памяти).
- Все, что мне нужно, это модель ключ-значение для базовых c типы данных - мне не нужны вложенные словари или списки.
- Преобразование существующих данных вообще не проблема. Я просто ищу что-нибудь оптимизированное для чтения.
- Мне не обязательно разбирать всю вещь в структуру или настраиваемый объект, только для доступа к некоторым полям, когда они мне нужны (обычно мне нужна небольшая часть полей каждого сообщения) - это нормально, если это сопровождается штрафом, если штраф не снижает пропускную способность всего приложения.
- Я открыт для нестандартных / слегка небезопасных решений.
- Любой формат, который я выберу использование должно быть естественно разделено, в том смысле, что сообщения будут последовательно записываться в файл (в настоящее время я использую один файл в день, что достаточно для моего варианта использования). В прошлом у меня были проблемы с сообщениями с неправильными разделителями (см. WriteDelimitedTo в Java Protobuf API - потеря одного байта, и весь файл испорчен).
Вещи, которые я уже изучал:
- JSON: экспериментировал с Rapid json, simd json, nlohmann / json, et c ...)
- Плоские файлы с разделителями msgpack (см. этот API: https://github.com/msgpack/msgpack-python#streaming -распаковка ): то, что я в настоящее время использую для хранения сообщений.
- Буферы протокола: немного быстрее, но не совсем подходят для неструктурированной природы данные.
Спасибо !!