<int-kafka:outbound-channel-adapter sync="false"
kafka-template="kafkaTemplate" id="kafkaOutboundChannelAdapter"
topic="learning-topic" channel="KafkaAdapterChan"
send-failure-channel="FailureChan">
</int-kafka:outbound-channel-adapter>
<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="xyz:9092" />
... <!-- more producer properties -->
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
Приведенный выше пример отлично работает для счастливого пути. Но когда кластер kafka перезапускается, я вижу первое сообщение после сбоя перезапуска с ошибкой «не лидер» ... запрашивающей обновить метаданные.
Как только метаданные обновлены для первого сообщения после перезапуска , более поздние сообщения публикуются правильно. Поэтому я ищу, если есть какой-либо способ я могу восстановить или обновить метаданные для адаптера исходящего канала производителя kakfa, когда происходит перезапуск кластера kafka?