Существует пример транзакции Scala mongodb:
https://github.com/mongodb/mongo-scala-driver/blob/r2.4.0/driver/src/it/scala/org/mongodb/scala/DocumentationTransactionsExampleSpec.scala
Но не ясно, как откатить транзакцию в случае сбоя.
Воткод, который я скопировал из официального примера, но немного изменил, чтобы во второй вставке произошла ошибка транзакции (вставка 2 документов с одинаковыми идентификаторами), но проблема в том, что первый документ сохраняется, и мне нужно откатить ВСЕ транзакцию.
import org.mongodb.scala._
import scala.concurrent.Await
import scala.concurrent.duration.Duration
object Application extends App {
val mongoClient: MongoClient = MongoClient("mongodb://localhost:27018")
val database = mongoClient.getDatabase("hr")
val employeesCollection = database.getCollection("employees")
// Implicit functions that execute the Observable and return the results
val waitDuration = Duration(5, "seconds")
implicit class ObservableExecutor[T](observable: Observable[T]) {
def execute(): Seq[T] = Await.result(observable.toFuture(), waitDuration)
}
implicit class SingleObservableExecutor[T](observable: SingleObservable[T]) {
def execute(): T = Await.result(observable.toFuture(), waitDuration)
}
updateEmployeeInfoWithRetry(mongoClient).execute()
Thread.sleep(3000)
/// -------------------------
def updateEmployeeInfo(database: MongoDatabase, observable: SingleObservable[ClientSession]): SingleObservable[ClientSession] = {
observable.map(clientSession => {
val eventsCollection = database.getCollection("events")
val transactionOptions = TransactionOptions.builder().readConcern(ReadConcern.SNAPSHOT).writeConcern(WriteConcern.MAJORITY).build()
clientSession.startTransaction(transactionOptions)
eventsCollection.insertOne(clientSession, Document("_id" -> "123", "employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active")))
.subscribe((res: Completed) => println(res))
// THIS SHOULD FAIL, SINCE THERE IS ALREADY DOCUMENT WITH ID = 123, but PREVIOUS OPERATION SHOULD BE ALSO ROLLED BACK.
// I COULD NOT FIND THE WAY HOW TO ROLLBACK WHOLE TRANSACTION IF ONE OF OPERATIONS FAILED
eventsCollection.insertOne(clientSession, Document("_id" -> "123", "employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active")))
.subscribe((res: Completed) => println(res))
// I'VE TRIED VARIOUS THINGS (INCLUDING CODE BELOW)
// .subscribe(new Observer[Completed] {
// override def onNext(result: Completed): Unit = println("onNext")
//
// override def onError(e: Throwable): Unit = clientSession.abortTransaction()
//
// override def onComplete(): Unit = println("complete")
// })
clientSession
})
}
def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
observable.recoverWith({
case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
println("UnknownTransactionCommitResult, retrying commit operation ...")
commitAndRetry(observable)
}
case e: Exception => {
println(s"Exception during commit ...: $e")
throw e
}
})
}
def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
observable.recoverWith({
case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
println("TransientTransactionError, aborting transaction and retrying ...")
runTransactionAndRetry(observable)
}
})
}
def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Completed] = {
val database = client.getDatabase("hr")
val updateEmployeeInfoObservable: Observable[ClientSession] = updateEmployeeInfo(database, client.startSession())
val commitTransactionObservable: SingleObservable[Completed] =
updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction())
val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)
runTransactionAndRetry(commitAndRetryObservable)
}
}
Как откатить всю транзакцию, если какая-либо операция не удалась?