У меня есть поток на основе DSL, который использует split
для перебора списка объектов и отправки сообщения Кафки:
.transform(...)
.split()
.channel(KAFKA_OUT_CHANNEL)
После того, как все сообщения были отправлены, мне нужно позвонить в службу, а также записать, сколько сообщений было обработано.
Я понимаю, что подход заключается в использовании publishSubscribeChannel
, когда первый subscribe
выполняет фактическую отправку Kafka, а затем aggregate
выполняет служебный вызов:
.transform(...)
.split().
.publishSubscribeChannel(pubSub -> pubSub
.subscribe(f -> f.channel(KAFKA_OUT_CHANNEL)))
У меня проблемы с выяснением того, как на самом деле выполнить .aggregate
часть в pubSubChannel с использованием DSL. Пока я пробовал:
.subscribe(f -> f.channel(KAFKA_OUT_CHANNEL)
.subscribe(f -> f.aggregate(c -> c.processor( ?? ))))
Есть указатели?