MongoDB Java API для медленного чтения - PullRequest
0 голосов
/ 30 августа 2018

Мы читаем из местной MongoDB все документы из коллекций, и производительность не очень блестящая.

Нам нужно сбросить все данные, не беспокойтесь о том, почему, просто поверьте, что это действительно необходимо и обходного пути не существует.

У нас есть 4 миллиона документов, которые выглядят так:

{
    "_id":"4d094f58c96767d7a0099d49",
    "exchange":"NASDAQ",
    "stock_symbol":"AACC",
    "date":"2008-03-07",
    "open":8.4,
    "high":8.75,
    "low":8.08,
    "close":8.55,
    "volume":275800,
    "adj close":8.55
}

И мы используем этот пока тривиальный код для чтения:

MongoClient mongoClient = MongoClients.create();
MongoDatabase database = mongoClient.getDatabase("localhost");
MongoCollection<Document> collection = database.getCollection("test");

MutableInt count = new MutableInt();
long start = System.currentTimeMillis();
collection.find().forEach((Block<Document>) document -> count.increment() /* actually something more complicated */ );
long start = System.currentTimeMillis();

Мы читаем всю коллекцию за 16 секунд (250 тыс. Строк / сек), что совсем не впечатляет при работе с небольшими документами. Имейте в виду, что мы хотим загрузить 800 миллионов строк. Совокупность, уменьшение карты или аналогичные значения невозможны.

Это так быстро, как получает MongoDB, или есть другие способы быстрее загружать документы (другие методы, перемещение Linux, больше оперативной памяти, настройки ...)?

Ответы [ 5 ]

0 голосов
/ 10 сентября 2018

Во-первых, как прокомментировал @ xtreme-biker, производительность сильно зависит от вашего оборудования. В частности, мой первый совет будет проверять, работаете ли вы на виртуальной машине или на собственном хосте. В моем случае с виртуальной машиной CentOS на i7 с SDD-диском я могу читать 123 000 документов в секунду, но точно такой же код, работающий на хосте Windows на том же диске, читает до 387 000 документов в секунду.

Далее, давайте предположим, что вам действительно нужно прочитать полную коллекцию. Это означает, что вы должны выполнить полное сканирование. И давайте предположим, что вы не можете изменить конфигурацию сервера MongoDB, а только оптимизировать свой код.

Тогда все сводится к тому, что

collection.find().forEach((Block<Document>) document -> count.increment());

на самом деле.

Быстрое развертывание MongoCollection.find () показывает, что он действительно делает это:

ReadPreference readPref = ReadPreference.primary();
ReadConcern concern = ReadConcern.DEFAULT;
MongoNamespace ns = new MongoNamespace(databaseName,collectionName);
Decoder<Document> codec = new DocumentCodec();
FindOperation<Document> fop = new FindOperation<Document>(ns,codec);
ReadWriteBinding readBinding = new ClusterBinding(getCluster(), readPref, concern);
QueryBatchCursor<Document> cursor = (QueryBatchCursor<Document>) fop.execute(readBinding);
AtomicInteger count = new AtomicInteger(0);
try (MongoBatchCursorAdapter<Document> cursorAdapter = new MongoBatchCursorAdapter<Document>(cursor)) {
    while (cursorAdapter.hasNext()) {
        Document doc = cursorAdapter.next();
        count.incrementAndGet();
    }
}

Здесь FindOperation.execute() довольно быстрый (до 10 мс), и большую часть времени проводит внутри цикла while и, в частности, внутри частного метода QueryBatchCursor.getMore()

getMore() вызывает DefaultServerConnection.command() и его время расходуется в основном на две операции: 1) выборка строковых данных с сервера и 2) преобразование строковых данных в BsonDocument .

Оказывается, что Mongo довольно умен в отношении того, сколько сетевых обходов он сделает, чтобы получить большой набор результатов. Сначала он извлекает 100 результатов с помощью команды firstBatch, а затем выбирает большие партии, а nextBatch - это размер пакета, в зависимости от размера коллекции, до предела.

Итак, под лесом что-то вроде этого произойдет, чтобы получить первую партию.

