Попытка загрузить около 50K сообщений в тему KAFKA. В начале нескольких запусков становится ниже исключения, но не все время.
org.apache.kafka.common.KafkaException: Невозможно выполнить транзакционный метод, поскольку мы находимся в состоянии ошибки
в org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError (TransactionManager.java:784) ~ [kafka-clients-2.0.0.jar:?]
в org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort (TransactionManager.java:229) ~ [kafka-clients-2.0.0.jar:?]
в org.apache.kafka.clients.producer.KafkaProducer.abortTransaction (KafkaProducer.java:679) ~ [kafka-clients-2.0.0.jar:?]
at myPackage.persistUpdatesPostAction (MyCode.java: ??) ~ [aKafka.jar:?]
...
Вызвано: org.apache.kafka.common.errors.ProducerFencedException: производитель попытался выполнить операцию со старой эпохой. Либо существует более новый производитель с тем же транзакционным идентификатором, либо транзакция производителя истекла у брокера.
Code Block is below:
--------------------
public void persistUpdatesPostAction(List<Message> messageList )
{
if ((messageList == null) || (messageList.isEmpty()))
{
return;
}
logger.createDebug("Messages in batch(postAction) : "+ messageList.size());
Producer<String,String> producer = KafkaUtils.getProducer(Thread.currentThread().getName());
try
{
producer.beginTransaction();
createKafkaBulkInsert1(producer, messageList, "Topic1");
createKafkaBulkInsert2(producer, messageList, "Topic2");
createKafkaBulkInsert3(producer, messageList, "Topic3");
producer.commitTransaction();
}
catch (Exception e) {
producer.abortTransaction();
producer.close();
KafkaUtils.removeProducer(Thread.currentThread().getName());
}
}
-----------
static Properties setPropertiesProducer()
{
Properties temp = new Properties();
temp.put("bootstrap.servers", "localhost:9092");
temp.put("acks", "all");
temp.put("retries", 1);
temp.put("batch.size", 16384);
temp.put("linger.ms", 5);
temp.put("buffer.memory", 33554432);
temp.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
temp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return temp;
}
public static Producer<String, String> getProducer(String aThreadId)
{
if ((producerMap.size() == 0) || (producerMap.get(aThreadId) == null))
{
Properties temp = producerProps;
temp.put("transactional.id", aThreadId);
Producer<String, String> producer = new KafkaProducer<String, String>(temp);
producerMap.put(aThreadId, producer);
producer.initTransactions();
return producer;
}
return producerMap.get(aThreadId);
}
public static void removeProducer(String aThreadId)
{
logger.createDebug("Removing Thread ID :" + aThreadId);
if (producerMap.get(aThreadId) == null)
return;
producerMap.remove(aThreadId);
}
---------------