Я работаю над API-интерфейсом kafka-streams. По сути, Kafka-stream получает данные из исходного раздела и после применения некоторого фильтра записывает обратно в целевой раздел Kafka.
Используется зависимость. :
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.1.0</version>
</dependency>
ниже код для того же. :
{
...
// create property
Properties property = new Properties();
property.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
property.setProperty(StreamsConfig.APPLICATION_ID_CONFIG,"kafka_streams_app");
property.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
property.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
//create topology
StreamsBuilder streamsBuilder = new StreamsBuilder();
//build topology
KStream<String,String> inputTopic = streamsBuilder.stream("source_topic");
//filtering data
KStream<String,String> filteredStream = inputTopic.filter(
(k,val)-> filterData(val)>10000
);
filteredStream.to("target_topic");
KafkaStreams streams = new KafkaStreams(
streamsBuilder.build(),
property
);
//start our stream app
streams.start();
...
}
вот моя архитектура приложения:
API производителя (производит в теме источника) =>
API-интерфейс kafka-stream (читает исходную тему и отправляет данные целевой теме)
=> API-интерфейс kafka-customer (читает из целевой темы)
Я хочу, чтобы, когда поток записывал данные в target topic
, я хотел зафиксировать событие независимо от того, успешно оно или нет.
Есть ли способ перехватить этот обратный вызов? Спасибо