Я следую примеру агрегата весенней кафки здесь: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/kafka-streams-samples/kafka-streams-aggregate
Я построил несколько вариантов этого примера для построения более сложных структур данных, а не просто добавления строки, и все это работает хорошо.
Однако мне интересно использовать слушатель, будь то облачный поток, кафа-поток или нативный Kafka, для ответа на новые агрегатные значения. У меня проблемы с поиском каких-либо конкретных примеров этого или даже чего-то вроде близкого, поэтому я надеюсь получить толчок в правильном направлении.
Я всегда могу вручную написать агрегат в другой поток из метода агрегата
kStream.groupBy(...).aggregate(
...
(s, domainEvent, agg) -> {
...
// Update aggregate
...
secondaryStreamChannel().send(MessageBuilder.withPayload(agg).build());
},
...
);
Но это не кажется идеальным, поскольку это сделало бы фактический поток агрегатов / снимков лишним. Кроме того, теперь он как бы разрывает изоляцию агрегатного метода и связывает его в другой поток.
Я мог бы уведомить некоторый другой обработчик об агрегируемом ключе, но существует потенциальная возможность состояния гонки между выполнением нижестоящего обработчика и завершением агрегирования.
По сути, я хотел бы иметь поток событий, который затем агрегируется, а затем иметь метод, который вызывается и предоставляет по крайней мере агрегатный ключ или пару ключ / агрегат. В идеале метод не должен вызываться до тех пор, пока не станет доступно совокупное изменение. Возможно ли это с помощью существующих API-интерфейсов Cloud-Stream, Kafka-Stream или Kafka? Есть ли другой способ (т.е. ... я пытаюсь привязать этот вариант использования слишком близко к Kafka / streams, где это, возможно, и не требуется)?