Facing "Unexpected error in AddOffsetsToTxnResponse" in Spring Kafka

I am using Spring Boot 2.2.7.RELEASE and Spring Kafka 2.3.8 (along with a chained Kafka transaction manager).

Confluent Kafka is the broker.

I am facing some issues while sending messages to Kafka. Here are the logs:

2020-07-09 08:28:13.139 ERROR [xxxxxx-component-workflow-handler,,,] 9 --- [_response-4-C-1] essageListenerContainer$ListenerConsumer : Send offsets to transaction failed

org.apache.kafka.common.KafkaException: Unexpected error in AddOffsetsToTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
        at org.apache.kafka.clients.producer.internals.TransactionManager$AddOffsetsToTxnHandler.handleResponse(TransactionManager.java:1406)
        at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1069)
        at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
        at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:561)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
        at java.base/java.lang.Thread.run(Thread.java:832)

2020-07-09 08:28:13.153 ERROR [xxxxxx-component-workflow-handler,,,] 9 --- [_response-4-C-1] o.s.k.core.DefaultKafkaProducerFactory   : commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@3d0f1d94, txId=xxxxxx-dagusa-Process-Handler-et3YEAYB1R1F6h-complete_fulfillment_item_response.complete_fulfillment_item_response.4]

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
        at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:924)
        at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginCommit$2(TransactionManager.java:296)
        at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1008)
        at org.apache.kafka.clients.producer.internals.TransactionManager.beginCommit(TransactionManager.java:295)
        at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:704)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.commitTransaction(DefaultKafkaProducerFactory.java:691)
        at brave.kafka.clients.TracingProducer.commitTransaction(TracingProducer.java:72)
        at org.springframework.kafka.core.KafkaResourceHolder.commit(KafkaResourceHolder.java:58)
        at org.springframework.kafka.transaction.KafkaTransactionManager.doCommit(KafkaTransactionManager.java:200)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:743)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
        at org.springframework.data.transaction.MultiTransactionStatus.commit(MultiTransactionStatus.java:74)
        at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:150)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1569)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1546)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1288)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1035)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:949)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.kafka.common.KafkaException: Unexpected error in AddOffsetsToTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
        at org.apache.kafka.clients.producer.internals.TransactionManager$AddOffsetsToTxnHandler.handleResponse(TransactionManager.java:1406)
        at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1069)
        at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
        at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:561)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
        ... 1 common frames omitted

2020-07-09 08:28:13.157  WARN [xxxxxx-component-workflow-handler,,,] 9 --- [_response-4-C-1] o.s.k.core.DefaultKafkaProducerFactory   : Error during some operation; producer removed from cache: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@3d0f1d94, txId=xxxxxx-dagusa-Process-Handler-et3YEAYB1R1F6h-complete_fulfillment_item_response.complete_fulfillment_item_response.4]
2020-07-09 08:28:13.167 ERROR [xxxxxx-component-workflow-handler,,,] 9 --- [_response-4-C-1] essageListenerContainer$ListenerConsumer : Transaction rolled back

org.springframework.transaction.HeuristicCompletionException: Heuristic completion: outcome state is mixed; nested exception is org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
        at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:177)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1569)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1546)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1288)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1035)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:949)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
        at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:924)
        at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginCommit$2(TransactionManager.java:296)
        at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1008)
        at org.apache.kafka.clients.producer.internals.TransactionManager.beginCommit(TransactionManager.java:295)
        at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:704)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.commitTransaction(DefaultKafkaProducerFactory.java:691)
        at brave.kafka.clients.TracingProducer.commitTransaction(TracingProducer.java:72)
        at org.springframework.kafka.core.KafkaResourceHolder.commit(KafkaResourceHolder.java:58)
        at org.springframework.kafka.transaction.KafkaTransactionManager.doCommit(KafkaTransactionManager.java:200)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:743)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
        at org.springframework.data.transaction.MultiTransactionStatus.commit(MultiTransactionStatus.java:74)
        at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:150)
        ... 9 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: Unexpected error in AddOffsetsToTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
        at org.apache.kafka.clients.producer.internals.TransactionManager$AddOffsetsToTxnHandler.handleResponse(TransactionManager.java:1406)
        at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1069)
        at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
        at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:561)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
        ... 1 common frames omitted

Here is my configuration.

Kafka sender configuration:

@Configuration
@EnableKafka
public class KafkaSenderConfig{

    @Value("${kafka.servers}")
    private String kafkaServers;
    
    @Value("${application.name}")
    private String applicationName;
    
    private static final Logger log = LoggerFactory.getLogger(KafkaSenderConfig.class);
    
    
    @Bean(value = "stringKafkaTransactionManager")
    public KafkaTransactionManager<String, String> kafkaStringTransactionManager() {
        KafkaTransactionManager<String, String> ktm = new KafkaTransactionManager<String, String>(stringProducerFactory());
        ktm.setNestedTransactionAllowed(true);
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
        return ktm;
    }
    
    @Bean(value = "stringProducerFactory")
    @Primary
    public ProducerFactory<String, String> stringProducerFactory() {
        log.debug("Kafka Servers: " + kafkaServers);
        Map<String, Object> config = getConfigs();
        //config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(config);
        String randomString=applicationName.replaceAll("\\s+","-").concat("-").concat(StringUtil.getRandomString(14)).concat("-");
        defaultKafkaProducerFactory.setTransactionIdPrefix(randomString);
        return defaultKafkaProducerFactory;
    }
    
    /**
     * Create a new Kafka Template for String based Messages
     * 
     * @return
     */
    @Bean(value = "stringKafkaTemplate")
    @Primary
    public KafkaTemplate<String, String> stringKafkaTemplate() {
        log.debug("Creating the Kafka Template for String Producer Factory");
        return new KafkaTemplate<>(stringProducerFactory(),true);
    }
  
    @Bean(name = "chainedStringKafkaTransactionManager")
    @Primary
    public ChainedKafkaTransactionManager<String, String> chainedTransactionManager(JpaTransactionManager jpaTransactionManager, DataSourceTransactionManager dsTransactionManager) {
        return new ChainedKafkaTransactionManager<>(kafkaStringTransactionManager(), jpaTransactionManager, dsTransactionManager);
    } 
    
    private Map<String, Object> getConfigs() {
        Map<String, Object> config = new ConcurrentHashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);   
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        //Try to send msgs out in 100ms even if the batch size is not met
        config.put(ProducerConfig.LINGER_MS_CONFIG, 100);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        return config;
    }
}
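
The @Value placeholders above resolve from application properties roughly like the following (illustrative values only; the real broker list and application name differ):

# Illustrative values only; the real values differ
kafka.servers=broker1:9092,broker2:9092
application.name=xxxxxx component workflow handler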

Kafka receiver configuration:

@Configuration
@EnableKafka
public class KafkaReceiverConfig {

    // Kafka Server Configuration
    @Value("${kafka.servers}")
    private String kafkaServers;

    // Group Identifier
    @Value("${kafka.groupId}")
    private String groupId;

    // Kafka Max Retry Attempts
    @Value("${kafka.retry.maxAttempts:3}")
    private Integer retryMaxAttempts;

    // Kafka Max Retry Interval
    @Value("${kafka.retry.interval:30000}")
    private Long retryInterval;

    // Kafka Concurrency
    @Value("${kafka.concurrency:10}")
    private Integer concurrency;

    // Kafka Concurrency
    @Value("${kafka.poll.timeout:300}")
    private Integer pollTimeout;

    // Kafka Consumer Offset
    @Value("${kafka.consumer.auto-offset-reset:earliest}")
    private String offset = "earliest";

    @Value("${kafka.max.records:100}")
    private Integer maxPollRecords;

    @Value("${kafka.max.poll.interval.time:500000}")
    private Integer maxPollIntervalMs;

    @Value("${kafka.max.session.timeout:60000}")
    private Integer sessionTimoutMs;

    // String Kafka Template to send Messages
    @Autowired
    @Qualifier("stringKafkaTemplate")
    private KafkaTemplate<String, String> stringKafkaTemplate;

    // Logger
    private static final Logger log = LoggerFactory.getLogger(KafkaReceiverConfig.class);

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
            ChainedKafkaTransactionManager<String, String> chainedTM, MessageProducer messageProducer) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setTransactionManager(chainedTM);
        DefaultAfterRollbackProcessor<String, String> afterRollbackProcessor = new DefaultAfterRollbackProcessor<>(
                (record, exception) -> {
                    log.warn("failed to process kafka message (retries are exausted). topic name:" + record.topic()
                            + " value:" + record.value());
                    messageProducer.saveFailedMessage(record, exception);
                }, new FixedBackOff(retryInterval, retryMaxAttempts));

        afterRollbackProcessor.setCommitRecovered(true);
        afterRollbackProcessor.setKafkaTemplate(stringKafkaTemplate);
        factory.setAfterRollbackProcessor(afterRollbackProcessor);
        log.debug("Kafka Receiver Config kafkaListenerContainerFactory created");
        return factory;
    }


    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        log.debug("Kafka Receiver Config consumerFactory created");
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new ConcurrentHashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimoutMs);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        log.debug("Kafka Receiver Config consumerConfigs created");
        return props;
    }

}
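
For completeness, MessageProducer referenced in the container factory is my own helper bean; a simplified sketch of its contract (only the method the after-rollback processor calls):

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Simplified sketch: the real implementation persists the failed record and the
// exception so the message can be inspected or replayed later.
public interface MessageProducer {

    void saveFailedMessage(ConsumerRecord<?, ?> record, Exception exception);
}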

Here is my code for sending the message:

@Transactional(readOnly = false)
public void initiateOrderUpdate(String jsonString){

    // some logic here

    stringKafkaTemplate.send("some_topic", jsonString);

    // some logic here
}
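
For context, the listener that ends up calling this method looks roughly like the following (class, topic and service names are simplified placeholders, not the exact production code). It runs on the kafkaListenerContainerFactory above, so the record is processed inside the chained Kafka/JPA/DataSource transaction and the send above joins that Kafka transaction:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Simplified placeholder for the actual listener; it delegates to the transactional
// service method shown above, so the producer send participates in the listener's
// chained transaction.
@Component
public class OrderUpdateListener {

    // placeholder name for the service that holds initiateOrderUpdate(...)
    private final OrderUpdateService orderUpdateService;

    public OrderUpdateListener(OrderUpdateService orderUpdateService) {
        this.orderUpdateService = orderUpdateService;
    }

    @KafkaListener(topics = "some_topic_request", containerFactory = "kafkaListenerContainerFactory")
    public void onMessage(String jsonString) {
        orderUpdateService.initiateOrderUpdate(jsonString);
    }
}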

Previously I was using Spring Boot 2.1.9 and Spring Kafka 2.2.9 and everything worked fine, but after upgrading to the versions above I started facing this issue.

Is there any problem with my configuration?

...