Макс Агрегация с Hazelcast-Jet - PullRequest
0 голосов
/ 19 марта 2019

Я хочу сделать простой максимум для всего набора данных. Я начал с примера Кафки по адресу: https://github.com/hazelcast/hazelcast-jet-code-samples/blob/0.7-maintenance/kafka/src/main/java/avro/KafkaAvroSource.java

Я только что изменил конвейер на:

p.drawFrom(KafkaSources.<Integer, User>kafka(brokerProperties(), TOPIC))
    .map(Map.Entry::getValue)
    .rollingAggregate(minBy(comparingInt(user -> (Integer) user.get(2))))
    .map(user -> (Integer) user.get(2))
    .drainTo(Sinks.list("result"));

и перейти к:

IListJet<Integer> res = jet.getList("result");
SECONDS.sleep(10);
System.out.println(res.get(0));
SECONDS.sleep(15);
System.out.println(res.get(0));
cancel(job);

чтобы получить самый большой возраст людей в теме. Однако он не возвращает 20 и, похоже, возвращает разные значения при разных запусках. Есть идеи почему?

1 Ответ

2 голосов
/ 19 марта 2019

Вы, кажется, используете rollingAggregate, который создает новый элемент вывода каждый раз, когда он получает какой-либо вход, но все, что вы проверяете, это первый элемент, который он испустил.Вместо этого вы должны найти последний элемент, который он испустил.Один из способов добиться этого - отправить результат в приемник IMap, используя один и тот же ключ каждый раз:

p.drawFrom(KafkaSources.<Integer, User>kafka(brokerProperties(), TOPIC))
 .withoutTimestamps()
 .map(Map.Entry::getValue)
 .rollingAggregate(minBy(comparingInt(user -> (Integer) user.get(2))))
 .map(user -> entry("user", (Integer) user.get(2)))
 .drainTo(Sinks.map("result"));

Вы можете получить последний результат с помощью

IMap<String, Integer> result = jet.getMap("result");
System.out.println(result.get("user");
...