Кафка продолжает выдавать запросы, даже если брокер не работает - PullRequest
0 голосов
/ 29 ноября 2018

В настоящее время, когда я создаю продюсера для отправки моих записей и, например, по каким-то причинам kafka недоступен, продюсер продолжает посылать одно и то же сообщение в течение неопределенного времени.Как я могу прекратить создавать сообщения, например, после того, как я получил эту ошибку 3 раза:

Connection to node -1 could not be established. Broker may not be available.

Я использую реактор-производитель kafka:

    @Bean
    public KafkaSender<String, String> createSender() {
        return KafkaSender.create(senderOptions());
    }

    private SenderOptions<String, String> senderOptions() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProperties.getClientId());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.RETRIES_CONFIG, kafkaProperties.getProducerRetries());
        return SenderOptions.create(props);
    }

и затем использую его для отправки записи:

sender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(topicName, null, message), message)))
            .flatMap(result -> {
                if (result.exception() != null) {
                    return Flux.just(ResponseEntity.badRequest()
                        .body(result.exception().getMessage()));
                }
                return Flux.just(ResponseEntity.ok().build());
            })
            .next();

Ответы [ 3 ]

0 голосов
/ 29 ноября 2018

Вы можете использовать шаблон автоматического выключателя для этого типа проблем, но перед применением этого шаблона попробуйте найти основную причину, и кажется, что ваше свойство ProducerConfig.RETRIES_CONFIG где-то переопределяется.

0 голосов
/ 29 ноября 2018

Боюсь, что clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs); не участвует в повторных попытках, и он повторяется до maxBlockTimeMs = 60000 по умолчанию.Вы можете уменьшить эту опцию для производителя через свойство ProducerConfig.MAX_BLOCK_MS_CONFIG:

public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
    private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long <code>KafkaProducer.send()</code> and <code>KafkaProducer.partitionsFor()</code> will block."
                                                    + "These methods can be blocked either because the buffer is full or metadata unavailable."
                                                    + "Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.";

ОБНОВЛЕНИЕ

Мы можем решить эту проблему следующим образом:

@PostMapping(path = "/v1/{topicName}")
public Mono<ResponseEntity<?>> postData(
    @PathVariable("topicName") String topicName, String message) {
    return sender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(topicName, null, message), message)))
        .flatMap(result -> {
            if (result.exception() != null) {
                sender.close();
                return Flux.just(ResponseEntity.badRequest()
                    .body(result.exception().getMessage()));
            }
            return Flux.just(ResponseEntity.ok().build());
        })
        .next();
}

Обратите внимание на sender.close(); в случае ошибки.

Я думаю, что пришло время поднять проблему с проектом Reactor Kafka, чтобы позволить близкому производителю в случае ошибки.

0 голосов
/ 29 ноября 2018

Вместо того, чтобы сосредоточиться на ошибке.Исправьте проблему - она ​​не подключается к брокеру

Вы не переопределяете это в своем файле compose, поэтому ваше приложение пытается подключиться к себе

bootstrap-servers: ${KAFKA_BOOTSTRAP_URL:localhost:9092} 

В файле compose yml,кажется, вы забыли это

rest-proxy:
   environment:
       KAFKA_BOOTSTRAP_URL: kafka:9092

В качестве альтернативы, если возможно, вы можете использовать существующее изображение док-станции Confluent REST Proxy вместо того, чтобы заново изобретать колесо

...