Можно ли использовать кафку для ответа на агрегат? - PullRequest
1 голос
/ 31 мая 2019

Я следую примеру агрегата весенней кафки здесь: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/kafka-streams-samples/kafka-streams-aggregate

Я построил несколько вариантов этого примера для построения более сложных структур данных, а не просто добавления строки, и все это работает хорошо.

Однако мне интересно использовать слушатель, будь то облачный поток, кафа-поток или нативный Kafka, для ответа на новые агрегатные значения. У меня проблемы с поиском каких-либо конкретных примеров этого или даже чего-то вроде близкого, поэтому я надеюсь получить толчок в правильном направлении.

Я всегда могу вручную написать агрегат в другой поток из метода агрегата

kStream.groupBy(...).aggregate(
    ...
    (s, domainEvent, agg) -> {
        ...
        // Update aggregate
        ...
        secondaryStreamChannel().send(MessageBuilder.withPayload(agg).build());
    },
    ...
);

Но это не кажется идеальным, поскольку это сделало бы фактический поток агрегатов / снимков лишним. Кроме того, теперь он как бы разрывает изоляцию агрегатного метода и связывает его в другой поток.

Я мог бы уведомить некоторый другой обработчик об агрегируемом ключе, но существует потенциальная возможность состояния гонки между выполнением нижестоящего обработчика и завершением агрегирования.

По сути, я хотел бы иметь поток событий, который затем агрегируется, а затем иметь метод, который вызывается и предоставляет по крайней мере агрегатный ключ или пару ключ / агрегат. В идеале метод не должен вызываться до тех пор, пока не станет доступно совокупное изменение. Возможно ли это с помощью существующих API-интерфейсов Cloud-Stream, Kafka-Stream или Kafka? Есть ли другой способ (т.е. ... я пытаюсь привязать этот вариант использования слишком близко к Kafka / streams, где это, возможно, и не требуется)?

...