Spring @Transactional проблема между вызовами Kafka и DB - PullRequest
0 голосов
/ 31 октября 2019

Я пытаюсь проверить состояние транзакции между Kafka и вызовом БД (Postgres), когда в случае сбоя вызова БД Kafka не должен отправлять это конкретное сообщение в тему Kafka.

Я пытался добиться этого с помощью аннотации @Transactional в приложении Springboot.

Но Кафка отправляет сообщение в тему, даже если вызов БД не удался. Из журналов консоли я мог видеть поток ниже:

1) Состояние перехода Кафки изменено с INITIALIZING на READY, READY на IN_TRANSACTION на ABORTING_TRANSACTION, Отправлено произвести запрос на раздел

2) Ошибка вызова БД (ошибка SQL: 0, SQLState: 22001)

3) Переход Кафки из состояния IN_TRANSACTION в ABORTING_TRANSACTION

Поэтому я верю, когда мы даем @Транзакционный, Кафка должен отправлять сообщение только тогда, когда состояние Кафки перемещается из IN_TRANSACTION в COMMITTING_TRANSACTION.

@Service
public class TestService {
    @Autowired
    private TestDataService dataService;
    @Autowired
    private KafkaTemplate<Integer, Test> kafkaTemplate;

    @Transactional
    public Test send(Test entity) {
        kafkaTemplate.sendDefault(entity);
        dataService.save(entity);
        return entity;
    }
}

Журнал консоли:

