Spring-Cloud-Streams Kafka - Как остановить потребителей - PullRequest
0 голосов
/ 05 апреля 2019

У меня есть клиент Spring-Cloud-Streams, который читает тему Kakfa, состоящую из нескольких разделов. Клиент вызывает веб-сервис для каждого сообщения Kafka, которое он читает. Если после нескольких попыток веб-служба недоступна, я хочу запретить потребителю читать из Kafka. Обращаясь к предыдущему вопросу Stackoverflow ( Spring Cloud Stream, связующие паузы / возобновления kafka ), я автоматически подключил BindingsEndpoint и вызвал метод changeState () , чтобы попытаться остановить потребителя но журналы показывают, что потребитель продолжает читать сообщения от Kafka после вызова changeState () .

Я использую Spring Boot версии 2.1.2.RELEASE с версией Spring Cloud Greenwich.RELEASE. Управляемая версия для spring-cloud-stream-binder-kafka - это 2.1.0.RELEASE. Я установил свойства autoCommitOffset = true и autoCommitOnError = false.

Ниже приведен фрагмент моих кодов. Я что-то пропустил? Первый входной параметр для changeState () должен быть именем темы?

Если я хочу, чтобы приложение потребителя закрывалось, когда веб-служба недоступна, могу ли я просто выполнить System.exit () без необходимости сначала останавливать потребителя?

@Autowired
private BindingsEndpoint bindingsEndpoint;

...
...
@StreamListener(MyInterface.INPUT)  
    public void read(@Payload MyDTO dto,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
        try {

            logger.info("Processing message "+dto);
            process(dto); // this is the method that calls the webservice


        } catch (Exception e) {

            if (e instanceof IllegalStateException || e instanceof ConnectException) {

                bindingsEndpoint.changeState("my.topic.name", 
                    BindingsEndpoint.State.STOPPED);    
                // Binding<?> b = bindingsEndpoint.queryState("my.topic.name"); ==> Using topic name returns a valid Binding object                     
            }

                e.printStackTrace();
                throw (e);
  }
}

1 Ответ

1 голос
/ 05 апреля 2019

Это можно сделать с помощью функции Визуализация и управление связыванием , где вы можете визуализировать , а также остановить / запустить / приостановить / возобновить привязок.

Кроме того, вы знаете, что System.exit() завершит работу всей JVM?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...