Я отправляю сообщение от kafka -roduction в нескольких потоках, но произошла потеря сообщения - PullRequest
0 голосов
/ 23 мая 2018

Я использую kafka-продюсер и отправляю данные в тему 'test-topic', которая состоит из фактора репликации 3 и разделов 1 в kafka-cluster (состоящей из трех брокеров).

Я создал пять потоков.каждый поток отправил 10 000 сообщений (размер каждого сообщения 4000 байтов).

я ожидал, что последнее смещение 50 000, но на самом деле 44 993.

произошло около 5000 потерь сообщений.

Почему сообщениепотеря произойдет?Ниже моих кодов ... (KAFKA-VERSION 1.1.0)

KafkaMessageSender.class

public class KafkaMessageSender {
    private final static Logger logger = 
 LoggerFactory.getLogger(KafkaMessageSender.class);
    private Properties props;
    private KafkaProducer<String, String> producer;
    private String topic;
    private AtomicInteger count;

    public KafkaMessageSender(AtomicInteger count, String bootstrapUrls, String topic) {
        logger.info("KafkaMessageSender initializing...");
        this.topic = topic;
        this.count = count;
        props = new Properties();
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapUrls);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); //16384
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
        logger.info("KafkaMessageSender initializing end");     

    }

    public void sendMessages() {
        producer.send(new ProducerRecord<String, String>(topic, Messages.MSG_4K)); //Messages.MSG_4K indicates 4000bytes message
        count.getAndIncrement();
        logger.info("count : "+count.get());
    }
}

KafkaMessageSenderMain.class

public class KafkaMessageSenderMain {

    private final static Logger logger = LoggerFactory.getLogger(KafkaMessageSenderMain.class);

    final static String bootstrap_url = "ism1.solulink.co.kr:9092,ism2.solulink.co.kr:9092,ism3.solulink.co.kr:9092";
    final static String topic = "test-topic"; //topic name
    final static AtomicInteger count = new AtomicInteger(0);
    final static int MAX_LOOP = 10000; //message sending count
    final static int MAX_THREAD = 5;  //created number of threads

    public static void main(String[] args) {        
        long startTime = System.currentTimeMillis();
        ExecutorService executorService = Executors.newFixedThreadPool(MAX_THREAD);
        for(int i = 0; i < MAX_THREAD; i++) {
            executorService.execute(() ->{
                    KafkaMessageSender sender = new KafkaMessageSender(count, bootstrap_url, topic);
                    for(int j = 0; j < MAX_LOOP; j++) {
                        sender.sendMessages(); //send message
                    }
            });
        }

        executorService.shutdown();
        try {
            boolean flag = executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); 
            long endTime = System.currentTimeMillis();
            long procTime = (endTime - startTime);
            logger.info("all Threads is shutdown? : "+flag);
            logger.info("processTime : " + ((double)procTime/(double)1000L)+"sec");
        } catch (InterruptedException e) {
            logger.error("awaitTermination exception",e);
        }
    }
}

Результат

Результат изображения

1 Ответ

0 голосов
/ 23 мая 2018

Можете ли вы изменить и запустить свой код, как показано ниже, чтобы увидеть, что это за ошибка?

producer.send(new ProducerRecord<String, String>(topic, Messages.MSG_4K), new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null)
                e.printStackTrace();
        }
});
...