2019-10-30 09:21:52.165 TRACE 8560 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : Starting the Kafka producer
2019-10-30 09:21:52.209  INFO 8560 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : Instantiated a transactional producer.
2019-10-30 09:21:52.209  INFO 8560 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
2019-10-30 09:21:52.209  INFO 8560 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : Overriding the default max.in.flight.requests.per.connection to 1 since idempontence is enabled.
2019-10-30 09:21:52.209  INFO 8560 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : Overriding the default acks to all since idempotence is enabled.
2019-10-30 09:21:52.281 DEBUG 8560 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : Starting Kafka producer I/O thread.
2019-10-30 09:21:52.286  INFO 8560 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.11.0.0
2019-10-30 09:21:52.288  INFO 8560 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : cb8625948210849f
2019-10-30 09:21:52.290 DEBUG 8560 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : Kafka producer started
2019-10-30 09:21:52.292 DEBUG 8560 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId my.transaction.id.0] Transition from state UNINITIALIZED to INITIALIZING
2019-10-30 09:21:52.292  INFO 8560 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId my.transaction.id.0] ProducerId set to -1 with epoch -1
2019-10-30 09:21:52.296 DEBUG 8560 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId my.transaction.id.0] Enqueuing transactional request (type=InitProducerIdRequest, transactionalId=my.transaction.id.0, transactionTimeoutMs=60000)
2019-10-30 09:21:52.297 TRACE 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId my.transaction.id.0] Request (type=InitProducerIdRequest, transactionalId=my.transaction.id.0, transactionTimeoutMs=60000) dequeued for sending
2019-10-30 09:21:52.298 DEBUG 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId my.transaction.id.0] Enqueuing transactional request (type=FindCoordinatorRequest, coordinatorKey=my.transaction.id.0, coordinatorType=TRANSACTION)
2019-10-30 09:21:52.299 DEBUG 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId my.transaction.id.0] Enqueuing transactional request (type=InitProducerIdRequest, transactionalId=my.transaction.id.0, transactionTimeoutMs=60000)
2019-10-30 09:21:52.399 TRACE 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId my.transaction.id.0] Request (type=FindCoordinatorRequest, coordinatorKey=my.transaction.id.0, coordinatorType=TRANSACTION) dequeued for sending
2019-10-30 09:21:52.553 DEBUG 8560 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [TransactionalId my.transaction.id.0] Sending transactional request (type=FindCoordinatorRequest, coordinatorKey=my.transaction.id.0, coordinatorType=TRANSACTION) to node localhost:9092 (id: -1 rack: null)
2019-10-30 09:21:52.561 TRACE 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId my.transaction.id.0] Received transactional response FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=LIN70000505.corp.capgemini.com:9092 (id: 0 rack: null)) for request (type=FindCoordinatorRequest, coordinatorKey=my.transaction.id.0, coordinatorType=TRANSACTION)
2019-10-30 09:21:52.561 TRACE 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId my.transaction.id.0] Request (type=InitProducerIdRequest, transactionalId=my.transaction.id.0, transactionTimeoutMs=60000) dequeued for sending
2019-10-30 09:21:52.667 DEBUG 8560 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [TransactionalId my.transaction.id.0] Sending transactional request (type=InitProducerIdRequest, transactionalId=my.transaction.id.0, transactionTimeoutMs=60000) to node LIN70000505.corp.capgemini.com:9092 (id: 0 rack: null)
2019-10-30 09:21:52.672 TRACE 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId my.transaction.id.0] Received transactional response InitProducerIdResponse(error=NONE, producerId=3000, producerEpoch=13, throttleTimeMs=0) for request (type=InitProducerIdRequest, transactionalId=my.transaction.id.0, transactionTimeoutMs=60000)
2019-10-30 09:21:52.673  INFO 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId my.transaction.id.0] ProducerId set to 3000 with epoch 13
2019-10-30 09:21:52.673 DEBUG 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId my.transaction.id.0] Transition from state INITIALIZING to READY
2019-10-30 09:21:52.673 DEBUG 8560 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId my.transaction.id.0] Transition from state READY to IN_TRANSACTION
2019-10-30 09:21:52.674 TRACE 8560 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Bound value [org.springframework.kafka.core.KafkaResourceHolder@758f6aa7] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@126f8f24] to thread [http-nio-8080-exec-1]
2019-10-30 09:21:52.677 TRACE 8560 --- [nio-8080-exec-1] o.s.kafka.core.KafkaTemplate             : Sending: ProducerRecord(topic=topic5, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=com.example.demo.entities.Test@347a5bd2, timestamp=null)
2019-10-30 09:21:52.686 TRACE 8560 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : Requesting metadata update for topic topic5.
2019-10-30 09:21:52.710 TRACE 8560 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : Sending record ProducerRecord(topic=topic5, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=com.example.demo.entities.Test@347a5bd2, timestamp=null) with callback org.springframework.kafka.core.KafkaTemplate$1@3028a6bd to topic topic5 partition 0
2019-10-30 09:21:52.710 DEBUG 8560 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId my.transaction.id.0] Begin adding new partition topic5-0 to transaction
2019-10-30 09:21:52.710 TRACE 8560 --- [nio-8080-exec-1] o.a.k.c.p.internals.RecordAccumulator    : Allocating a new 16384 byte message buffer for topic topic5 partition 0
2019-10-30 09:21:52.720 TRACE 8560 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : Waking up the sender since topic topic5 partition 0 is either full or getting a new batch
2019-10-30 09:21:52.720 TRACE 8560 --- [nio-8080-exec-1] o.s.kafka.core.KafkaTemplate             : Sent: ProducerRecord(topic=topic5, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=com.example.demo.entities.Test@347a5bd2, timestamp=null)
After Kafka call
2019-10-30 09:21:52.723 DEBUG 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId my.transaction.id.0] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=my.transaction.id.0, producerId=3000, producerEpoch=13, partitions=[topic5-0])
2019-10-30 09:21:52.723 TRACE 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId my.transaction.id.0] Request (type=AddPartitionsToTxnRequest, transactionalId=my.transaction.id.0, producerId=3000, producerEpoch=13, partitions=[topic5-0]) dequeued for sending
2019-10-30 09:21:52.724 DEBUG 8560 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [TransactionalId my.transaction.id.0] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=my.transaction.id.0, producerId=3000, producerEpoch=13, partitions=[topic5-0]) to node LIN70000505.corp.capgemini.com:9092 (id: 0 rack: null)
2019-10-30 09:21:52.728 TRACE 8560 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Bound value [org.springframework.data.jpa.repository.support.CrudMethodMetadataPostProcessor$DefaultCrudMethodMetadata@4a455630] for key [public abstract java.lang.Object org.springframework.data.repository.CrudRepository.save(java.lang.Object)] to thread [http-nio-8080-exec-1]
2019-10-30 09:21:52.730 TRACE 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId my.transaction.id.0] Received transactional response AddPartitionsToTxnResponse(errors={topic5-0=NONE}, throttleTimeMs=0) for request (type=AddPartitionsToTxnRequest, transactionalId=my.transaction.id.0, producerId=3000, producerEpoch=13, partitions=[topic5-0])
2019-10-30 09:21:52.731 DEBUG 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId my.transaction.id.0] Successfully added partitions [topic5-0] to transaction
2019-10-30 09:21:52.731 DEBUG 8560 --- [ad | producer-1] o.a.k.c.p.internals.RecordAccumulator    : Assigning sequence number 0 from producer (producerId=3000, epoch=13) to dequeued batch from partition topic5-0 bound for LIN70000505.corp.capgemini.com:9092 (id: 0 rack: null).
2019-10-30 09:21:52.732 TRACE 8560 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.orm.jpa.EntityManagerHolder@169784b8] for key [org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean@e7b265e] bound to thread [http-nio-8080-exec-1]
2019-10-30 09:21:52.732 TRACE 8560 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jdbc.datasource.ConnectionHolder@7903817d] for key [HikariDataSource (testdb)] bound to thread [http-nio-8080-exec-1]
2019-10-30 09:21:52.732 TRACE 8560 --- [nio-8080-exec-1] o.s.t.i.TransactionInterceptor           : Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2019-10-30 09:21:52.736 TRACE 8560 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : Nodes with data ready to send: [LIN70000505.corp.capgemini.com:9092 (id: 0 rack: null)]
2019-10-30 09:21:52.740 TRACE 8560 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.orm.jpa.EntityManagerHolder@169784b8] for key [org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean@e7b265e] bound to thread [http-nio-8080-exec-1]
2019-10-30 09:21:52.741 TRACE 8560 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : Sent produce request to 0: (type=ProduceRequest, magic=2, acks=-1, timeout=30000, partitionRecords=({topic5-0=[(record=DefaultRecord(offset=0, timestamp=1572427312710, key=0 bytes, value=38 bytes))]}), transactionalId='my.transaction.id.0'
2019-10-30 09:21:52.752 TRACE 8560 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : Received produce response from node 0 with correlation id 6
2019-10-30 09:21:52.752 DEBUG 8560 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : Incremented sequence number for topic-partition topic5-0 to 1
2019-10-30 09:21:52.752 TRACE 8560 --- [ad | producer-1] o.a.k.c.p.internals.ProducerBatch        : Successfully produced messages to topic5-0 with base offset 48.
2019-10-30 09:21:52.801 TRACE 8560 --- [nio-8080-exec-1] o.s.t.i.TransactionInterceptor           : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2019-10-30 09:21:52.801 TRACE 8560 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.data.jpa.repository.support.CrudMethodMetadataPostProcessor$DefaultCrudMethodMetadata@4a455630] for key [public abstract java.lang.Object org.springframework.data.repository.CrudRepository.save(java.lang.Object)] from thread [http-nio-8080-exec-1]
After DB call
2019-10-30 09:21:52.801 TRACE 8560 --- [nio-8080-exec-1] o.s.t.i.TransactionInterceptor           : Completing transaction for [com.example.demo.service.TestService.send]
2019-10-30 09:21:52.802 TRACE 8560 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.kafka.core.KafkaResourceHolder@758f6aa7] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@126f8f24] from thread [http-nio-8080-exec-1]
2019-10-30 09:21:52.830  WARN 8560 --- [nio-8080-exec-1] o.h.engine.jdbc.spi.SqlExceptionHelper   : SQL Error: 0, SQLState: 22001
2019-10-30 09:21:52.830 ERROR 8560 --- [nio-8080-exec-1] o.h.engine.jdbc.spi.SqlExceptionHelper   : ERROR: value too long for type character varying(10)
2019-10-30 09:21:52.831  INFO 8560 --- [nio-8080-exec-1] o.h.e.j.b.internal.AbstractBatchImpl     : HHH000010: On release of batch it still contained JDBC statements
2019-10-30 09:21:52.833 ERROR 8560 --- [nio-8080-exec-1] o.h.i.ExceptionMapperStandardImpl        : HHH000346: Error during managed flush [org.hibernate.exception.DataException: could not execute statement]
2019-10-30 09:21:52.834 TRACE 8560 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Clearing transaction synchronization
2019-10-30 09:21:52.835 DEBUG 8560 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId my.transaction.id.0] Transition from state IN_TRANSACTION to ABORTING_TRANSACTION
2019-10-30 09:21:52.836 DEBUG 8560 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId my.transaction.id.0] Enqueuing transactional request (type=EndTxnRequest, transactionalId=my.transaction.id.0, producerId=3000, producerEpoch=13, result=ABORT)
2019-10-30 09:21:52.836 TRACE 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId my.transaction.id.0] Request (type=EndTxnRequest, transactionalId=my.transaction.id.0, producerId=3000, producerEpoch=13, result=ABORT) dequeued for sending
2019-10-30 09:21:52.836 DEBUG 8560 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [TransactionalId my.transaction.id.0] Sending transactional request (type=EndTxnRequest, transactionalId=my.transaction.id.0, producerId=3000, producerEpoch=13, result=ABORT) to node LIN70000505.corp.capgemini.com:9092 (id: 0 rack: null)
2019-10-30 09:21:52.839 TRACE 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId my.transaction.id.0] Received transactional response EndTxnResponse(error=NONE, throttleTimeMs=0) for request (type=EndTxnRequest, transactionalId=my.transaction.id.0, producerId=3000, producerEpoch=13, result=ABORT)
2019-10-30 09:21:52.839 DEBUG 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId my.transaction.id.0] Transition from state ABORTING_TRANSACTION to READY
2019-10-30 09:21:52.839 TRACE 8560 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.jdbc.datasource.ConnectionHolder@7903817d] for key [HikariDataSource (testdb)] from thread [http-nio-8080-exec-1]
2019-10-30 09:21:52.844 TRACE 8560 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.orm.jpa.EntityManagerHolder@169784b8] for key [org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean@e7b265e] from thread [http-nio-8080-exec-1]
2019-10-30 09:21:52.854 ERROR 8560 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.dao.DataIntegrityViolationException: could not execute statement; SQL [n/a]; nested exception is org.hibernate.exception.DataException: could not execute statement] with root cause

org.postgresql.util.PSQLException: ERROR: value too long for type character varying(10)

1 Ответ

0 голосов
/ 01 ноября 2019

Кафка не поддерживает транзакции - по крайней мере, транзакции, включающие более одного кластера Кафка.

...