Узкое место производительности агрегации карт Jet: как избежать сериализации локальных записей? - PullRequest
0 голосов
/ 27 августа 2018

Резюме: Я пытаюсь использовать реактивный трубопровод для агрегации при высокой нагрузке. Я обнаружил, что подавляющее большинство времени выполнения занято сериализацией и десериализацией записей моей карты через поток данных (они реализуют DataSerializable). Я думал, что смысл распределения операции агрегирования по узлам заключается в том, что они имеют прямой доступ к распределенным элементам в куче.

Pipeline:

pipeline.drawFrom(source)
.aggregate(aggregate)
.drainTo(sink);

Источник:

Sources.<Key, Record>map("mapname")

Совокупный:

AggregateOperation1<Entry<Key, Record>, T, Result>=
AggregateOperation
                .withCreate(() -> {
                    Accumulator a = new Accumulator(this);
                    a.initialize();
                    return a;
                }).<Entry<Key, Record>>andAccumulate(
                        (acc, row) -> acc.apply(row))
                .andCombine(
                        (left, right) -> left.combine(right))
                .andFinish(acc -> acc.finish());

Я обнаружил, что при выполнении конвейерного задания источник считывает / записывает записи на карте:

Record.readData
        at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:158)
        at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:105)
        at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:50)
        at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:48)
        at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:191)
        at com.hazelcast.query.impl.CachedQueryEntry.getValue(CachedQueryEntry.java:75)
        at Accumulator.apply(Accumulator.java:102)

код в Accumulator.apply (из шага "andAccumulate"), который вызывает это, выглядит как

private void apply(Entry<IntArr, Record> entry) {
    Record record = entry.getValue();
    ...
}

Как создать источник данных карты Jet, который дает локальным записям или значениям AggregateOperation карты накапливать вызовы на каждом узле и потоке, не вызывая сериализацию? Есть что-то конкретное, что я делаю, чтобы заставить себя так себя вести? Я считаю, что кластер настроен на использование резервных записей для агрегирования; это вызвало бы это? (Я не подтвердил, что это происходит на каждой записи)

EDIT: Это мой текущий MapConfig:

MapConfig mapConfig = new MapConfig(mapName)
    .setStatisticsEnabled(true)
    .setReadBackupData(true)
    .setInMemoryFormat(InMemoryFormat.OBJECT);

Как я понимаю из http://docs.hazelcast.org/docs/3.10.4/manual/html-single/index.html#setting-in-memory-format InMemoryFormat.OBJECT должен дать IMap команду сохранять значения в их объективной (десериализованной) форме.

1 Ответ

0 голосов
/ 28 августа 2018
  1. Hazelcast IMap сохраняет данные в сериализованной форме. Когда вы получаете его из источника, вы получаете экземпляр Map.Entry, который лениво десериализует свой ключ / значение по запросу. Это происходит в вашем entry.getValue() звонке.

  2. Ваш конвейер запрашивает агрегирование данных: выходные данные представляют собой один элемент, который отражает все входные данные. Чтобы достичь этого результата, Jet должен отправить все частичные результаты одному члену, где он вызывает ваш метод Accumulator.combine для их объединения. Влияние ser / de на этом этапе должно быть незначительным по сравнению с приведенным выше.

...