HazelcastJet прокатки-агрегация с удалением предыдущих данных и добавлением новых - PullRequest
0 голосов
/ 30 января 2019

У нас есть вариант использования, когда мы получаем сообщение от Кафки, которое необходимо агрегировать.Это должно быть агрегировано таким образом, чтобы, если обновления приходили с тем же идентификатором, то существующее значение, если необходимо, вычиталось и добавлялось новое значение.

Из различных форумов я узнал, что Jet нехранить необработанные значения, скорее агрегированный результат и некоторые внутренние данные.

В таком случае как мне этого добиться?

Пример

Balance 1 {id:1, amount:100} // aggregated result 100
Balance 2 {id:2, amount:200} // 300
Balance 3 {id:1, amount:400} // 600 after removing 100 and adding 400

Я мог быдобиться простого использования, где каждый раз добавить.Но я не смог достичь агрегации, в которой необходимо вычесть существующее значение и добавить новое значение.

rollingAggregation(AggregatorOperations.summingDouble(<login to add remove>))
    .drainTo(Sinks.logger()).
  1. Баланс 1,2,3 - последовательность сообщений
  2. В комментарии показано, каково агрегированное значение для каждого сообщения, выполняемого самолетом.
  3. Моя цельявляется добавление новой суммы (если идентификатор приходит впервые) и вычитание суммы, если приходит обновленный баланс, т.е. идентификатор такой же, как и раньше.

1 Ответ

0 голосов
/ 31 января 2019

Вы можете попробовать пользовательскую агрегатную операцию, которая выдаст предыдущие и наблюдаемые в данный момент значения, например:

public static <T> AggregateOperation1<T, ?, Tuple2<T, T>> previousAndCurrent() {
    return AggregateOperation
            .withCreate(() -> new Object[2])
            .<T>andAccumulate((acc, current) -> {
                acc[0] = acc[1];
                acc[1] = current;
            })
            .andExportFinish((acc) -> tuple2((T) acc[0], (T) acc[1]));
}

Выходные данные должны быть кортежем вида (previous, current).Затем вы можете применить прокатный агрегат снова к выходу.Чтобы упростить задачу как ввод, у меня есть пара (id, amount) пар.

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Integer, Long>mapJournal("map", START_FROM_OLDEST)) // (id, amount)
        .groupingKey(Entry::getKey)
        .rollingAggregate(previousAndCurrent(), (key, val) -> val)
        .rollingAggregate(AggregateOperations.summingLong(e -> {
            long prevValue = e.f0() == null ? 0 : e.f0().getValue();
            long newValue = e.f1().getValue();
            return newValue - prevValue;
        }))
        .drainTo(Sinks.logger());

JetConfig config = new JetConfig();
config.getHazelcastConfig().addEventJournalConfig(new EventJournalConfig().setMapName("map"));
JetInstance jet = Jet.newJetInstance(config);

IMapJet<Object, Object> map = jet.getMap("map");

map.put(0, 1L);
map.put(0, 2L);
map.put(1, 10L);
map.put(1, 40L);

jet.newJob(p).join();

Это должно привести к выводу: 1, 2, 12, 42.

...