ReadPreference readPref = ReadPreference.primary();
ReadConcern concern = ReadConcern.DEFAULT;
MongoNamespace ns = new MongoNamespace(databaseName,collectionName);
FieldNameValidator noOpValidator = new NoOpFieldNameValidator();
DocumentCodec payloadDecoder = new DocumentCodec();
Constructor<CodecProvider> providerConstructor = (Constructor<CodecProvider>) Class.forName("com.mongodb.operation.CommandResultCodecProvider").getDeclaredConstructor(Decoder.class, List.class);
providerConstructor.setAccessible(true);
CodecProvider firstBatchProvider = providerConstructor.newInstance(payloadDecoder, Collections.singletonList("firstBatch"));
CodecProvider nextBatchProvider = providerConstructor.newInstance(payloadDecoder, Collections.singletonList("nextBatch"));
Codec<BsonDocument> firstBatchCodec = fromProviders(Collections.singletonList(firstBatchProvider)).get(BsonDocument.class);
Codec<BsonDocument> nextBatchCodec = fromProviders(Collections.singletonList(nextBatchProvider)).get(BsonDocument.class);
ReadWriteBinding readBinding = new ClusterBinding(getCluster(), readPref, concern);
BsonDocument find = new BsonDocument("find", new BsonString(collectionName));
Connection conn = readBinding.getReadConnectionSource().getConnection();

BsonDocument results = conn.command(databaseName,find,noOpValidator,readPref,firstBatchCodec,readBinding.getReadConnectionSource().getSessionContext(), true, null, null);
BsonDocument cursor = results.getDocument("cursor");
long cursorId = cursor.getInt64("id").longValue();

BsonArray firstBatch = cursor.getArray("firstBatch");

Затем cursorId используется для извлечения каждой следующей партии.

По моему мнению, «проблема» с реализацией драйвера заключается в том, что декодер String to JSON внедряется, а JsonReader, на который опирается метод decode (), - нет. Так происходит даже до com.mongodb.internal.connection.InternalStreamConnection, где вы уже находитесь рядом с сокетом.

Поэтому я думаю, что вряд ли можно что-то сделать, чтобы улучшить MongoCollection.find(), если вы не зайдете так глубоко, как InternalStreamConnection.sendAndReceiveAsync()

Вы не можете уменьшить количество циклов и не можете изменить способ преобразования ответа в BsonDocument. Не без обхода драйвера и написания своего собственного клиента, что, я сомневаюсь, является хорошей идеей.

PD Если вы хотите попробовать часть приведенного выше кода, вам потребуется метод getCluster (), который требует грязного взлома mongo-java-driver .

private Cluster getCluster() {
    Field cluster, delegate;
    Cluster mongoCluster = null;
    try {
        delegate = mongoClient.getClass().getDeclaredField("delegate");
        delegate.setAccessible(true);
        Object clientDelegate = delegate.get(mongoClient);
        cluster = clientDelegate.getClass().getDeclaredField("cluster");
        cluster.setAccessible(true);
        mongoCluster = (Cluster) cluster.get(clientDelegate);
    } catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException e) {
        System.err.println(e.getClass().getName()+" "+e.getMessage());
    }
    return mongoCluster;
}
0 голосов
/ 10 сентября 2018

По моим подсчетам, вы обрабатываете около 50 МБ / с (250 тыс. Строк / с * 0,2 КБ / строки). Это распространяется как на диск, так и на узкие места в сети. Какой тип хранилища использует MongoDB? Какую полосу пропускания вы имеете между клиентом и сервером MongoDB? Вы пробовали совмещать сервер и клиент в высокоскоростной (> = 10 Гбит / с) сети с минимальной задержкой (<1,0 мс)? Имейте в виду, что если вы используете провайдера облачных вычислений, такого как AWS или GCP, у них будут узкие места виртуализации, которые превышают физические. </p>

Вы спрашивали о настройках, которые могут помочь. Вы можете попробовать изменить параметры сжатия для соединения и для коллекции (варианты: «none», snappy и zlib). Даже если ни один из них не улучшит snappy, то, увидев разницу, которую настройка (или не делает) настройка, может помочь выяснить, какая часть системы находится под наибольшим стрессом.

Java не обладает хорошей производительностью по сравнению с C ++ или Python, поэтому вы можете переписать эту конкретную операцию на одном из этих языков и затем интегрировать ее с вашим кодом Java. Я предлагаю вам выполнить тестовый цикл, просто зацикливая данные в Python и сравнивая их с тем же в Java.

