Как остановить мой контейнер-производитель / API в Spring-интеграции с расширением kafka - PullRequest
1 голос
/ 27 мая 2019

Я использую весеннюю интеграцию с API производителя kafka.Я хочу остановить мой API-интерфейс Producer, если у меня есть какое-либо MessageException или ProducerException.

В потребителе (обычный spring-kafka) я использую ConsumerAwareListenerErrorHandler для простой остановки прослушивателя.

Точно так же я ищу некоторые обработчики в SI, чтобы остановить мой контейнер производителя.

Создал канал ошибок и получил все мои исключения в этот канал, но не смог найти способчтобы остановить мой контейнер.Пробовал что-то с SmartLifeCycle, но не повезло.

Здесь мой Бин для публикации сообщений на тему Кафки

@Bean
public IntegrationFlow hanldeKafka() {
    return IntegrationFlows.from(sendToKafkaChannel)
            .handle(
                    kafkaMessageHandler(producerFactory, topicName),
                    e -> e.id("myProducer"))
            .get();
}


public KafkaProducerMessageHandlerTemplateSpec<String, String> kafkaMessageHandler(
        ProducerFactory<String, String> producer, String topic) {
    return Kafka
            .outboundChannelAdapter(producer)
            .sync(true)
            .headerMapper(kafkaHeaderMapper())
            .messageKey(m ->  {
                try {
                Message value = jsonObjectMapper.fromJson(m.getPayload(), Message.class);
                return value.getlId();
                } catch (Exception e) {
                    log.error(e.toString());
                    throw new MessagingException(m);
                }
            })
            //.partitionId(m -> 10)
            .topicExpression("headers[kafka_topic] ?: '" + topic + "'")
            .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}

Может кто-нибудь подсказать, как добавить некоторые обработчики / контроллеры, чтобы остановить контейнер производителя /API.

1 Ответ

0 голосов
/ 28 мая 2019

Непонятно, что вы подразумеваете под "контейнером производителя" - такого понятия нет.

Чтобы окончательно закрыть вызов Producer (s) reset() на DefaultKafkaProducerFactory.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...