Spring Webflux с транзакциями - PullRequest
0 голосов
/ 25 мая 2018

Возможно ли управление транзакциями с помощью Spring Webflux?Мы будем создавать микросервис, и я планирую использовать Spring Webflux, но я хотел бы убедиться, что запрос PUT / POST, который изменяет состояние, может быть атомарным.Например, если полученный запрос должен:

  • Состояние обновления в собственной базе данных микросервиса
  • Обновление нисходящей службы через API REST

Можно ли обернуть шаги внутри транзакции, чтобы изменения были атомарными?

Ответы [ 2 ]

0 голосов
/ 25 января 2019

да, конечно, вы можете иметь транзакцию !.При использовании базы данных NoSql, такой как Cassandra или Mongo, реактивное расширение через реактивный поток является нативным, но в случае реляционной базы данных могут возникнуть некоторые проблемы.Конечно, вы можете обернуть ваш вызов репозитория в Mono и использовать schedulder для асинхронной функции, но я не настолько уверен, что транзакция корректно охватывает многие фреймворки, такие как JPA, полагаются на тот факт, что все выполняются в потоке с использованием локального потока и JDBCблокирующий IO api ..... в нескольких словах использование mysql postgress и т. д. возможно, но не эффективно и в некоторых случаях опасно.

Хорошая новость заключается в том, что существуют некие рамки, которые заполняют этот пробел во многихэто экспериментально, но возможно, R2DBC - это случай.

Это очень красивая структура.

Это образец, взятый из моего репозитория на github, он построен на весенней загрузке 2.1 и kotlinно в Java синтаксис очень похож:

веб-слой

@Configuration
class ReservationRoutesConfig {

    @Bean
    fun reservationRoutes(@Value("\${baseServer:http://localhost:8080}") baseServer: String,
                          reservationRepository: ReservationRepository) =
            router {
                POST("/reservation") {
                    it.bodyToMono(ReservationRepresentation::class.java)
                            .flatMap { Mono.just(ReservationRepresentation.toDomain(reservationRepresentation = it)) }
                            .flatMap { reservationRepository.save(it).toMono() }
                            .flatMap { ServerResponse.created(URI("$baseServer/reservation/${it.reservationId}")).build() }

                }

                GET("/reservation/{reservationId}") {
                    reservationRepository.findOne(it.pathVariable("reservationId")).toMono()
                            .flatMap { Mono.just(ReservationRepresentation.toRepresentation(it)) }
                            .flatMap { ok().body(BodyInserters.fromObject(it)) }
                }

                DELETE("/reservation/{reservationId}") {
                    reservationRepository.delete(it.pathVariable("reservationId")).toMono()
                            .then(noContent().build())
                }
            }
}

уровень хранилища:

class ReactiveReservationRepository(private val databaseClient: TransactionalDatabaseClient,
                                    private val customerRepository: CustomerRepository) : ReservationRepository {

    override fun findOne(reservationId: String): Publisher<Reservation> =
            databaseClient.inTransaction {
                customerRepository.find(reservationId).toMono()
                        .flatMap { customer ->
                            it.execute().sql("SELECT * FROM reservation WHERE reservation_id=$1")
                                    .bind("$1", reservationId)
                                    .exchange()
                                    .flatMap { sqlRowMap ->
                                        sqlRowMap.extract { t, u ->
                                            Reservation(t.get("reservation_id", String::class.java)!!,
                                                    t.get("restaurant_name", String::class.java)!!,
                                                    customer, t.get("date", LocalDateTime::class.java)!!)
                                        }.one()
                                    }
                        }
            }

    override fun save(reservation: Reservation): Publisher<Reservation> =
            databaseClient.inTransaction {
                customerRepository.save(reservation.reservationId, reservation.customer).toMono()
                        .then(it.execute().sql("INSERT INTO reservation (reservation_id, restaurant_name, date) VALUES ($1, $2, $3)")
                                .bind("$1", reservation.reservationId)
                                .bind("$2", reservation.restaurantName)
                                .bind("$3", reservation.date)
                                .fetch().rowsUpdated())
            }.then(Mono.just(reservation))


    override fun delete(reservationId: String): Publisher<Void> =
            databaseClient.inTransaction {
                customerRepository.delete(reservationId).toMono()
                        .then(it.execute().sql("DELETE FROM reservation WHERE reservation_id = $1")
                                .bind("$1", reservationId)
                                .fetch().rowsUpdated())
            }.then(Mono.empty())

}

Я надеюсь, что это может помочь вам

0 голосов
/ 21 августа 2018

Вы можете использовать TransactionTemplate в своем реактивном коде:

private TransactionTemplate transactionTemplate;

// save to DB method
public Void saveMembers(Member m1, Member m2) {
    saveMember(m1);
    saveMember(m2);
}

// your reactive method 
public Mono<Void> saveMembersAsync(Member m1, Member m) {
    return Mono.fromCallable(() -> transactionTemplate.execute(transactionStatus -> 
        saveMembers(m1, m2)));
}
...