Scala повторите последовательность фьючерсов, пока они не завершатся - PullRequest
3 голосов
/ 08 января 2020

В scala как бы вы написали функцию, которая принимает Последовательность Фьючерсов, запускает их все, непрерывно повторяет все неудачные попытки и возвращает результаты?

Например, подпись может быть:

  def waitRetryAll[T](futures: Seq[Future[T]]): Future[Seq[T]]

Бонусные баллы за настраиваемое время ожидания, при котором функция перестает работать, и вызываемый абонент может обработать этот случай.
Бонусные бонусные баллы, если этот обработчик ошибок может получить список фьючерсов, которые потерпели неудачу.

Спасибо!

Ответы [ 2 ]

3 голосов
/ 08 января 2020

На основе Повторите функцию, которая возвращает Future , рассмотрите

def retry[T](expr: => Future[T], n: Int = 3): Future[Either[Throwable, T]] = {
  Future
    .unit
    .flatMap(_ => expr).map(v => Right(v))
    .recoverWith {
      case _ if n > 1 => retry(expr, n - 1)
      case e => Future.failed(e).recover{case e => Left(e)}
    }
}

в сочетании с

Future.sequence

, которая преобразует List[Future[T]] в Future[List[T]]. Тем не менее, sequence ведет себя безотказно, поэтому нам пришлось поднять наш Future[T] до Future[Either[Throwable, T]]

Соединяя эти части вместе, мы можем определить

def waitRetryAll[T](futures: List[() => Future[T]]): Future[List[Either[Throwable, T]]] = {
  Future.sequence(futures.map(f => retry(f.apply())))
}

и использовать его следующим образом

val futures = List(
  () => Future(42),
  () => Future(throw new RuntimeException("boom 1")),
  () => Future(11),
  () => Future(throw new RuntimeException("boom 2"))
)

waitRetryAll(futures)
  .andThen { case v => println(v) }

, который выводит

Success(List(Right(42), Left(java.lang.RuntimeException: boom 1), Right(11), Left(java.lang.RuntimeException: boom 2)))

Мы можем collect наших Left с или Right с и соответственно восстановить или продолжить обработку, например

waitRetryAll(futures)
  .map(_.collect{ case v if v.isLeft => v })
  ...

Обратите внимание, как мы должны были ввести List[() => Future[T]] вместо List[Future[T]], чтобы предотвратить стремительный запуск фьючерса.

2 голосов
/ 08 января 2020

Насколько я помню, в стандартной библиотеке нет утилит для Future timeout.

Как бы вы прервали / отменили текущие вычисления на JVM? В общем случае вы не можете, вы можете прерывать Thread только тогда, когда он включен wait, но если он никогда не wait с? Библиотеки ввода-вывода для асинхронных вычислений c (которые определяют отмену) выполняют ввод-вывод как серию небольших непрерывных задач (каждая карта / flatMap создает новый шаг), и если они получают отмену / тайм-аут, они продолжат выполнение текущей задачи (так как они не могу остановить это) но они не начнут следующий. Вы можете вернуть исключение по таймауту, но все же последний шаг будет выполнен, поэтому, если это будет некоторый побочный эффект (например, операция БД), он будет завершен после того, как вы уже вернули ошибку.

Это не интуитивно понятно и хитрый, и я думаю, что это было причиной, почему это поведение не было добавлено в стандартную библиотеку.

Кроме того, будущее - это продолжающаяся операция, которая может иметь побочные эффекты. Вы не можете принять значение типа Future[A] и выполнить его повторно. Однако вы можете передать в качестве параметра имя по имени, чтобы в .recoverWith вы могли заново создать будущее.

В то время, как вам грустно, вы можете реализовать что-то вроде «повторить до LocalDateTime.now - startTime> =» потому что это то, что, я думаю, вам нужно:

def retry[A](future: => Future[A], attemptsLeft: Int, timeoutTime: Instant) =
  future.recoverWith {
    case error: Throwable =>
      if (attemptsLeft <= 0 || Instant.now.isAfter(timeoutTime)) Future.failure(error)
      else retryHelper(future, attemptsLeft - 1, timeoutTime)
  }

Это можно объединить с Future.sequence для создания списка результатов:

def retryFutures[A](list: List[() => Future[A]) = {
  val attempts: Int = ...
  val timeout: Instant = ...
  Future.sequence(list.map(future => retry(future(), attempts, timeout)))
}

Если вы хотите отслеживать, какие будущее провалилось и успешно завершилось:

def futureAttempt[A](future: Future[A]): Future[Either[Throwable, A]] =
  future.map(a => Right(a))).recover {
    case error: Throwable => Left(error)
  }

def retryFutures[A](list: List[() => Future[A]) = {
  val attempts: Int = ...
  val timeout: Instant = ...
  Future.sequence(list.map(future => retry(futureAttempt(future()), attempts, timeout)))
}

Если у вас нет проблем с отменой фьючерсов на JVM и если у вас есть больше подобных случаев, я бы предложил использовать библиотеку.

Если вы хотите чтобы использовать что-то, что реализует для вас повтор, есть cats-retry

Если вы хотите иметь что-то лучше, чем Future при определении вычислений (например, что-то, что вам не нужно использовать параметры по именам или нулевые функции) попробуйте Monix или ZIO (https://zio.dev/)

...