0 голосов
/ 04 сентября 2018

Вы не указали свой вариант использования, поэтому очень сложно сказать вам, как настроить ваш запрос. (Т.е .: кто захочет загружать 800-миллионный ряд за раз только для подсчета?).

Учитывая вашу схему, я думаю, что ваши данные почти только для чтения, и ваша задача связана с агрегацией данных.

Ваша текущая работа - это просто чтение данных (скорее всего, ваш драйвер будет считывать данные в пакетном режиме), затем остановка, а затем выполнение некоторых вычислений (черт, да, использование int-обертки для увеличения времени обработки еще больше), а затем повторение. Это не очень хороший подход. БД не работает магически быстро, если вы не обращаетесь к ней правильным способом.

Если вычисления не слишком сложные, я предлагаю вам использовать структуру агрегации вместо загрузки всего в вашу RAM.

Что-то, что вы должны рассмотреть, чтобы улучшить агрегацию:

  1. Разделите ваш набор данных на меньший набор. (Например: раздел date, раздел exchange ...). Добавьте индекс для поддержки этого раздела и выполните агрегацию в разделе, а затем объедините результат (Типичный подход «разделяй и властвуй»)
  2. Проектировать только необходимые поля
  3. Отфильтровать ненужный документ (если возможно)
  4. Разрешить использование диска, если вы не можете выполнить агрегирование памяти (если вы достигли предела в 100 МБ на пипилин).
  5. Используйте встроенный конвейер для ускорения вычислений (например, $count для вашего примера)

Если ваши вычисления слишком сложны, и вы не можете выразить их с помощью структуры агрегации, используйте mapReduce. Он работает по процессу mongod, и данные не нужно передавать по сети в вашу память.

Обновлено

Похоже, вы хотите выполнить обработку OLAP и застряли на шаге ETL.

Вам не нужно каждый раз загружать все данные OLTP в OLAP. Нужно только загрузить новые изменения в ваше хранилище данных. Тогда первая загрузка / выгрузка данных занимает больше времени, это нормально и приемлемо.

При первой загрузке следует учитывать следующие моменты:

  1. Divide-N-Conquer, опять же, разбивает ваши данные на меньший набор данных (с предикатом, таким как дата / биржа / биржевая бирка ...)
  2. Выполните параллельные вычисления, затем объедините ваш результат (вы должны правильно разделить ваш набор данных)
  3. Выполнять вычисления в пакетном режиме вместо обработки в forEach: загрузить раздел данных, а затем вычислить вместо одного за другим.
0 голосов
/ 04 сентября 2018

То, что я должен был сделать в вашем случае, было простым решением, и в то же время эффективным способом было максимизировать общую пропускную способность с помощью parallelCollectionScan

Позволяет приложениям использовать несколько параллельных курсоров при чтении всех документы из коллекции, тем самым увеличивая пропускную способность. Команда parallelCollectionScan возвращает документ, который содержит массив информации о курсоре.

Каждый курсор обеспечивает доступ к возвращению частичного набора документы из коллекции. Повторение каждого курсора возвращает каждый документ в коллекции. Курсоры не содержат результатов команда базы данных. Результат команды базы данных идентифицирует курсоры, но не содержат и не составляют курсоры.

Простой пример с parallelCollectionScan должен выглядеть примерно так

 MongoClient mongoClient = MongoClients.create();
 MongoDatabase database = mongoClient.getDatabase("localhost");
 Document commandResult = database.runCommand(new Document("parallelCollectionScan", "collectionName").append("numCursors", 3));
0 голосов
/ 30 августа 2018

collection.find().forEach((Block<Document>) document -> count.increment());

Эта строка может складываться много времени, так как вы перебираете в памяти более 250 тыс. Записей.

Чтобы быстро проверить, так ли это, вы можете попробовать это -

long start1 = System.currentTimeMillis();
List<Document> documents = collection.find();
System.out.println(System.currentTimeMillis() - start1);

long start2 = System.currentTimeMillis();
documents.forEach((Block<Document>) document -> count.increment());
System.out.println(System.currentTimeMillis() - start2);

Это поможет вам понять, сколько времени на самом деле требуется для получения документов из базы данных и сколько времени занимает итерация.

...