У меня есть клиент Spring-Cloud-Streams, который читает тему Kakfa, состоящую из нескольких разделов. Клиент вызывает веб-сервис для каждого сообщения Kafka, которое он читает. Если после нескольких попыток веб-служба недоступна, я хочу запретить потребителю читать из Kafka. Обращаясь к предыдущему вопросу Stackoverflow ( Spring Cloud Stream, связующие паузы / возобновления kafka ), я автоматически подключил BindingsEndpoint и вызвал метод changeState () , чтобы попытаться остановить потребителя но журналы показывают, что потребитель продолжает читать сообщения от Kafka после вызова changeState () .
Я использую Spring Boot версии 2.1.2.RELEASE с версией Spring Cloud Greenwich.RELEASE. Управляемая версия для spring-cloud-stream-binder-kafka - это 2.1.0.RELEASE. Я установил свойства autoCommitOffset = true и autoCommitOnError = false.
Ниже приведен фрагмент моих кодов. Я что-то пропустил? Первый входной параметр для changeState () должен быть именем темы?
Если я хочу, чтобы приложение потребителя закрывалось, когда веб-служба недоступна, могу ли я просто выполнить System.exit () без необходимости сначала останавливать потребителя?
@Autowired
private BindingsEndpoint bindingsEndpoint;
...
...
@StreamListener(MyInterface.INPUT)
public void read(@Payload MyDTO dto,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
try {
logger.info("Processing message "+dto);
process(dto); // this is the method that calls the webservice
} catch (Exception e) {
if (e instanceof IllegalStateException || e instanceof ConnectException) {
bindingsEndpoint.changeState("my.topic.name",
BindingsEndpoint.State.STOPPED);
// Binding<?> b = bindingsEndpoint.queryState("my.topic.name"); ==> Using topic name returns a valid Binding object
}
e.printStackTrace();
throw (e);
}
}