Попробуйте выполнить инструкцию на inte rnet, чтобы добиться кафки асинхронного продукта. Вот как выглядит мой продюсер:
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public void asynSend(String topic, Integer partition, String message) {
ProducerRecord<Object, Object> data = new ProducerRecord<>(topic, partition,null, message);
producer.send(data, new DefaultProducerCallback());
}
private static class DefaultProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
logger.error("Asynchronous produce failed");
}
}
}
И я выдаю в течение l oop примерно так:
for (int i = 0; i < 5000; i++) {
int partition = i % 2;
FsProducerFactory.getInstance().asynSend(topic, partition,i + "th message to partition " + partition);
}
Однако некоторые сообщения могут быть потеряны. Как показано ниже, сообщение с 4508 по 4999 не доставлено.
Я считаю, что причиной может быть завершение процесса производителя, а все сообщения в кеше не отправить в это время будет потеряно. Добавьте эту строку после того, как l oop решит эту проблему:
producer.flush();
Однако я не уверен, является ли это решением с очарованием, потому что я заметил, что кто-то упомянул, что грипп sh сделает асинхронную отправку каким-то образом Синхронный, кто-нибудь может объяснить или помочь мне улучшить это?