Как отменить будущее Scala по таймауту? - PullRequest
0 голосов
/ 26 сентября 2018

Допустим, у меня определено будущее, как показано ниже:

import scala.concurrent.Future

def appendCharWithTimeout(transformationId: String, char: Char, delayTimeMs: Long, delayTimes: Int) = (s: String) => {
  for (i <- 1 to delayTimes) {
    println(s"$transformationId waiting iteration $i ...")
    Thread.sleep(delayTimeMs)
  }
  s"$s$char"
}

Future("Hello ")
  .map( appendCharWithTimeout("mapJ", 'J', 200, 5) )
  .map( appendCharWithTimeout("mapO", 'o', 200, 5) )
  .map( appendCharWithTimeout("mapH", 'h', 200, 5) )
  .map( appendCharWithTimeout("mapN", 'n', 200, 5) )
  .map( appendCharWithTimeout("map!", '!', 200, 5) )

Время выполнения этого будущего составляет 5 секунд (5 * 5 * 200 мс).

Я ищу способ обернуть это будущее в своего рода «контекст тайм-аута» и остановить выполнение по тайм-ауту, поэтому не все преобразования будут выполнены.

В идеале я предполагаюесть что-то вроде этого:

Future("Hello ")
  .within(2 seconds)
  .map( appendCharWithTimeout("mapJ", 'J', 200, 5) )
  .map( appendCharWithTimeout("mapO", 'o', 200, 5) )
  .map( appendCharWithTimeout("mapH", 'h', 200, 5) )
  .map( appendCharWithTimeout("mapN", 'n', 200, 5) )
  .map( appendCharWithTimeout("map!", '!', 200, 5) )

И вывод должен быть:

mapJ waiting iteration 1 ...
mapJ waiting iteration 2 ...
mapJ waiting iteration 3 ...
mapJ waiting iteration 4 ...
mapJ waiting iteration 5 ...
mapO waiting iteration 1 ...
mapO waiting iteration 2 ...
mapO waiting iteration 3 ...
mapO waiting iteration 4 ...
mapO waiting iteration 5 ...

Ответы [ 2 ]

0 голосов
/ 26 сентября 2018

Вот несколько способов сделать это:

0) Не связывайте Future s.Выполнение является последовательным, поэтому просто используйте цикл внутри одного Future и отследите общее истекшее время в вашем цикле.

1) Запишите время начала в val вне Future и используйте этоизменить значение времени ожидания, заданное для appendCharWithTimeout, чтобы общее время выполнения не превышалось.

2) Пусть appendCharWithTimeout займет общее время выполнения и вернет оставшееся время до следующей итерации.Используйте это, чтобы остановить выполнение при превышении времени ожидания.

Выбор зависит от того, что на самом деле делает реальный код, и можете ли вы изменить код в appendCharWithTimeout.

0 голосов
/ 26 сентября 2018

Прежде всего, пожалуйста, не смешивайте Thread.sleep с фьючерсами.Фьючерсы работают с ExecutionContext, который планирует вычисления в пуле потоков.Таким образом, если ваше будущее будет блокировать указанные темы ... это приведет к проблемам.

import java.util.{Timer, TimerTask}

import scala.concurrent.{Future, Promise}
import scala.concurrent.duration.{Duration, TimeUnit}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}


def createFutureWithDelay[T](result: T, timeout: Duration) = {
  val promise = Promise[T]
  val timerTask = new TimerTask {
    override def run(): Unit = promise.success(result)
  }
  val timer = new Timer()
  timer.schedule(timerTask, timeout.toMillis)
  promise.future
}

def getNonBlockingFutureWithTimeout[A, T](computation: => T, timeout: Duration, t: Throwable) = {
  val promise = Promise[T]
  promise.tryCompleteWith(Future(computation))
  val timerTask = new TimerTask {
    override def run(): Unit = promise.failure(t)
  }
  val timer = new Timer()
  timer.schedule(timerTask, timeout.toMillis)
  promise.future
}

def wrapFutureWithTimeout[T](f: Future[T], timeout: Duration, t: Throwable) = {
  val promise = Promise[T]
  promise.tryCompleteWith(f)
  val timerTask = new TimerTask {
    override def run(): Unit = promise.failure(t)
  }
  val timer = new Timer()
  timer.schedule(timerTask, timeout.toMillis)
  promise.future
}

val f = createFutureWithDelay(5, 5 minutes).flatMap(_ => createFutureWithDelay(5, 5 minutes))

val f2 = wrapFutureWithTimeout(f, 5 seconds, new Throwable("ENDING with timeout"))

f2.onComplete({
  case Success(value) => println(s"success - $value")
  case Failure(t) => println(s"failure - ${t.getMessage}")
})
...