Сбой сообщения Кафки по теме, неоднократно прослушиваемого слушателем весной Кафка - PullRequest
2 голосов
/ 06 ноября 2019

Я использую Spring Kafka с пружинной загрузкой 2.1.9 и confluentinc / cp-kafka: 5.3.0 (3 брокера сгруппированы) в качестве брокера.

иногда в прослушивателе, если какое-либо сообщение терпит неудачу из-за некоторого исключения, которое сообщение продолжает повторяться, слушатель будет снова и снова получать одно и то же сообщение и будет выдавать исключения.

А также из-зак этой проблеме другие сообщения для этой темы, не будут обработаны, новое сообщение само приостановлено, оно не прослушивается слушателем

Приведенные ниже журналы появляются после каждого раза. (это было почти в 30 раз с той же ошибкой для одного и того же сообщения)

Я попытался перезапустить Kafka, а также приложение весенней загрузки, но та же проблема.

Журналы:


2019-11-06 16:04:49.176  WARN [xxxxx-component-workflow-handler,47fb7bf746423fae,83d15cb4c9f92635,false] 10 --- [_response-4-C-1] o.s.k.core.DefaultKafkaProducerFactory   : Error during transactional operation; producer removed from cache; possible cause: broker restarted during transaction: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@658b4494, txId=xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2]
2019-11-06 16:04:49.178 ERROR [xxxxx-component-workflow-handler,47fb7bf746423fae,83d15cb4c9f92635,false] 10 --- [_response-4-C-1] .s.i.ComponentWorkflowHandlerServiceImpl : failed to completeNormalFulfillmentItem 124

org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
        at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:127)
        at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:52)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:475)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl$$EnhancerBySpringCGLIB$$63013f17.createAuditForCompleteFulfilmentFail(<generated>)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl.completeDefaultFulfillmentItem(ComponentWorkflowHandlerServiceImpl.java:1214)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl.completeNormalFulfillmentItem(ComponentWorkflowHandlerServiceImpl.java:1312)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl.completeFulfillmentItem(ComponentWorkflowHandlerServiceImpl.java:994)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl$$FastClassBySpringCGLIB$$f9512a9d.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl$$EnhancerBySpringCGLIB$$63013f17.completeFulfillmentItem(<generated>)
        at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener.handleFulfillmentComplete(WorkflowAsynHandlerListener.java:100)
        at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener$$FastClassBySpringCGLIB$$f84eaee9.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener$$EnhancerBySpringCGLIB$$2ab8db7b.handleFulfillmentComplete(<generated>)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
        at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
        at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
        at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter$$FastClassBySpringCGLIB$$a98718f8.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.cloud.sleuth.instrument.messaging.MessageListenerMethodInterceptor.invoke(TraceMessagingAutoConfiguration.java:283)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter$$EnhancerBySpringCGLIB$$a252ca0f.onMessage(<generated>)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1308)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1291)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1252)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1700(KafkaMessageListenerContainer.java:387)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$4.doInTransactionWithoutResult(KafkaMessageListenerContainer.java:1177)
        at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:36)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1167)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1145)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:958)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:765)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:703)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
        at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:176)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.handleExistingTransaction(AbstractPlatformTransactionManager.java:430)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:354)
        at org.springframework.data.transaction.MultiTransactionStatus.registerTransactionManager(MultiTransactionStatus.java:69)
        at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:106)
        ... 67 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
        at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:758)
        at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:751)
        at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:216)
        at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:606)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.beginTransaction(DefaultKafkaProducerFactory.java:499)
        at brave.kafka.clients.TracingProducer.beginTransaction(TracingProducer.java:50)
        at org.springframework.kafka.core.ProducerFactoryUtils.getTransactionalResourceHolder(ProducerFactoryUtils.java:103)
        at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:160)
        ... 71 common frames omitted

2019-11-06 16:04:49.178 ERROR [xxxxx-component-workflow-handler,47fb7bf746423fae,83d15cb4c9f92635,false] 10 --- [_response-4-C-1] .s.i.ComponentWorkflowHandlerServiceImpl : failed to complete fulfillment item (completeFulfillmentItem) :124

