Я использую Spring Boot 2.2.7.RELEASE и Spring Kafka 2.3.8 (а также цепочечный менеджер транзакций Kafka — ChainedKafkaTransactionManager).
В качестве брокера используется Confluent Kafka.
При отправке сообщений в Kafka у меня возникают ошибки. Вот логи:
2020-07-09 08:28:13.139 ERROR [xxxxxx-component-workflow-handler,,,] 9 --- [_response-4-C-1] essageListenerContainer$ListenerConsumer : Send offsets to transaction failed
org.apache.kafka.common.KafkaException: Unexpected error in AddOffsetsToTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
at org.apache.kafka.clients.producer.internals.TransactionManager$AddOffsetsToTxnHandler.handleResponse(TransactionManager.java:1406)
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1069)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:561)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
at java.base/java.lang.Thread.run(Thread.java:832)
2020-07-09 08:28:13.153 ERROR [xxxxxx-component-workflow-handler,,,] 9 --- [_response-4-C-1] o.s.k.core.DefaultKafkaProducerFactory : commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@3d0f1d94, txId=xxxxxx-dagusa-Process-Handler-et3YEAYB1R1F6h-complete_fulfillment_item_response.complete_fulfillment_item_response.4]
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:924)
at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginCommit$2(TransactionManager.java:296)
at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1008)
at org.apache.kafka.clients.producer.internals.TransactionManager.beginCommit(TransactionManager.java:295)
at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:704)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.commitTransaction(DefaultKafkaProducerFactory.java:691)
at brave.kafka.clients.TracingProducer.commitTransaction(TracingProducer.java:72)
at org.springframework.kafka.core.KafkaResourceHolder.commit(KafkaResourceHolder.java:58)
at org.springframework.kafka.transaction.KafkaTransactionManager.doCommit(KafkaTransactionManager.java:200)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:743)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
at org.springframework.data.transaction.MultiTransactionStatus.commit(MultiTransactionStatus.java:74)
at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:150)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1569)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1546)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1288)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1035)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:949)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.kafka.common.KafkaException: Unexpected error in AddOffsetsToTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
at org.apache.kafka.clients.producer.internals.TransactionManager$AddOffsetsToTxnHandler.handleResponse(TransactionManager.java:1406)
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1069)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:561)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
... 1 common frames omitted
2020-07-09 08:28:13.157 WARN [xxxxxx-component-workflow-handler,,,] 9 --- [_response-4-C-1] o.s.k.core.DefaultKafkaProducerFactory : Error during some operation; producer removed from cache: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@3d0f1d94, txId=xxxxxx-dagusa-Process-Handler-et3YEAYB1R1F6h-complete_fulfillment_item_response.complete_fulfillment_item_response.4]
2020-07-09 08:28:13.167 ERROR [xxxxxx-component-workflow-handler,,,] 9 --- [_response-4-C-1] essageListenerContainer$ListenerConsumer : Transaction rolled back
org.springframework.transaction.HeuristicCompletionException: Heuristic completion: outcome state is mixed; nested exception is org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:177)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1569)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1546)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1288)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1035)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:949)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:924)
at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginCommit$2(TransactionManager.java:296)
at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1008)
at org.apache.kafka.clients.producer.internals.TransactionManager.beginCommit(TransactionManager.java:295)
at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:704)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.commitTransaction(DefaultKafkaProducerFactory.java:691)
at brave.kafka.clients.TracingProducer.commitTransaction(TracingProducer.java:72)
at org.springframework.kafka.core.KafkaResourceHolder.commit(KafkaResourceHolder.java:58)
at org.springframework.kafka.transaction.KafkaTransactionManager.doCommit(KafkaTransactionManager.java:200)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:743)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
at org.springframework.data.transaction.MultiTransactionStatus.commit(MultiTransactionStatus.java:74)
at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:150)
... 9 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: Unexpected error in AddOffsetsToTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
at org.apache.kafka.clients.producer.internals.TransactionManager$AddOffsetsToTxnHandler.handleResponse(TransactionManager.java:1406)
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1069)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:561)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
... 1 common frames omitted
Вот моя конфигурация.
Конфигурация отправителя (producer) Kafka:
/**
 * Spring configuration for the producer side of Kafka with String keys and String values.
 *
 * <p>Declares a transactional producer factory, a {@link KafkaTemplate} built on it, a
 * {@link KafkaTransactionManager}, and a {@link ChainedKafkaTransactionManager} that chains the
 * Kafka transaction manager with the JPA and DataSource transaction managers.
 */
