Kafka Streams - переопределить реализацию addSink по умолчанию / пользовательский производитель - PullRequest
1 голос
/ 30 апреля 2020

Это мой первый пост по этому вопросу здесь, и я не уверен, что это было описано здесь ранее, но здесь идет речь: у меня есть приложение Kafka Streams, использующее Processor API, в соответствии с топологией, приведенной ниже:

1. Consume data from an input topic (processor.addSource())
2. Inserts data into a DB (processor.addProcessor())
3. Produce its process status to an output topic (processor.addSink())

Приложение работает большое время, однако для целей отслеживания мне нужно, чтобы в журналах был момент, когда kstreams генерировал сообщение для вывода topi c, а также его RecordMetaData (topi c, partition, offset).

Пример ниже:

KEY="MY_KEY" OUTPUT_TOPIC="MY-OUTPUT-TOPIC" PARTITION="1" OFFSET="1000" STATUS="SUCCESS"

Я не уверен, есть ли способ переопределить производителя потоков kafka по умолчанию, чтобы добавить эту запись или, возможно, создать моего собственного производителя, чтобы подключить его к addSink обработать. Я частично достиг этого, реализовав свой собственный ExceptionHandler (default.producer.exception.handler), но он охватывает только исключения.

Заранее спасибо,

Гильерме

1 Ответ

1 голос
/ 30 апреля 2020

Если вы сконфигурируете потоковое приложение для использования ProducerInterceptor, тогда вы сможете получить необходимую информацию. В частности, реализация onAcknowledgement () обеспечит доступ ко всему, что вы перечислили выше.

Для настройки перехватчиков в потоковом приложении:

 Properties props = new Properties();
 // add this configuration in addition to your other streams configs

props.put(StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG), Collections.singletonList(MyProducerInterceptor.class));

Вы можете предоставить более один перехватчик, если хотите, просто добавьте имя класса и измените реализацию списка с одиночного на обычный List. Выполнение перехватчиков следует порядку классов в списке.

РЕДАКТИРОВАТЬ: просто чтобы быть понятным, вы можете переопределить предоставленные Producer в потоках Kafka через интерфейс KafkaClientSupplier , но ИМХО использование перехватчика - более чистый подход. Но какое направление к go зависит от вас. Вы передаете свой KafkaClientSupplier в перегруженном конструкторе Kafka Streams .

...