com.xxxxx.model.exception.ProcessException: failed to completeNormalFulfillmentItem 124
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl.completeNormalFulfillmentItem(ComponentWorkflowHandlerServiceImpl.java:1316)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl.completeFulfillmentItem(ComponentWorkflowHandlerServiceImpl.java:994)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl$$FastClassBySpringCGLIB$$f9512a9d.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl$$EnhancerBySpringCGLIB$$63013f17.completeFulfillmentItem(<generated>)
        at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener.handleFulfillmentComplete(WorkflowAsynHandlerListener.java:100)
        at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener$$FastClassBySpringCGLIB$$f84eaee9.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener$$EnhancerBySpringCGLIB$$2ab8db7b.handleFulfillmentComplete(<generated>)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
        at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
        at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
        at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter$$FastClassBySpringCGLIB$$a98718f8.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.cloud.sleuth.instrument.messaging.MessageListenerMethodInterceptor.invoke(TraceMessagingAutoConfiguration.java:283)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter$$EnhancerBySpringCGLIB$$a252ca0f.onMessage(<generated>)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1308)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1291)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1252)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1700(KafkaMessageListenerContainer.java:387)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$4.doInTransactionWithoutResult(KafkaMessageListenerContainer.java:1177)
        at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:36)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1167)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1145)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:958)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:765)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:703)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
        at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:127)
        at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:52)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:475)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289)
        at 

customer config

@Configuration
@EnableKafka
public class KafkaReceiverConfig {

    // Kafka Server Configuration
    @Value("${kafka.servers}")
    private String kafkaServers;

    // Group Identifier
    @Value("${kafka.groupId}")
    private String groupId;

    // Kafka Max Retry Attempts
    @Value("${kafka.retry.maxAttempts:5}")
    private Integer retryMaxAttempts;

    // Kafka Max Retry Interval
    @Value("${kafka.retry.interval:180000}")
    private Long retryInterval;

    // Kafka Concurrency
    @Value("${kafka.concurrency:10}")
    private Integer concurrency;

    // Kafka Concurrency
    @Value("${kafka.poll.timeout:100}")
    private Integer pollTimeout;

    // Kafka Consumer Offset
    @Value("${kafka.consumer.auto-offset-reset:earliest}")
    private String offset = "earliest";

    @Value("${kafka.max.records:100}")
    private Integer maxPollRecords;

    @Value("${kafka.max.poll.interval.time:500000}")
    private Integer maxPollIntervalMs;

    @Value("${kafka.max.session.timeout:200000}")
    private Integer sessionTimoutMs;

    // Logger
    private static final Logger log = LoggerFactory.getLogger(KafkaReceiverConfig.class);


    @Bean
    public RetryPolicy retryPolicy() {
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(retryMaxAttempts);
        return simpleRetryPolicy;
    }

    @Bean
    public BackOffPolicy backOffPolicy() {
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(retryInterval);
        return backOffPolicy;
    }

    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy());
        retryTemplate.setBackOffPolicy(backOffPolicy());
        return retryTemplate;
    }


    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
            ChainedKafkaTransactionManager<String, String> chainedTM, MessageProducer messageProducer) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(pollTimeout);

        factory.getContainerProperties().setSyncCommits(true);
        factory.setRetryTemplate(retryTemplate());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setTransactionManager(chainedTM);
        factory.setStatefulRetry(true);
        // NOTE: retryMaxAttempts should always +1 due to spring kafka bug
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
            log.warn("failed to process kafka message (retries are exausted). topic name:" + record.topic() + " value:"
                    + record.value());
            messageProducer.saveFailedMessage(record, exception);
        }, retryMaxAttempts + 1);

        factory.setErrorHandler(errorHandler);
        log.debug("Kafka Receiver Config kafkaListenerContainerFactory created");
        return factory;
    }


    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        log.debug("Kafka Receiver Config consumerFactory created");
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new ConcurrentHashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Disable the Auto Commit if required for testing
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimoutMs);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        log.debug("Kafka Receiver Config consumerConfigs created");
        return props;
    }

}

listener

@KafkaListener(id = TOPIC_FULFILLMENT_CREATE, topics = TOPIC_FULFILLMENT_CREATE)
    @Transactional(readOnly = false)
    public void processCreateRequest(@Payload String message) throws IOException {
        ComponentWorkflowModel componentWorkflowModel = JsonUtil.toObject(message, ComponentWorkflowModel.class);
        componentWorkflowStarter.processCreateRequest(componentWorkflowModel);
    }
  1. Есть ли какое-либо решение, чтобы прекратить прослушивание сообщения об ошибке? Я использовал seekErrorHandler Работает только иногда, есть ли какие-либо проблемы с конфигурацией?

  2. Есть ли проблемы с Kafka с пружиной Kafka?

  3. Какрешить эту проблему?

1 Ответ

1 голос
/ 06 ноября 2019

Выглядит как неправильная конфигурация.

org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
        at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:127)
        at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:52)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:475)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
...
Caused by: org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
        at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:176)

Вы пытаетесь начать новую транзакцию, когда она уже существует.

Вы ввели менеджер цепочек транзакций в контейнер слушателя, так что естьуже транзакция.

Похоже, у вас есть аннотация @Transactional, которая также ссылается на тот же менеджер транзакций, возможно, с распространением REQUIRES_NEW. Вы не можете использовать это с Kafka, если у вас нет другого transactional.id - но это, вероятно, не то, что вы хотите в любом случае.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...