Как опросить будущее в Scala? - PullRequest
8 голосов
/ 11 апреля 2019

Я хочу опросить конечную точку API, пока она не достигнет какого-либо условия. Я ожидаю, что это достигнет этого состояния через пару секунд до минуты. У меня есть метод для вызова конечной точки, которая возвращает Future. Есть ли какой-нибудь способ, которым я могу связать Future с, чтобы опросить эту конечную точку каждые n миллисекунды и сдаться после t попыток?

Предположим, у меня есть функция со следующей подписью:

def isComplete(): Future[Boolean] = ???

Самый простой способ сделать это, на мой взгляд, сделать все блокирующим:

def untilComplete(): Unit = {
  for { _ <- 0 to 10 } {
    val status = Await.result(isComplete(), 1.seconds)
    if (status) return Unit
    Thread.sleep(100)
  }
  throw new Error("Max attempts")
}

Но это может занимать все потоки и не асинхронно. Я также подумал сделать это рекурсивно:

def untilComplete(
    f: Future[Boolean] = Future.successful(false),
    attempts: Int = 10
  ): Future[Unit] = f flatMap { status =>
    if (status) Future.successful(Unit)
    else if (attempts == 0) throw new Error("Max attempts")
    else {
      Thread.sleep(100)
      untilComplete(isComplete(), attempts - 1)
    }
}

Тем не менее, я обеспокоен тем, чтобы максимизировать стек вызовов, поскольку это не хвостовая рекурсия.

Есть ли лучший способ сделать это?

Редактировать: я использую akka

Ответы [ 3 ]

5 голосов
/ 11 апреля 2019

Вы можете использовать Akka Streams . Например, для вызова isComplete каждые 500 миллисекунд, пока результат Future не станет истинным, максимум до пяти раз:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future
import scala.concurrent.duration._

def isComplete(): Future[Boolean] = ???

implicit val system = ActorSystem("MyExample")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val stream: Future[Option[Boolean]] =
  Source(1 to 5)
    .throttle(1, 500 millis)
    .mapAsync(parallelism = 1)(_ => isComplete())
    .takeWhile(_ == false, true)
    .runWith(Sink.lastOption)

stream onComplete { result =>
  println(s"Stream completed with result: $result")
  system.terminate()
}
4 голосов
/ 11 апреля 2019

Это на самом деле не рекурсивно вообще , поэтому стек будет в порядке.

Одно из усовершенствований вашего подхода, о котором я могу подумать, - это использовать какой-то планировщик вместо Thread.sleep, чтобы вы не задерживали поток.

В этом примере используются стандартные java TimerTask, но если вы используете какой-то фреймворк, такой как akka, play или что-то еще, у него, вероятно, есть собственный планировщик, это было бы лучшей альтернативой.

object Scheduler {
   val timer = new Timer(true)
   def after[T](d: Duration)(f :=> Future[T]): Future[T] = {
     val promise = Promise[T]()
     timer.schedule(TimerTask { def run() = promise.completeWith(f) }, d.toMillis)
     promise.future
   }
}


def untilComplete(attempts: Int = 10) = isComplete().flatMap { 
   case true => Future.successful(())
   case false if attempts > 1 => Scheduler.after(100 millis)(untilComplete(attempts-1))
   case _ => throw new Exception("Attempts exhausted.") 
}
3 голосов
/ 11 апреля 2019

Я дал себе библиотеку, чтобы сделать это.У меня есть

trait Poller extends AutoCloseable {
  def addTask[T]( task : Poller.Task[T] ) : Future[T]
  def close() : Unit
}

, где Poller.Task выглядит как

class Task[T]( val label : String, val period : Duration, val pollFor : () => Option[T], val timeout : Duration = Duration.Inf )

. Poller опрашивает каждый period до тех пор, пока метод pollFor не будет успешным (дает Some[T])или timeout превышен.

Для удобства, когда я начинаю опрос, я оборачиваю это в Poller.Task.withDeadline:

final case class withDeadline[T] ( task : Task[T], deadline : Long ) {
  def timedOut = deadline >= 0 && System.currentTimeMillis > deadline
}

, который преобразует (неизменяемый, многоразовый) timeout Продолжительность задачи до крайнего срока для попытки опроса для тайм-аута.

Для эффективного опроса я использую Java ScheduledExecutorService:

def addTask[T]( task : Poller.Task[T] ) : Future[T] = {
  val promise = Promise[T]()
  scheduleTask( Poller.Task.withDeadline( task ), promise )
  promise.future
}

private def scheduleTask[T]( twd : Poller.Task.withDeadline[T], promise : Promise[T] ) : Unit = {
  if ( isClosed ) { 
    promise.failure( new Poller.ClosedException( this ) )
  } else {
    val task     = twd.task
    val deadline = twd.deadline

    val runnable = new Runnable {

      def run() : Unit = {
        try {
          if ( ! twd.timedOut ) {
            task.pollFor() match {
              case Some( value ) => promise.success( value )
              case None          => Abstract.this.scheduleTask( twd, promise )
            }
          } else {
            promise.failure( new Poller.TimeoutException( task.label, deadline ) )
          }
        }
        catch {
          case NonFatal( unexpected ) => promise.failure( unexpected )
        }
      }
    }

    val millis = task.period.toMillis
    ses.schedule( runnable, millis, TimeUnit.MILLISECONDS )
  }
}

Кажется, это работаетну, не требуя сна или блокировки отдельных Threads.

(Глядя на библиотеку, можно сделать многое, чтобы сделать ее более понятной, удобной для чтения и прояснить роль Poller.Task.withDeadlineсделав необработанный конструктор для этого класса private. Крайний срок должен всегда вычисляться из задачи timeout, не должен быть произвольной свободной переменной.)

Этот код взят из здесь (framework)и черта) и здесь (реализация) .(Если вы хотите использовать прямые координаты maven, то здесь .)

...