@Configuration
@EnableKafka
public class KafkaSenderConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaSenderConfig.class);

    /** Kafka bootstrap servers, read from the {@code kafka.servers} property. */
    @Value("${kafka.servers}")
    private String kafkaServers;

    /** Application name, read from {@code application.name}; used in the transaction id prefix. */
    @Value("${application.name}")
    private String applicationName;

    /**
     * Creates the Kafka transaction manager backed by {@link #stringProducerFactory()}.
     *
     * <p>Nested transactions are allowed and transaction synchronization is set to
     * {@code SYNCHRONIZATION_ALWAYS}.
     *
     * @return the configured Kafka transaction manager
     */
    @Bean(value = "stringKafkaTransactionManager")
    public KafkaTransactionManager<String, String> kafkaStringTransactionManager() {
        KafkaTransactionManager<String, String> transactionManager =
                new KafkaTransactionManager<>(stringProducerFactory());
        transactionManager.setNestedTransactionAllowed(true);
        transactionManager.setTransactionSynchronization(
                AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
        return transactionManager;
    }

    /**
     * Creates the primary producer factory and makes it transactional by assigning a transaction
     * id prefix.
     *
     * <p>The prefix is the application name with every whitespace run replaced by {@code -},
     * followed by {@code -}, the result of {@code StringUtil.getRandomString(14)} and a trailing
     * {@code -}.
     *
     * <p>NOTE(review): the random part makes the prefix differ every time this factory is
     * created. The Spring Kafka transaction documentation should be checked for whether a stable
     * prefix is required for zombie fencing of listener-initiated transactions - TODO confirm.
     *
     * @return the transactional producer factory
     */
    @Bean(value = "stringProducerFactory")
    @Primary
    public ProducerFactory<String, String> stringProducerFactory() {
        // Parameterized logging: no string is built when DEBUG is disabled.
        log.debug("Kafka Servers: {}", kafkaServers);
        DefaultKafkaProducerFactory<String, String> producerFactory =
                new DefaultKafkaProducerFactory<>(getConfigs());
        String transactionIdPrefix = applicationName.replaceAll("\\s+", "-")
                .concat("-")
                .concat(StringUtil.getRandomString(14))
                .concat("-");
        producerFactory.setTransactionIdPrefix(transactionIdPrefix);
        return producerFactory;
    }

    /**
     * Creates the primary Kafka template for String based messages.
     *
     * <p>The template is built on {@link #stringProducerFactory()}; the {@code true} constructor
     * argument is the template's auto-flush flag.
     *
     * @return the Kafka template used to send String messages
     */
    @Bean(value = "stringKafkaTemplate")
    @Primary
    public KafkaTemplate<String, String> stringKafkaTemplate() {
        log.debug("Creating the Kafka Template for String Producer Factory");
        return new KafkaTemplate<>(stringProducerFactory(), true);
    }

    /**
     * Creates the primary transaction manager, chaining the Kafka transaction manager with the
     * given JPA and DataSource transaction managers, in that order.
     *
     * @param jpaTransactionManager the JPA transaction manager to chain
     * @param dsTransactionManager  the DataSource transaction manager to chain
     * @return the chained transaction manager
     */
    @Bean(name = "chainedStringKafkaTransactionManager")
    @Primary
    public ChainedKafkaTransactionManager<String, String> chainedTransactionManager(
            JpaTransactionManager jpaTransactionManager,
            DataSourceTransactionManager dsTransactionManager) {
        return new ChainedKafkaTransactionManager<>(
                kafkaStringTransactionManager(), jpaTransactionManager, dsTransactionManager);
    }

    /**
     * Builds the producer properties: bootstrap servers, String serializers, batching settings,
     * idempotence and {@code acks=all}.
     *
     * @return a new mutable map of producer properties
     */
    private Map<String, Object> getConfigs() {
        Map<String, Object> config = new ConcurrentHashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Try to send messages out within 100 ms even if the batch size is not met.
        config.put(ProducerConfig.LINGER_MS_CONFIG, 100);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        return config;
    }
}
Конфигурация приёмника (consumer) Kafka:
/**
 * Spring configuration for the consumer side of Kafka with String keys and String values.
 *
 * <p>Declares the consumer properties, the consumer factory and a concurrent listener container
 * factory that runs listeners under the supplied chained transaction manager and recovers failed
 * records after the configured retries.
 */
