Kafka Streams: Как записать событие, когда Kafka Streams записывает данные в целевую тему - PullRequest
0 голосов
/ 26 января 2019

Я работаю над API-интерфейсом kafka-streams. По сути, Kafka-stream получает данные из исходного раздела и после применения некоторого фильтра записывает обратно в целевой раздел Kafka.

Используется зависимость. :

<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.1.0</version>
    </dependency>

ниже код для того же. :

{ ...

       // create property

            Properties property =  new Properties();
            property.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
            property.setProperty(StreamsConfig.APPLICATION_ID_CONFIG,"kafka_streams_app");
            property.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
            property.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());

            //create topology
            StreamsBuilder streamsBuilder = new StreamsBuilder();

            //build topology
            KStream<String,String> inputTopic = streamsBuilder.stream("source_topic");

          //filtering data
            KStream<String,String> filteredStream = inputTopic.filter(
                    (k,val)-> filterData(val)>10000
            );
            filteredStream.to("target_topic");
            KafkaStreams streams = new KafkaStreams(
                    streamsBuilder.build(),
                    property
            );
            //start our stream app

            streams.start(); 
    ...
    }

вот моя архитектура приложения:

API производителя (производит в теме источника) =>
API-интерфейс kafka-stream (читает исходную тему и отправляет данные целевой теме) => API-интерфейс kafka-customer (читает из целевой темы)

Я хочу, чтобы, когда поток записывал данные в target topic, я хотел зафиксировать событие независимо от того, успешно оно или нет.

Есть ли способ перехватить этот обратный вызов? Спасибо

1 Ответ

0 голосов
/ 26 января 2019

Обратный звонок от производителя, который вы можете указать при отправке на обычном API производителя

Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

не отображается в Streams API.

...