Производитель реактора Кафка - Невозможно повторить - PullRequest
0 голосов
/ 22 апреля 2020

Я создал KafkaProducer, используя (реактор.kafka.sender.KafkaSender), используя Reactor Kafka (функциональный Java API для Kafka). Используя следующие конфигурации производителя,

max.block.ms = 8000
request.timeout.ms= 4000
retries = 3
retry.backoff.ms = 2000
max.in.flight.requests.per.connection = 512
acks = all

, когда я пытаюсь опубликовать sh запись в недопустимом topi c, я получаю исключение тайм-аута

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 8000 ms

ожидается. Но я настроил для повторных попыток, которые не происходят. Я предполагаю, что после того, как max.block.ms / request.timeout.ms закончится, повторная попытка произойдет после того, как все retry.backoff.ms до metadata.max.age.ms или retries будут исчерпаны. К вашему сведению, код:

    String topic = "order/";
    int count = 1;
    Flux<SenderRecord<String, Event, EventInfo>> source = Flux.range(1, count).map(x -> {
      Event event = new Event();
      return SenderRecord.create(
            new ProducerRecord<String, Event>(topic, event.getX(),
                event), event.getEvent());
    });
    kafkaSender.send(source).subscribe(x -> System.out.println(x));
    kafkaSender.close();
  • правильны ли конфигурации для включения повторной попытки?
  • когда повторная попытка произойдет после request.timeout.ms / max.block.ms?
  • Какие изменения необходимо внести в приведенный выше код, чтобы повторить попытку?

1 Ответ

0 голосов
/ 23 апреля 2020

Полагаю, вам также следует установить «delivery.timeout.ms»

См. Документацию здесь: https://docs.confluent.io/current/installation/configuration/producer-configs.html#retries

Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first. Note additionally that produce requests will be failed before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires first before successful acknowledgement. Users should generally prefer to leave this config unset and instead use delivery.timeout.ms to control retry behavior.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...