@Configuration
@EnableKafka
public class KafkaReceiverConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaReceiverConfig.class);

    /** Kafka bootstrap servers ({@code kafka.servers}). */
    @Value("${kafka.servers}")
    private String kafkaServers;

    /** Consumer group identifier ({@code kafka.groupId}). */
    @Value("${kafka.groupId}")
    private String groupId;

    /** Maximum retry attempts handed to the {@link FixedBackOff} (default 3). */
    @Value("${kafka.retry.maxAttempts:3}")
    private Integer retryMaxAttempts;

    /** Interval between retries handed to the {@link FixedBackOff} (default 30000). */
    @Value("${kafka.retry.interval:30000}")
    private Long retryInterval;

    /** Listener container concurrency (default 10). */
    @Value("${kafka.concurrency:10}")
    private Integer concurrency;

    /** Consumer poll timeout applied to the container properties (default 300). */
    @Value("${kafka.poll.timeout:300}")
    private Integer pollTimeout;

    /** Value for {@code auto.offset.reset} (default {@code earliest}). */
    @Value("${kafka.consumer.auto-offset-reset:earliest}")
    private String offset = "earliest";

    /** Value for {@code max.poll.records} (default 100). */
    @Value("${kafka.max.records:100}")
    private Integer maxPollRecords;

    /** Value for {@code max.poll.interval.ms} (default 500000). */
    @Value("${kafka.max.poll.interval.time:500000}")
    private Integer maxPollIntervalMs;

    /** Value for {@code session.timeout.ms} (default 60000). */
    @Value("${kafka.max.session.timeout:60000}")
    private Integer sessionTimoutMs;

    /** String Kafka template handed to the after-rollback processor. */
    @Autowired
    @Qualifier("stringKafkaTemplate")
    private KafkaTemplate<String, String> stringKafkaTemplate;

    /**
     * Creates the listener container factory.
     *
     * <p>Containers use {@code AckMode.RECORD}, synchronous commits and the given chained
     * transaction manager. After a rollback, records are retried according to a
     * {@link FixedBackOff} built from {@code retryInterval} and {@code retryMaxAttempts}; once
     * the back-off is exhausted the record is logged and passed to
     * {@code messageProducer.saveFailedMessage}.
     *
     * @param chainedTM       transaction manager set on the container properties
     * @param messageProducer receives records whose retries are exhausted
     * @return the configured listener container factory
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
            ChainedKafkaTransactionManager<String, String> chainedTM, MessageProducer messageProducer) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        factory.getContainerProperties().setSyncCommits(true);
        // NOTE(review): setAckOnError appears to be deprecated as of Spring Kafka 2.3 - confirm
        // against the ContainerProperties API before upgrading further.
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setTransactionManager(chainedTM);

        DefaultAfterRollbackProcessor<String, String> afterRollbackProcessor =
                new DefaultAfterRollbackProcessor<>(
                        (record, exception) -> {
                            // Parameterized logging instead of string concatenation.
                            log.warn("failed to process kafka message (retries are exhausted). topic name:{} value:{}",
                                    record.topic(), record.value());
                            messageProducer.saveFailedMessage(record, exception);
                        },
                        new FixedBackOff(retryInterval, retryMaxAttempts));
        // The template is supplied together with commitRecovered=true, as in the original setup.
        afterRollbackProcessor.setCommitRecovered(true);
        afterRollbackProcessor.setKafkaTemplate(stringKafkaTemplate);
        factory.setAfterRollbackProcessor(afterRollbackProcessor);

        log.debug("Kafka Receiver Config kafkaListenerContainerFactory created");
        return factory;
    }

    /**
     * Creates the consumer factory from {@link #consumerConfigs()}.
     *
     * @return the consumer factory
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        log.debug("Kafka Receiver Config consumerFactory created");
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Builds the consumer properties: bootstrap servers, String deserializers, auto-commit
     * disabled, poll and session limits, group id, offset reset policy and the
     * {@code read_committed} isolation level.
     *
     * @return a new mutable map of consumer properties
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new ConcurrentHashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimoutMs);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        log.debug("Kafka Receiver Config consumerConfigs created");
        return props;
    }
}
Вот мой код для отправки сообщения:
/**
 * Sends the given JSON payload to the {@code some_tpic} topic through
 * {@code stringKafkaTemplate}. The method is declared transactional and not read-only.
 *
 * @param jsonString the payload sent as the record value
 */
@Transactional(readOnly = false)
public void initiateOrderUpdate(String jsonString) {
    // some logic here
    final String topic = "some_tpic";
    stringKafkaTemplate.send(topic, jsonString);
    // some logic here
}
Раньше я использовал Spring Boot 2.1.9 и Spring Kafka 2.2.9, и всё работало нормально, но после обновления до указанных выше версий я столкнулся с этой проблемой.
Есть ли проблемы в моей конфигурации?