Это мой первый пост по этому вопросу здесь, и я не уверен, что это было описано здесь ранее, но здесь идет речь: у меня есть приложение Kafka Streams, использующее Processor API, в соответствии с топологией, приведенной ниже:
1. Consume data from an input topic (processor.addSource())
2. Inserts data into a DB (processor.addProcessor())
3. Produce its process status to an output topic (processor.addSink())
Приложение работает большое время, однако для целей отслеживания мне нужно, чтобы в журналах был момент, когда kstreams генерировал сообщение для вывода topi c, а также его RecordMetaData (topi c, partition, offset).
Пример ниже:
KEY="MY_KEY" OUTPUT_TOPIC="MY-OUTPUT-TOPIC" PARTITION="1" OFFSET="1000" STATUS="SUCCESS"
Я не уверен, есть ли способ переопределить производителя потоков kafka по умолчанию, чтобы добавить эту запись или, возможно, создать моего собственного производителя, чтобы подключить его к addSink обработать. Я частично достиг этого, реализовав свой собственный ExceptionHandler (default.producer.exception.handler), но он охватывает только исключения.
Заранее спасибо,
Гильерме