Scala Mongodb транзакция: как выполнить откат? - PullRequest
0 голосов
/ 29 апреля 2019

Существует пример транзакции Scala mongodb:

https://github.com/mongodb/mongo-scala-driver/blob/r2.4.0/driver/src/it/scala/org/mongodb/scala/DocumentationTransactionsExampleSpec.scala

Но не ясно, как откатить транзакцию в случае сбоя.

Воткод, который я скопировал из официального примера, но немного изменил, чтобы во второй вставке произошла ошибка транзакции (вставка 2 документов с одинаковыми идентификаторами), но проблема в том, что первый документ сохраняется, и мне нужно откатить ВСЕ транзакцию.

import org.mongodb.scala._
import scala.concurrent.Await
import scala.concurrent.duration.Duration

object Application extends App {

  val mongoClient: MongoClient = MongoClient("mongodb://localhost:27018")

  val database = mongoClient.getDatabase("hr")
  val employeesCollection = database.getCollection("employees")

  // Implicit functions that execute the Observable and return the results
  val waitDuration = Duration(5, "seconds")
  implicit class ObservableExecutor[T](observable: Observable[T]) {
    def execute(): Seq[T] = Await.result(observable.toFuture(), waitDuration)
  }

  implicit class SingleObservableExecutor[T](observable: SingleObservable[T]) {
    def execute(): T = Await.result(observable.toFuture(), waitDuration)
  }


  updateEmployeeInfoWithRetry(mongoClient).execute()

  Thread.sleep(3000)

  /// -------------------------


  def updateEmployeeInfo(database: MongoDatabase, observable: SingleObservable[ClientSession]): SingleObservable[ClientSession] = {
    observable.map(clientSession => {
      val eventsCollection = database.getCollection("events")

      val transactionOptions = TransactionOptions.builder().readConcern(ReadConcern.SNAPSHOT).writeConcern(WriteConcern.MAJORITY).build()
      clientSession.startTransaction(transactionOptions)

      eventsCollection.insertOne(clientSession, Document("_id" -> "123", "employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active")))
        .subscribe((res: Completed) => println(res))

      // THIS SHOULD FAIL, SINCE  THERE IS ALREADY DOCUMENT WITH ID = 123, but PREVIOUS OPERATION SHOULD BE ALSO ROLLED BACK.
      // I COULD NOT FIND THE WAY HOW TO ROLLBACK WHOLE TRANSACTION IF ONE OF OPERATIONS FAILED
      eventsCollection.insertOne(clientSession, Document("_id" -> "123", "employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active")))
        .subscribe((res: Completed) => println(res))
      // I'VE TRIED VARIOUS THINGS (INCLUDING CODE BELOW)
//        .subscribe(new Observer[Completed] {
//          override def onNext(result: Completed): Unit = println("onNext")
//
//          override def onError(e: Throwable): Unit = clientSession.abortTransaction()
//
//          override def onComplete(): Unit = println("complete")
//        })
      clientSession
    })
  }

  def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
    observable.recoverWith({
      case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
        println("UnknownTransactionCommitResult, retrying commit operation ...")
        commitAndRetry(observable)
      }
      case e: Exception => {
        println(s"Exception during commit ...: $e")
        throw e
      }
    })
  }

  def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
    observable.recoverWith({
      case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
        println("TransientTransactionError, aborting transaction and retrying ...")
        runTransactionAndRetry(observable)
      }
    })
  }

  def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Completed] = {

    val database = client.getDatabase("hr")
    val updateEmployeeInfoObservable: Observable[ClientSession] = updateEmployeeInfo(database, client.startSession())
    val commitTransactionObservable: SingleObservable[Completed] =
      updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction())
    val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)

    runTransactionAndRetry(commitAndRetryObservable)
  }
}

Как откатить всю транзакцию, если какая-либо операция не удалась?

1 Ответ

0 голосов
/ 17 мая 2019

Из исходного кода драйвера Scala на https://github.com/mongodb/mongo-scala-driver/blob/r2.6.0/driver/src/main/scala/org/mongodb/scala/ClientSessionImplicits.scala

Похоже, что существует метод abortTransaction(), определенный вместе с commitTransaction().

В другом примечаниив настоящее время одна транзакция набора реплик в MongoDB 4.0 будет автоматически прервана, если она не будет зафиксирована в течение 60 секунд (настраивается).В блоге Многодокументарные транзакции ACID MongoDB :

По умолчанию MongoDB автоматически прерывает любую многодокументную транзакцию, которая выполняется более 60 секунд.Обратите внимание: если объем записи на сервер низкий, у вас есть возможность настраивать транзакции для более длительного времени выполнения.

...