Пример весенней сделки с jpa и kafka? - PullRequest
1 голос
/ 17 марта 2020

После обновления весенней загрузки до 2.2.5 с версии 2.1.11 клиент kafka выдает сообщения брокеру до того, как транзакция jpa будет зафиксирована. Без использования менеджера транзакций с цепочкой kafka транзакция работала нормально. Есть ли какие-либо изменения в обратной совместимости, вносимые в транзакции между 2.1.x и 2.2.x? Может ли кто-нибудь предоставить какой-нибудь рабочий менеджер транзакций, охватывающий JPA и Kafka?

Я использовал только следующий менеджер транзакций для JPA

  @Bean
  @Primary
  public PlatformTransactionManager transactionManager(final EntityManagerFactory emf) {
    final JpaTransactionManager txManager = new JpaTransactionManager();
    txManager.setEntityManagerFactory(emf);
    return txManager;
  }

Я использовал следующие свойства для транзакции kafka:

spring.cloud.stream.kafka.default.consumer.configuration.isolation.level: read_committed spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: xyz-0 spring.kafka.producer.transaction -id-префикс: xyz-0

1 Ответ

0 голосов
/ 23 марта 2020

Мы можем создать цепочку транзакций с помощью jpa и kafka. Мы можем использовать BinderFactory, чтобы получить связыватель kafka и создать менеджер транзакций связывания kafka.

@Bean(name = "chainedTransactionManager")
  @Primary
  public PlatformTransactionManager chainedTransactionManager(JpaTransactionManager jpaTM,
      BinderFactory binders) {
    Binder<MessageChannel,?,?> binder = binders.getBinder("kafka", MessageChannel.class);
    if (binder instanceof KafkaMessageChannelBinder) {
      ProducerFactory<byte[], byte[]> pf =
          ((KafkaMessageChannelBinder) binder).getTransactionalProducerFactory();
      KafkaTransactionManager<byte[], byte[]> ktm = new KafkaTransactionManager<>(pf);
      ktm.setTransactionSynchronization(
          AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
      return new ChainedKafkaTransactionManager<Object, Object>(jpaTM, ktm);
    } else {
      return jpaTM;
    }
  }```
...