кафка асинхронная выдает потерянное сообщение - PullRequest
0 голосов
/ 20 апреля 2020

Попробуйте выполнить инструкцию на inte rnet, чтобы добиться кафки асинхронного продукта. Вот как выглядит мой продюсер:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public void asynSend(String topic, Integer partition, String message) {
    ProducerRecord<Object, Object> data = new ProducerRecord<>(topic, partition,null, message);
        producer.send(data, new DefaultProducerCallback());
    }

private static class DefaultProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            logger.error("Asynchronous produce failed");
        }
    }
}

И я выдаю в течение l oop примерно так:

for (int i = 0; i < 5000; i++) {
    int partition = i % 2;
    FsProducerFactory.getInstance().asynSend(topic, partition,i + "th message to partition " + partition);
}

Однако некоторые сообщения могут быть потеряны. Как показано ниже, сообщение с 4508 по 4999 не доставлено.

enter image description here

Я считаю, что причиной может быть завершение процесса производителя, а все сообщения в кеше не отправить в это время будет потеряно. Добавьте эту строку после того, как l oop решит эту проблему:

producer.flush();

Однако я не уверен, является ли это решением с очарованием, потому что я заметил, что кто-то упомянул, что грипп sh сделает асинхронную отправку каким-то образом Синхронный, кто-нибудь может объяснить или помочь мне улучшить это?

1 Ответ

0 голосов
/ 20 апреля 2020

В книге Kafka - The definitive Guide есть пример для asznchronous Producer, приведенный в точности как вы написали код. Он использует send вместе с Callback.

В обсуждении написано:

Добавление flush() перед выходом сделает клиента дождитесь доставки ожидающих сообщений брокеру (и это будет около queue.buffering.max.ms плюс время ожидания). Если вы добавляете flush() после каждого produce() вызова, вы эффективно реализуете syn c продюсер.

Но если вы сделаете это после for l oop это больше не синхронно, а скорее асинхронно.

То, что вы также можете сделать, это установить acks в конфигурации источника на all. Таким образом, у вас будет еще несколько гарантий успешного создания сообщений в случае, если для репликации topi c установлено значение больше 1.

...