Я получаю InterruptedException при отправке данных на kafka от нескольких производителей - PullRequest
0 голосов
/ 26 декабря 2018

Я пытаюсь запустить производителя kafka с помощью API kafka-clients.У меня есть несколько производителей, работающих с отдельными потоками, и каждый пытается записать данные в Кафку.Проблема в том, что я получаю прерванное исключение от kafka, когда увеличиваю количество потоков, работающих параллельно.Например, если я запускаю 20 потоков параллельно, он не выдает никаких исключений, но когда я запускаю его с 100 потоками параллельно, я получаю следующее исключение:

Исключение в потоке "pool-910-thread-1 "org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
в org.apache.kafka.clients.producer.KafkaProducer.close (KafkaProducer.java:1154)
в org.apache.kafka.clients.producer.KafkaProducer.close (KafkaProducer.java:1128)
в org.apache.kafka.clients.producer.KafkaProducer.close (KafkaProducer.java:1107)
в com.t4e.kafka.producer.IEC104KafkaReadMessageProcessor.runProducer (IEC104KafkaReadMessageProcessor.java:45)
в com.t4e.iec104.connection.Iec60870ReadListener.writeT..Iec60870ReadListener.newASdu (Iec60870ReadListener.java:75)
в org.openmuc.j60870.Connection $ ConnectionReader $ 1.run (Connection.java:143)
в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624)
в java.lang.Thread.run (Thread.java:748)
Вызвано: java.lang.InterruptedException
в java.lang.Object.wait (собственный метод)
в java.lang.Thread.join (Thread.java:1260)
в орг.apache.kafka.clients.producer.KafkaProducer.close (KafkaProducer.java:1152)

Вот мой код производителя:

private static final Logger logger = LoggerFactory.getLogger(IEC104KafkaReadMessageProcessor.class);
    static KafkaProducerConfigReader kafkaConfig = new KafkaProducerConfigReader();
    static String newLine = System.getProperty("line.separator");

    /**
     * @param message
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     */
    public static synchronized RecordMetadata runProducer(String message) throws InterruptedException, ExecutionException {
        Producer<Long, String> producer = ProducerCreator.createProducer();
        ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(kafkaConfig.getTopicName(), message);
        try {
            RecordMetadata metadata = producer.send(record).get();
            logger.info(("Record sent with key " + " to partition " + metadata.partition() + " with offset "
                    + metadata.offset()));
            return metadata;
        } catch (ExecutionException e) {
            logger.error("ExecutionException : Error in sending record to kafka");
            throw new ExecutionException(e);
        } catch (InterruptedException e) {
            logger.error("InterruptedException : Error in sending record" + newLine);
            throw new InterruptedException();
        } finally {
            logger.info(" Closing Kafka producer ");
            producer.close();
        }
    }

1 Ответ

0 голосов
/ 26 марта 2019

Возможно, проблема не в этом фрагменте кода, где вы создаете продюсера.Просматривая журнал, которым вы поделились,

Исключение в потоке "pool-910-thread-1" org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException

Я вижу, что активны 910 пулов потоков.Создание такого количества пулов вместо создания потоков, содержащих один пул, вероятно, будет лучшей идеей.Возможно, вы захотите взглянуть на место, где вы создаете пулы потоков, и управлять им.

Я подозреваю, что утечка потоков в вашем коде вызовет это исключение прерывания.

...