Транзакции с ReactiveCrudRepository с spring-data-r2dbc - PullRequest
0 голосов
/ 21 января 2019

Я пытаюсь реализовать транзакции с репозиториями spring-data-r2dbc в сочетании с TransactionalDatabaseClient следующим образом:

class SongService(
    private val songRepo: SongRepo,
    private val databaseClient: DatabaseClient

){
    private val tdbc = databaseClient as TransactionalDatabaseClient

    ...
    ...
    fun save(song: Song){
        return tdbc.inTransaction{ 
            songRepo
                .save(mapRow(song, albumId)) //Mapping to a row representation
                .delayUntil { savedSong -> tdbc.execute.sql(...).fetch.rowsUpdated() } //saving a many to many relation
                .map(::mapSong) //Mapping back to actual song and retrieve the relationship data.
        }
    }

}

В настоящее время у меня есть класс конфигурации (помеченный @Configuration и @EnableR2dbcRepositories), который расширяется от AbstractR2dbcConfiguration. Здесь я переопределяю метод databaseClient, чтобы вернуть TransactionalDatabaseClient. Это должен быть тот же экземпляр, что и в классе SongService.

При запуске кода в тесте с использованием только подписки и печати я получаю org.springframework.transaction.NoTransactionException: ReactiveTransactionSynchronization not active и данные о взаимоотношениях не возвращаются.

При использовании stepverifier проекта Reactors я получаю java.lang.IllegalStateException: Connection is closed. Также в этом случае данные отношения не возвращаются.

Только для записи, я видел https://github.com/spring-projects/spring-data-r2dbc/issues/44

1 Ответ

0 голосов
/ 28 марта 2019

Вот рабочий пример Java:

@Autowired TransactionalDatabaseClient txClient;
@Autowired Mono<Connection> connection;
//You Can also use: @Autowired Mono<? extends Publisher> connectionPublisher;

public Flux<Void> example {
txClient.enableTransactionSynchronization(connection);
// Or, txClient.enableTransactionSynchronization(connectionPublisher);

Flux<AuditConfigByClub> audits = txClient.inTransaction(tx -> {
  txClient.beginTransaction();
  return tx.execute().sql("SELECT * FROM audit.items")
  .as(Item.class)
  .fetch()
  .all();
}).doOnTerminate(() -> {
  txClient.commitTransaction();
});
txClient.commitTransaction();

audits.subscribe(item -> System.out.println("anItem: " + item));
  return Flux.empty()
}

Я только начал реагировать, поэтому не слишком уверен, что я делаю с моими обратными вызовами, ха-ха.Но я решил пойти с TransactionalDatabaseClient над DatabaseClient или Connection, так как я возьму все утилиты, которые смогу получить, пока R2dbc находится в текущем состоянии.

В своем коде вы действительно создали экземплярОбъект подключения?Если это так, я думаю, вы бы сделали это в вашей конфигурации.Его можно использовать во всем приложении так же, как и DatabaseClient, но он немного сложнее.

Если нет:

@Bean
@Override // I also used abstract config
public ConnectionFactory connectionFactory() {
  ...
}

@Bean 
TransactionalDatabaseClient txClient() {
  ...
}

//TransactionalDatabaseClient will take either of these as arg in 
//#enableTransactionSynchronization method

@Bean
public Publisher<? extends Connection> connectionPublisher() {
  return connectionFactory().create();
}

@Bean
public Mono<Connection> connection() {
  return = Mono.from(connectionFactory().create());
}

Если у вас возникли проблемы с переводом на Kotlin, есть альтернативаспособ включить синхронизацию, которая может работать:

// From what I understand, this is a useful way to move between 
// transactions within a single subscription
TransactionResources resources = TransactionResources.create();
resources.registerResource(Resource.class, resource);

ConnectionFactoryUtils
  .currentReactiveTransactionSynchronization()
  .subscribe(currentTx -> sync.registerTransaction(Tx));

Надеюсь, это хорошо для Котлина.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...