Многократное будущее, которое может потерпеть неудачу - возвращать как успехи, так и неудачи? - PullRequest
0 голосов
/ 22 сентября 2018

У меня есть ситуация, когда мне нужно запустить несколько операций параллельно.

Все операции имеют одинаковое возвращаемое значение (скажем, Seq[String]).

Возможно, что некоторыеиз операций может завершиться ошибкой, и другие успешно возвращают результаты.

Я хочу вернуть как успешные результаты, так и любые исключения, которые произошли, поэтому я могу записать их для отладки.

Есть ливстроенный или простой способ пройти через любую библиотеку (cats / scalaz), прежде чем я пойду и напишу свой собственный класс для этого?

Я думал о выполнении каждой операции в своем будущем,затем проверяя каждое будущее и возвращая кортеж Seq[String] -> Seq[Throwable], где левое значение - это успешные результаты (сглаженные / объединенные), а правое - список любых возникших исключений.

Есть ли лучший способ?

Ответы [ 4 ]

0 голосов
/ 27 сентября 2018

Я бы сделал это так:

import scala.concurrent.{Future, ExecutionContext}
import scala.util.Success

def eitherify[A](f: Future[A])(implicit ec: ExecutionContext): Future[Either[Throwable, A]] = f.transform(tryResult => Success(tryResult.toEither))

def eitherifyF[A, B](f: A => Future[B])(implicit ec: ExecutionContext): A => Future[Either[Throwable, B]] = { a => eitherify(f(a)) }

// here we need some "cats" magic for `traverse` and `separate`
// instead of `traverse` you can use standard `Future.sequence`
// there is no analogue for `separate` in the standard library

import cats.implicits._

def myProgram[A, B](values: List[A], asyncF: A => Future[B])(implicit ec: ExecutionContext): Future[(List[Throwable], List[B])] = {
  val appliedTransformations: Future[List[Either[Throwable, B]]] = values.traverse(eitherifyF(asyncF))
  appliedTransformations.map(_.separate)
}
0 голосов
/ 22 сентября 2018

Вызов value на вашем Future возвращает Option[Try[T]].Если Future не завершено, то Option равно None.Если он завершен, его легко развернуть и обработать.

if (myFutr.isCompleted)
  myFutr.value.map(_.fold( err: Throwable  => //log the error
                         , ss: Seq[String] => //process results
                         ))
else
 // do something else, come back later
0 голосов
/ 22 сентября 2018

Использование Await.ready, которое вы упоминаете в комментарии, обычно теряет большинство преимуществ от использования фьючерсов.Вместо этого вы можете сделать это, используя обычные Future комбинаторы.И давайте сделаем более общую версию, которая работает для любого возвращаемого типа;уплощение Seq[String] s может быть легко добавлено.

def successesAndFailures[T](futures: Seq[Future[T]]): Future[(Seq[T], Seq[Throwable])] = {
  // first, promote all futures to Either without failures
  val eitherFutures: Seq[Future[Either[Throwable, T]]] = 
    futures.map(_.transform(x => Success(x.toEither)))
  // then sequence to flip Future and Seq
  val futureEithers: Future[Seq[Either[Throwable, T]]] = 
    Future.sequence(eitherFutures)
  // finally, Seq of Eithers can be separated into Seqs of Lefts and Rights
  futureEithers.map { seqOfEithers =>
    val (lefts, rights) = seqOfEithers.partition(_.isLeft)
    val failures = lefts.map(_.left.get)
    val successes = rights.map(_.right.get)
    (successes, failures)
  }
}

У Скала и Кошки есть separate для упрощения последнего шага.

Типы могут быть выведеныкомпилятором они показаны только для того, чтобы помочь вам увидеть логику.

0 голосов
/ 22 сентября 2018

Звучит как хороший вариант использования для идиомы Try (она в основном похожа на монаду Either).

Пример использования из doc :

import scala.util.{Success, Failure}

val f: Future[List[String]] = Future {
  session.getRecentPosts
}

f onComplete {
  case Success(posts) => for (post <- posts) println(post)
  case Failure(t) => println("An error has occurred: " + t.getMessage)
}

На самом деле он делает немного больше, чем вы просили, потому что он полностью асинхронный.Подходит ли он для вашего варианта использования?

...