Вы, кажется, используете rollingAggregate
, который создает новый элемент вывода каждый раз, когда он получает какой-либо вход, но все, что вы проверяете, это первый элемент, который он испустил.Вместо этого вы должны найти последний элемент, который он испустил.Один из способов добиться этого - отправить результат в приемник IMap
, используя один и тот же ключ каждый раз:
p.drawFrom(KafkaSources.<Integer, User>kafka(brokerProperties(), TOPIC))
.withoutTimestamps()
.map(Map.Entry::getValue)
.rollingAggregate(minBy(comparingInt(user -> (Integer) user.get(2))))
.map(user -> entry("user", (Integer) user.get(2)))
.drainTo(Sinks.map("result"));
Вы можете получить последний результат с помощью
IMap<String, Integer> result = jet.getMap("result");
System.out.println(result.get("user");