Я создал KafkaProducer, используя (реактор.kafka.sender.KafkaSender), используя Reactor Kafka (функциональный Java API для Kafka). Используя следующие конфигурации производителя,
max.block.ms = 8000
request.timeout.ms= 4000
retries = 3
retry.backoff.ms = 2000
max.in.flight.requests.per.connection = 512
acks = all
, когда я пытаюсь опубликовать sh запись в недопустимом topi c, я получаю исключение тайм-аута
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 8000 ms
ожидается. Но я настроил для повторных попыток, которые не происходят. Я предполагаю, что после того, как max.block.ms
/ request.timeout.ms
закончится, повторная попытка произойдет после того, как все retry.backoff.ms
до metadata.max.age.ms
или retries
будут исчерпаны. К вашему сведению, код:
String topic = "order/";
int count = 1;
Flux<SenderRecord<String, Event, EventInfo>> source = Flux.range(1, count).map(x -> {
Event event = new Event();
return SenderRecord.create(
new ProducerRecord<String, Event>(topic, event.getX(),
event), event.getEvent());
});
kafkaSender.send(source).subscribe(x -> System.out.println(x));
kafkaSender.close();
- правильны ли конфигурации для включения повторной попытки?
- когда повторная попытка произойдет после
request.timeout.ms
/ max.block.ms
? - Какие изменения необходимо внести в приведенный выше код, чтобы повторить попытку?