KAFKA - Какова причина для получения ProducerFencedException во время02.ndnd - PullRequest
0 голосов
/ 30 октября 2018

Попытка загрузить около 50K сообщений в тему KAFKA. В начале нескольких запусков становится ниже исключения, но не все время.

org.apache.kafka.common.KafkaException: Невозможно выполнить транзакционный метод, поскольку мы находимся в состоянии ошибки
в org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError (TransactionManager.java:784) ~ [kafka-clients-2.0.0.jar:?]
в org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort (TransactionManager.java:229) ~ [kafka-clients-2.0.0.jar:?]
в org.apache.kafka.clients.producer.KafkaProducer.abortTransaction (KafkaProducer.java:679) ~ [kafka-clients-2.0.0.jar:?]
at myPackage.persistUpdatesPostAction (MyCode.java: ??) ~ [aKafka.jar:?]
...
Вызвано: org.apache.kafka.common.errors.ProducerFencedException: производитель попытался выполнить операцию со старой эпохой. Либо существует более новый производитель с тем же транзакционным идентификатором, либо транзакция производителя истекла у брокера.

Code Block is below:  
--------------------  
public void persistUpdatesPostAction(List<Message> messageList )
{
    if ((messageList == null) || (messageList.isEmpty())) 
    {
        return;
    }
    logger.createDebug("Messages in batch(postAction) : "+ messageList.size());
    Producer<String,String> producer = KafkaUtils.getProducer(Thread.currentThread().getName());
    try
    {
        producer.beginTransaction();
        createKafkaBulkInsert1(producer, messageList, "Topic1");
        createKafkaBulkInsert2(producer, messageList, "Topic2");
        createKafkaBulkInsert3(producer, messageList, "Topic3");
        producer.commitTransaction();
    }
    catch (Exception e) {
        producer.abortTransaction();
        producer.close();
        KafkaUtils.removeProducer(Thread.currentThread().getName());
    }
}

-----------

static Properties setPropertiesProducer()
{
    Properties temp = new Properties();
    temp.put("bootstrap.servers", "localhost:9092");
    temp.put("acks", "all");
    temp.put("retries", 1);
    temp.put("batch.size", 16384);
    temp.put("linger.ms", 5);
    temp.put("buffer.memory", 33554432);
    temp.put("key.serializer",   "org.apache.kafka.common.serialization.StringSerializer");
    temp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return temp;
}

public static Producer<String, String> getProducer(String aThreadId)
{
    if ((producerMap.size() == 0) || (producerMap.get(aThreadId) == null))
    {
        Properties temp = producerProps;
        temp.put("transactional.id", aThreadId);
        Producer<String, String> producer = new KafkaProducer<String, String>(temp);
        producerMap.put(aThreadId, producer);
        producer.initTransactions();
        return producer;
    }
    return producerMap.get(aThreadId);
}

public static void removeProducer(String aThreadId)
{
    logger.createDebug("Removing Thread ID :" + aThreadId);
    if (producerMap.get(aThreadId) == null)
        return;
    producerMap.remove(aThreadId);
}
---------------

1 Ответ

0 голосов
/ 29 ноября 2018

В коде инициализации моего продюсера было состояние гонки. Я исправил, изменив карту источника на тип ConcurrentHashMap, чтобы обеспечить безопасность потока.

...