Запись в топи c с процессора в приложении Spring Cloud Streams Kafka Stream - PullRequest
0 голосов
/ 02 мая 2020

Я использую Processor API для выполнения низкоуровневой обработки в хранилище состояний. Дело в том, что мне также нужно записать в topi c после хранения в магазине. Как это можно сделать в приложениях Spring Cloud Streams Kafka?

@Bean
fun processEvent() = Consumer<KStream<EventId, EventValue>> { event ->

    event.map{
        ...
    }.process(ProcessorSupplier {

            object : Processor<EventId, MappedEventValue> {

                private lateinit var store: KeyValueStore<EventId, MappedEventValue>

                override fun init(context: ProcessorContext) {
                    store = context.getStateStore("event-store") as KeyValueStore<EventId, MappedEventValue>
                }

                override fun process(key: EventId, value: MappedEventValue) {
                    ...
                    store.put(key, processedMappedEventValue)

                    //TODO Write into a topic
                }
            }
    }
}  

1 Ответ

0 голосов
/ 03 мая 2020

Вы не можете. Метод process() является терминальной операцией, которая не позволяет передавать данные в нисходящем направлении. Вместо этого вы можете использовать transform(), хотя (это в основном то же самое, что и process(), но позволяет передавать данные в нисходящем направлении); или, в зависимости от вашего приложения, transformValues() или flatTransform() et c.

Используя transform(), вы получаете KStream обратно, что вы можете записать в топи c.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...