Я использую весеннюю интеграцию с API производителя kafka.Я хочу остановить мой API-интерфейс Producer, если у меня есть какое-либо MessageException или ProducerException.
В потребителе (обычный spring-kafka) я использую ConsumerAwareListenerErrorHandler для простой остановки прослушивателя.
Точно так же я ищу некоторые обработчики в SI, чтобы остановить мой контейнер производителя.
Создал канал ошибок и получил все мои исключения в этот канал, но не смог найти способчтобы остановить мой контейнер.Пробовал что-то с SmartLifeCycle, но не повезло.
Здесь мой Бин для публикации сообщений на тему Кафки
@Bean
public IntegrationFlow hanldeKafka() {
return IntegrationFlows.from(sendToKafkaChannel)
.handle(
kafkaMessageHandler(producerFactory, topicName),
e -> e.id("myProducer"))
.get();
}
public KafkaProducerMessageHandlerTemplateSpec<String, String> kafkaMessageHandler(
ProducerFactory<String, String> producer, String topic) {
return Kafka
.outboundChannelAdapter(producer)
.sync(true)
.headerMapper(kafkaHeaderMapper())
.messageKey(m -> {
try {
Message value = jsonObjectMapper.fromJson(m.getPayload(), Message.class);
return value.getlId();
} catch (Exception e) {
log.error(e.toString());
throw new MessagingException(m);
}
})
//.partitionId(m -> 10)
.topicExpression("headers[kafka_topic] ?: '" + topic + "'")
.configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}
Может кто-нибудь подсказать, как добавить некоторые обработчики / контроллеры, чтобы остановить контейнер производителя /API.