Scala Futures с несколькими зависимостями - PullRequest
0 голосов
/ 25 июня 2018

Мне нужно асинхронно вычислить набор функций, которые могут иметь несколько зависимостей друг от друга (без циклов). Например

 class FeatureEncoderMock(val n:String, val deps: List[String] = List.empty) {
      def compute = {
          println(s"starting computation feature $n")
          Thread.sleep(r.nextInt(2500))
          println(s"end computation feature $n")
      }
  }

  val registry = Map(
        "feat1" -> new FeatureEncoderMock("feat1", List("factLogA", "factLogB")),
        "factLogA" -> new FeatureEncoderMock("factLogA"),
        "factLogB" -> new FeatureEncoderMock("factLogB"),
        "feat1" -> new FeatureEncoderMock("feat1", List("factLogA", "factLogB")),
        "feat2" -> new FeatureEncoderMock("feat2", List("factLogA")),
        "feat3" -> new FeatureEncoderMock("feat3", List("feat1")),
        "feat4" -> new FeatureEncoderMock("feat4", List("feat3", "factLogB"))
  )

Чего я хочу добиться, так это вызвать единственную функцию на feat4, которая запустит вычисление всех зависимых функций и позаботится о зависимостях между ними. Я пробовал с этим

def run(): Unit = {
val requested = "feat4"

val allFeatures = getChainOfDependencies(requested)

val promises = allFeatures.zip(Seq.fill(allFeatures.size)(Promise[Unit])).toMap

def computeWithDependencies(f: String) = Future {
  println(s"computing $f")
  val encoder = registry(f)

  if(encoder.deps.isEmpty) {
    promises(f).success(registry(f).compute)
  }
  else {
    val depTasks = promises.filterKeys(encoder.deps.contains)

    val depTasksFuture = Future.sequence(depTasks.map(_._2.future))

    depTasksFuture.onSuccess({
      case _ =>
        println(s"all deps for $f has been computed")
        promises(f).success(registry(f).compute)
        println(s"done for $f")
    })
  }
 }

computeWithDependencies(requested)
}

Но я не могу понять, почему порядок исполнения не такой, как ожидалось. Я не уверен, как правильно кормить будущее внутри обещания. Я совершенно уверен, что этот фрагмент кода неправильный в этой части.

1 Ответ

0 голосов
/ 25 июня 2018

Я думаю, что вы переосмысливаете это своими обещаниями;Future композиция, вероятно, все, что вам нужно.Примерно так:

import scala.collection.mutable

def computeWithDependencies(s: String, cache: mutable.Map[String, Future[Unit]] = mutable.Map.empty)
                           (implicit ec: ExecutionContext): Future[Unit] = {
  cache.get(s) match {
    case Some(f) => f
    case None => {
      val encoder = registry(s)
      val depsFutures = encoder.deps.map(d => computeWithDependencies(d, cache))
      val result = Future.sequence(depsFutures).flatMap(_ => Future { encoder.compute })
      cache += s -> result
      result
    }
  }
}

Вызов flatMap гарантирует, что все фьючерсы зависимостей завершатся до выполнения «текущего» будущего, даже если результат (List[Unit]) игнорируется.Бизнес с кешем состоит в том, чтобы просто предотвратить повторное вычисление, если в графе зависимостей есть «ромб», но его можно исключить, если этого не произойдет, или если вы согласны с повторным вычислением.Во всяком случае, при запуске этого:

val futureResult = computeWithDependencies("feat4")
Await.result(futureResult, 30 seconds)

я вижу этот вывод:

starting computation feature factLogB
starting computation feature factLogA
end computation feature factLogB
end computation feature factLogA
starting computation feature feat1
end computation feature feat1
starting computation feature feat3
end computation feature feat3
starting computation feature feat4
end computation feature feat4

Что мне кажется правильным.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...