ChainedTransactionManager does not roll back the DB transaction when the Kafka transaction fails
0 votes
/ 29 April 2020

I tried to implement a chained transaction manager (a DB transaction followed by a Kafka transaction). I followed the docs here - https://docs.spring.io/spring-kafka/reference/html/#ex-jdbc-sync.

After implementing it I wrote a test that deliberately fails the Kafka transaction, expecting the DB transaction to be rolled back, but the rollback does NOT happen. Need help.

Here is my configuration code

@Configuration
@EnableTransactionManagement
public class KafkaConfig {

private static final Logger LOG = LoggerFactory.getLogger(KafkaConfig.class);

@Autowired
private KafkaProperties kafkaProperties;


@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props =
            new HashMap<>(kafkaProperties.buildProducerProperties());

    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    // deliberately broken for the test: max.request.size = 0 makes every send fail
    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 0);
    return props;
}


@Bean
public Map<String, Object> producervdtConfigs() {
    Map<String, Object> props =
            new HashMap<>(kafkaProperties.buildProducerProperties());

    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    // deliberately broken for the test: max.request.size = 0 makes every send fail
    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 0);

    return props;
}

@Bean
public ProducerFactory<String, LibraryEvent> producerFactory() {
    DefaultKafkaProducerFactory<String, LibraryEvent> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
    factory.setTransactionIdPrefix(UUID.randomUUID().toString());
    return factory;
}

@Bean(name = "chainedlbrTM")
public ChainedTransactionManager chainedTransactionManager(JpaTransactionManager transactionManager,
                                                           KafkaTransactionManager<String, let> kafkaTransactionManager) {
    return new ChainedTransactionManager(transactionManager, kafkaTransactionManager);
}


@Bean(name = "chainedvdtTM")
public ChainedTransactionManager chainedvdtTransactionManager(JpaTransactionManager transactionManager,
                                                           KafkaTransactionManager<String, vet> kafkavdtTransactionManager) {
    return new ChainedTransactionManager(transactionManager, kafkavdtTransactionManager);
}

@Bean
public KafkaTemplate<String, LibraryEvent> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public KafkaTransactionManager<String, LibraryEvent> kafkaTransactionManager() {
    KafkaTransactionManager<String, LibraryEvent> ktm = new KafkaTransactionManager<>(producerFactory());
    ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return ktm;
}

@Bean
public KafkaTransactionManager<String, vet> kafkavdtTransactionManager() {
    KafkaTransactionManager<String, vet> ktm = new KafkaTransactionManager<String, vet>(producervdtFactory());
    ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return ktm;
}

@Bean
@Primary
public JpaTransactionManager transactionManager(EntityManagerFactory em) {
    return new JpaTransactionManager(em);
}


@Bean
public ProducerFactory<String, vet> producervdtFactory() {
    DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<String, vet>(producervdtConfigs());
    factory.setTransactionIdPrefix(UUID.randomUUID().toString());
    return factory;
}

@Bean
public KafkaTemplate<String, vet> kafkavdtTemplate() {
    return new KafkaTemplate<String, vet>(producervdtFactory());
}

My service class

@Service("libraryService")
@EnableTransactionManagement
public class LibraryServiceImpl
    implements ILibraryService
{
    @Autowired
    private LibraryRepository libraryRepository;

    @Autowired
    private IEventProducer producer;

    @Override
    @Transactional(transactionManager = "chainedLibraryTM")
    public Library createLibrary (final Library library)
        throws LibraryAlreadyExist
    {
        LOG.debug("Creating library {}:{}", library.getName(), library.getVersion());
        final Library result = libraryRepository.createLibrary(library);
        producer.libraryCreated(result);
        return result;
    }

My EventProducer

    @Autowired
    private KafkaTemplate<String, LibraryEvent> kafkaTemplate;


public void libraryCreated(final Library library)
    {
        sendMessage(EventType.CREATED, library);
    }

    private void sendMessage(final EventType type, final Library library) {
        LOG.info("Sending {} event for library {}, topic = {}", type, library.getId(), topic);
        final LibraryEvent event = LibraryEvent.newBuilder()
                .setLibraryId(library.getId())
                .setLibraryName(library.getName())
                .setLibraryVersion(library.getVersion())
                .setEventType(type)
                .build();
        LOG.trace("Kafka Record: {}", event.toString());
        ProducerRecord<String, LibraryEvent> record = new ProducerRecord<>(topic, library.getId().toString(), event);
        ListenableFuture<SendResult<String, LibraryEvent>> future = kafkaTemplate.send(record);
        future.addCallback(new ListenableFutureCallback<SendResult<String, LibraryEvent>>(){

            @Override
            public void onSuccess(SendResult<String, LibraryEvent> result) {
            }

            @Override
            public void onFailure(Throwable ex) {
            }
        });

        if (future.isDone()) {
            try {
                future.get();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new KafkaException("Interrupted", e);
            }
            catch (ExecutionException e) {
                throw new KafkaException("Send failed", e.getCause());
            }
        }

My test

@Test
    public void testArt() throws Exception {
          library2 = this.libraryService.createLibrary(
                new LibraryBuilder()
                        .name(tid)
                        .version("1.0")
                        .artifact(
                                new ArtifactBuilder()
                                        .name("TestP")
                                        .tag(P)
                                        .content("first content")
                                        .build())
                        .artifact(
                                new ArtifactBuilder()
                                        .name("TestF")
                                        .tag(F)
                                        .content("first content")
                                        .build())
                        .build());

After turning up the logging level I noticed the following logs. The exception is there because I set props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 0); this makes the Kafka transaction fail, and I expected that failure to roll back the DB transaction.

{"@timestamp":"2020-04-30T11:18:42.184+05:30","@version":"1","message":"Exception thrown when sending a message with key='1' and payload='{\"libraryId\": 1, \"libraryName\": \"ware\", \"libraryVersion\": \"1.0\", \"eventType\": \"CREATED\"}' to topic pr-rt-deploy:","logger_name":"org.springframework.kafka.support.LoggingProducerListener","thread_name":"Test worker","level":"ERROR","level_value":40000,"stack_trace":"org.apache.kafka.common.errors.RecordTooLargeException: The message is 139 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.\n","tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.194+05:30","@version":"1","message":"Unable to post deploy event for library 1, topic = pr-rt-deploy, due to = Failed to send; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 139 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. .","logger_name":"events.EventProducerImpl","thread_name":"Test worker","level":"INFO","level_value":20000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.195+05:30","@version":"1","message":"Completing transaction for [service.LibraryServiceImpl.createLibrary]","logger_name":"org.springframework.transaction.interceptor.TransactionInterceptor","thread_name":"Test worker","level":"TRACE","level_value":5000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.201+05:30","@version":"1","message":"Clearing transaction synchronization","logger_name":"org.springframework.transaction.support.TransactionSynchronizationManager","thread_name":"Test worker","level":"TRACE","level_value":5000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.202+05:30","@version":"1","message":"Removed value [org.springframework.orm.jpa.EntityManagerHolder@421d8c5] for key [org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean@40617316] from thread [Test worker]","logger_name":"org.springframework.transaction.support.TransactionSynchronizationManager","thread_name":"Test worker","level":"TRACE","level_value":5000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.205+05:30","@version":"1","message":"Initializing transaction synchronization","logger_name":"org.springframework.transaction.support.TransactionSynchronizationManager","thread_name":"Test worker","level":"TRACE","level_value":5000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.205+05:30","@version":"1","message":"Triggering beforeCommit synchronization","logger_name":"org.springframework.kafka.transaction.KafkaTransactionManager","thread_name":"Test worker","level":"TRACE","level_value":5000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.206+05:30","@version":"1","message":"Triggering beforeCompletion synchronization","logger_name":"org.springframework.kafka.transaction.KafkaTransactionManager","thread_name":"Test worker","level":"TRACE","level_value":5000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.206+05:30","@version":"1","message":"Initiating transaction commit","logger_name":"org.springframework.kafka.transaction.KafkaTransactionManager","thread_name":"Test worker","level":"DEBUG","level_value":10000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.213+05:30","@version":"1","message":"Triggering afterCommit synchronization","logger_name":"org.springframework.kafka.transaction.KafkaTransactionManager","thread_name":"Test worker","level":"TRACE","level_value":5000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.213+05:30","@version":"1","message":"Clearing transaction synchronization","logger_name":"org.springframework.transaction.support.TransactionSynchronizationManager","thread_name":"Test worker","level":"TRACE","level_value":5000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.214+05:30","@version":"1","message":"Triggering afterCompletion synchronization","logger_name":"org.springframework.kafka.transaction.KafkaTransactionManager","thread_name":"Test worker","level":"TRACE","level_value":5000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.214+05:30","@version":"1","message":"Removed value [org.springframework.kafka.core.KafkaResourceHolder@31ab2fab] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@1e34605e] from thread [Test worker]","logger_name":"org.springframework.transaction.support.TransactionSynchronizationManager","thread_name":"Test worker","level":"TRACE","level_value":5000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.216+05:30","@version":"1","message":"Resuming suspended transaction after completion of inner transaction","logger_name":"org.springframework.kafka.transaction.KafkaTransactionManager","thread_name":"Test worker","level":"DEBUG","level_value":10000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.216+05:30","@version":"1","message":"Initializing transaction synchronization","logger_name":"org.springframework.transaction.support.TransactionSynchronizationManager","thread_name":"Test worker","level":"TRACE","level_value":5000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.247+05:30","@version":"1","message":"DataSourceManager shutdown started ..","logger_name":".infra.datasource.DataSourceManager","thread_name":"Test worker","level":"INFO","level_value":20000}

1 Answer

0 votes
/ 29 April 2020

@EnableTransactionManagement belongs on a @Configuration class, not on a @Component (such as your @Service).
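A minimal sketch of that change, reusing the class names from the question (the bean definitions themselves stay as they are):

@Configuration
@EnableTransactionManagement   // declarative transaction support is enabled once, on the configuration class
public class KafkaConfig {
    // ... transaction manager, producer factory and template beans as in the question ...
}

@Service("libraryService")     // no @EnableTransactionManagement here
public class LibraryServiceImpl implements ILibraryService {
    // ... @Transactional methods as in the question ...
}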

If you enable TRACE logging for org.springframework.transaction and org.springframework.kafka.transaction, you will see all of the transaction behavior.
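For example, assuming a Spring Boot setup (the question autowires KafkaProperties), the two loggers can be raised to TRACE in application.properties (or the test's properties):

logging.level.org.springframework.transaction=TRACE
logging.level.org.springframework.kafka.transaction=TRACE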

EDIT

kafkaTemplate.send(record);

The send is asynchronous. You are not checking the outcome of the send (e.g. kafkaTemplate.send(record).get(10, TimeUnit.SECONDS)), so the exception is never propagated to the calling code and the transaction interceptor never sees it.

Some failures (such as the one you are triggering) are detected immediately rather than asynchronously. I recently added code to KafkaTemplate to detect these "immediate" failures and throw an exception.

I am not sure which version you are using, but that change is in the latest version of all supported branches (2.4.x, 2.3.x, 2.2.x, 1.3.x).

But, in any case, you need to get the result of the send in order to detect asynchronous errors.
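A sketch of what that could look like in the question's sendMessage (it reuses the topic field and the LibraryEvent builder from the question; the 10-second timeout and the exception messages are arbitrary choices, not the author's exact code):

private void sendMessage(final EventType type, final Library library) {
    final LibraryEvent event = LibraryEvent.newBuilder()
            .setLibraryId(library.getId())
            .setLibraryName(library.getName())
            .setLibraryVersion(library.getVersion())
            .setEventType(type)
            .build();
    final ProducerRecord<String, LibraryEvent> record =
            new ProducerRecord<>(topic, library.getId().toString(), event);
    try {
        // Block until the broker acknowledges the record (or the send fails).
        // Any failure is rethrown here, leaves createLibrary(), and the chained
        // transaction manager then rolls back the JPA transaction as well.
        kafkaTemplate.send(record).get(10, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new KafkaException("Interrupted while sending " + type + " event", e);
    }
    catch (ExecutionException e) {
        throw new KafkaException("Send failed", e.getCause());
    }
    catch (TimeoutException e) {
        throw new KafkaException("Send timed out", e);
    }
}

With a blocking get like this, the RecordTooLargeException from the test surfaces as an exception out of createLibrary(), so the library row is no longer committed.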

...