Как Quarkus транзакционный работает с реактивным sql клиентом - PullRequest
0 голосов
/ 08 апреля 2020

В настоящее время я пытаюсь создать простое приложение-прототип с помощью Quarkus (1.3.1). Я пытаюсь использовать реактивный подход с реактивным подключением к базе данных PostgreSQL, используя https://quarkus.io/guides/reactive-sql-clients. Здесь я использую kotlin и мятеж, чтобы немного упростить вещи.

У меня есть следующий контроллер:

package me.invitation.infrastructure.rest

import io.smallrye.mutiny.Uni
import me.invitation.application.service.InvitationService
import me.invitation.infrastructure.rest.api.InvitationDto
import me.invitation.infrastructure.rest.system.MicroserviceContentType
import org.eclipse.microprofile.context.ThreadContext
import org.slf4j.LoggerFactory
import javax.inject.Inject
import javax.ws.rs.*
import javax.ws.rs.core.Response


@Path("/invitation")
class Resource @Inject constructor(
        private val invitationService: InvitationService,
        private val threadContext: ThreadContext) {


    @POST
    @Consumes(MicroserviceContentType.PUBLIC_RSC_JSON)
    fun postInvitation(invitation: InvitationDto): Uni<Response> {
        return Uni.createFrom()
                .completionStage(
                        threadContext.withContextCapture(
                                invitationService.sendInvite(invitation.email)))
                .map { Response.status(Response.Status.CREATED).build() }
    }

    companion object {
        private val log = LoggerFactory.getLogger(Resource::class.java)
    }
}

Реализация InvitationService выглядит следующим образом:

package me.invitation.application.service

import io.smallrye.mutiny.Uni
import me.invitation.application.registry.InvitationRegistry
import me.invitation.application.service.mail.MailService
import org.apache.commons.codec.binary.Base32
import org.eclipse.microprofile.config.inject.ConfigProperty
import org.eclipse.microprofile.context.ManagedExecutor
import org.eclipse.microprofile.context.ThreadContext
import org.slf4j.LoggerFactory
import java.security.NoSuchAlgorithmException
import java.security.SecureRandom
import java.time.Duration
import java.time.LocalDateTime
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.function.Supplier
import javax.enterprise.context.ApplicationScoped
import javax.inject.Inject
import javax.transaction.Transactional

@ApplicationScoped
class ExternalRegistryInvitationService @Inject constructor(
        private val invitationRepository: InvitationRegistry,
        private val mailService: MailService,
        private val threadContext: ThreadContext,
        private val managedExecutor: ManagedExecutor,
        @ConfigProperty(name ="invitation.expiration-ttl-seconds") private val invitationTTLSeconds: Long
) : InvitationService {

    @Transactional
    override fun sendInvite(email: String): CompletableFuture<Unit> {
        return Uni.createFrom()
                  .completionStage(threadContext.withContextCapture(invitationRepository.save(id, email)))
                  .flatMap {wasSaved ->
                                if(wasSaved) {
                                   Uni.createFrom().completionStage(threadContext.withContextCapture(mailService.sendInvitationEmail(email)))
                                } else {
                                    throw InvitationAlreadyPresentException()
                                }
                            }.subscribeAsCompletionStage()
    }
}

Здесь я пытаюсь сохранить запись в базе данных и после этого отправить электронное письмо

InvitationRegistry Реализация выглядит следующим образом:

package me.invitation.infrastructure.db

import io.smallrye.mutiny.Uni
import io.vertx.mutiny.pgclient.PgPool
import io.vertx.mutiny.sqlclient.Tuple
import me.invitation.application.registry.InvitationRegistry
import java.time.LocalDateTime
import java.util.concurrent.CompletableFuture
import javax.enterprise.context.ApplicationScoped
import javax.inject.Inject
import javax.transaction.Transactional

@ApplicationScoped
class PostgresInvitationRegistry @Inject constructor(private val client: PgPool) : InvitationRegistry {

    private final val insertQuery = "INSERT INTO invitation(id, email, aud_ts_created, aud_ts_last_modified) VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING RETURNING id"

    @Transactional(value = Transactional.TxType.MANDATORY)
    override fun save(id: String, email: String): CompletableFuture<Boolean> {
        return client
                .preparedQuery(insertQuery, Tuple.of(id, email, LocalDateTime.now(), LocalDateTime.now()))
                .flatMap { result -> Uni.createFrom().item(result.rowCount() > 0) }
                .subscribeAsCompletionStage()
    }
}

Реализация MailService не имеет значения, потому что все, что она делает, вызывает исключение RuntimeException.

Здесь я ожидаю откат транзакции верхнего уровня. Потому что MailService.sendInvitationEmail вызывает исключение. Но это не так. Даже в случае исключения запись сохраняется. Я прочитал это руководство https://quarkus.io/guides/transaction#reactive -extensions и https://quarkus.io/guides/context-propagation#usage -example-for-завершений , и я подумал, что декларативные транзакции хорошо работают с реактивными sql в Quarkus.

Я думаю, что для реактивного соединения есть настройка autocommit , которая установлена ​​в true. Но не нашел ничего, связанного с автокоммитом, ни в документах, ни в источниках quarkus и его расширениях. Удивительно, но даже нет автоматической настройки коммитов даже для quarkus main jdb c pool = agroal!

Кто-нибудь знает, как заставить декларативные транзакции работать с реактивным sql клиентом в Quarkus

...