Составьте фьючерсы с восстановлением в Scala - PullRequest
2 голосов
/ 31 марта 2020

У меня есть метод, который выполняет асин c действия. Каждое асин c действие требует восстановления. Как я могу написать метод более сложным способом?

Псевдо scala код:

object Test {

  import ExecutionContext.Implicits.global

  case class ObjectData(
                         internal: String,
                         external1: String = "",
                         external2: String = ""
                       )

  case class ObjectInfo(
                         id: String,
                         internal: String,
                         external1: String = "",
                         external2: String = ""
                       )

  def addObject(data: ObjectData): Future[ObjectInfo] = {
    internalActionWithRollback(data.internal) { objectInfo =>
      externalActionWithRollback(objectInfo.id, data.external1) {
        externalActionWithRollback(objectInfo.id, data.external2) {
          Future.successful(
            objectInfo.copy(
              external1 = data.external1,
              external2 = data.external2
            )
          )
        }
      }
    }
  }

  private def internalActionWithRollback[R](internal: String)
                                           (nextAction: ObjectInfo => Future[R]): Future[R] = ???

  private def externalActionWithRollback[R](id: String, external: String)
                                           (nextAction: => Future[R]): Future[R] = ???

}

Редактировать:

@ Иван Курченко помог мне решить проблему. Нам нужна будущая оболочка с функциями flatMap и map:

object TransactFuture {
  implicit class FutureOps[T, R](underling: Future[T]) {
    def rollbackWith(rollback: PartialFunction[Throwable, Future[R]]): TransactFuture[T, R] = {
      new TransactFuture[T, R](underling, rollback)
    }
    def empty: TransactFuture[T, R] = {
      new TransactFuture[T, R](underling, PartialFunction.empty[Throwable, Future[R]])
    }
  }

}

class TransactFuture[T, R](underlying: Future[T], rollback: PartialFunction[Throwable, Future[R]]) {

  private def recoveryInternal[S](implicit ec: ExecutionContext): PartialFunction[Throwable, Future[S]] = {
    case ex: Throwable =>
      val failed = Future.failed[S](ex)
      rollback.lift(ex).fold(failed)(_.flatMap[S](_ => failed))
  }

  def flatMap[S](f: T => Future[S])(implicit ec: ExecutionContext): Future[S] = {
    underlying.flatMap(f).recoverWith(recoveryInternal)
  }
  def map[S](f: T => S)(implicit ec: ExecutionContext): Future[S] = {
    underlying.map(f).recoverWith(recoveryInternal)
  }
}

С помощью этой оболочки мы можем написать основную функцию:

  def addObject(data: ObjectData): Future[ObjectInfo] = {
    for {
      objectInfo <- addObjectInternal(internal)
      _ <- addExternal(objectInfo.id, data.external1)
        .rollbackWith({
          case _: Throwable => deleteObjectInternal(objectInfo.id)
        })
      _ <- addExternal(objectInfo.id, data.external2)
        .rollbackWith({
          case _: Throwable => deleteExternal(objectInfo.id)
        })
    } yield {
      objectInfo.copy(
        external1 = data.external1,
        external2 = data.external2
      )
    }
  }

1 Ответ

1 голос
/ 31 марта 2020

Если я вас правильно понял, вы хотите вызвать какое-то действие, если выполнение дочернего или следующего Future не удалось. Кажется, такое поведение похоже на какую-то сделку. Ну, Future из коробки не обеспечивает такого поведения, но вместо этого может быть реализована некоторая оболочка:

class TransactFuture[T](underlying: Future[T], rollback: PartialFunction[Throwable, Future[Unit]]) {
  def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = {
    underlying.flatMap(f).recoverWith {
      case exception: Throwable =>
        val failure = Future.failed[S](exception)
        rollback.lift(exception).fold(failure)(_.flatMap(_ => failure))
    }
  }
}

// Just provides syntax sugar over Future
implicit class FutureOps[T](underling: Future[T]) {
  def rollbackWith(rollback: PartialFunction[Throwable, Future[Unit]]): TransactFuture[T] = {
    new TransactFuture[T](underling, rollback)
  }
}

, которая может быть использована следующим образом:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val action = for {
  _ <- Future.successful(println("A executed")).rollbackWith {
    case _: Throwable => Future.successful(println("A recovered"))
  }
  _ <- Future.successful(println("B executed")).rollbackWith {
    case _: Throwable => Future.successful(println("B recovered"))
  }
  _ <- Future.failed(new Exception("C failed"))
} yield ()

Await.result(action, 1 second)

Таким образом, результат будет:

A executed
B executed
B recovered
A recovered
Exception in thread "main" ......
Caused by: java.lang.Exception: C failed

Надеюсь, это поможет!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...