Резюме:
Я пытаюсь использовать реактивный трубопровод для агрегации при высокой нагрузке. Я обнаружил, что подавляющее большинство времени выполнения занято сериализацией и десериализацией записей моей карты через поток данных (они реализуют DataSerializable). Я думал, что смысл распределения операции агрегирования по узлам заключается в том, что они имеют прямой доступ к распределенным элементам в куче.
Pipeline:
pipeline.drawFrom(source)
.aggregate(aggregate)
.drainTo(sink);
Источник:
Sources.<Key, Record>map("mapname")
Совокупный:
AggregateOperation1<Entry<Key, Record>, T, Result>=
AggregateOperation
.withCreate(() -> {
Accumulator a = new Accumulator(this);
a.initialize();
return a;
}).<Entry<Key, Record>>andAccumulate(
(acc, row) -> acc.apply(row))
.andCombine(
(left, right) -> left.combine(right))
.andFinish(acc -> acc.finish());
Я обнаружил, что при выполнении конвейерного задания источник считывает / записывает записи на карте:
Record.readData
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:158)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:105)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:50)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:48)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:191)
at com.hazelcast.query.impl.CachedQueryEntry.getValue(CachedQueryEntry.java:75)
at Accumulator.apply(Accumulator.java:102)
код в Accumulator.apply (из шага "andAccumulate"), который вызывает это, выглядит как
private void apply(Entry<IntArr, Record> entry) {
Record record = entry.getValue();
...
}
Как создать источник данных карты Jet, который дает локальным записям или значениям AggregateOperation карты накапливать вызовы на каждом узле и потоке, не вызывая сериализацию? Есть что-то конкретное, что я делаю, чтобы заставить себя так себя вести?
Я считаю, что кластер настроен на использование резервных записей для агрегирования; это вызвало бы это? (Я не подтвердил, что это происходит на каждой записи)
EDIT:
Это мой текущий MapConfig:
MapConfig mapConfig = new MapConfig(mapName)
.setStatisticsEnabled(true)
.setReadBackupData(true)
.setInMemoryFormat(InMemoryFormat.OBJECT);
Как я понимаю из http://docs.hazelcast.org/docs/3.10.4/manual/html-single/index.html#setting-in-memory-format InMemoryFormat.OBJECT должен дать IMap команду сохранять значения в их объективной (десериализованной) форме.