В настоящее время, когда я создаю продюсера для отправки моих записей и, например, по каким-то причинам kafka недоступен, продюсер продолжает посылать одно и то же сообщение в течение неопределенного времени.Как я могу прекратить создавать сообщения, например, после того, как я получил эту ошибку 3 раза:
Connection to node -1 could not be established. Broker may not be available.
Я использую реактор-производитель kafka:
@Bean
public KafkaSender<String, String> createSender() {
return KafkaSender.create(senderOptions());
}
private SenderOptions<String, String> senderOptions() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProperties.getClientId());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.RETRIES_CONFIG, kafkaProperties.getProducerRetries());
return SenderOptions.create(props);
}
и затем использую его для отправки записи:
sender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(topicName, null, message), message)))
.flatMap(result -> {
if (result.exception() != null) {
return Flux.just(ResponseEntity.badRequest()
.body(result.exception().getMessage()));
}
return Flux.just(ResponseEntity.ok().build());
})
.next();