Как предотвратить будущее никогда не заканчиваться - PullRequest
1 голос
/ 12 июня 2019

Предположим, у меня есть несколько задач, которые я хочу выполнять параллельно.

Каждая задача (метод) имеет внутреннюю рекурсивную функцию, которая в основном извлекает данные из базы данных и сохраняет их в некотором хранилище данных.

[упрощенная внутренняя рекурсивная функция]

 def simplifiedSomeTask(): Unit = {
    @scala.annotation.tailrec
    def get(
        stream: Stream[SomeEntity],
        result: Seq[SomeEntity],
    ): Stream[SomeEntity] = result match {
      case Nil =>
        stream
      case _ =>
        val query = //query to fetch data from database
        get(
          stream append result.toStream,
          query.run.value, // get fetched data from database
        )
    }

    val buffer = collection.mutable.Map.empty[String, String]

    get(
      Stream.empty,
      query.run.value
    ).foreach { r =>
      buffer.put(r.loginId, r.userId)
    }
  }

При попытке запустить A, Future никогда не завершается по какой-то причине.

[A]

val f1 =Future { someTask1() }
val f2 =Future { someTask2() }
val f3 =Future { someTask3() }

val f = for {
  _ <- f1 
  _ <- f2 
  _ <- f3 
} yield ()

Await.result(f, Duration.Inf)

Однако, B работает (хотя он не работает параллельно)

[B]

val f = for {
  _ <- Future { someTask1() }
  _ <- Future { someTask2() }
  _ <- Future { someTask3() }
} yield ()

Await.result(f, Duration.Inf)

Как мне изменить A, чтобы он работал как положено?

Ответы [ 3 ]

2 голосов
/ 12 июня 2019

Я не могу воспроизвести вашу проблему, но причина странного поведения может заключаться в том, что ваш синтаксис в первом примере не совсем корректен. Вы должны написать свой первый для понимания, как:

val f = for {
  _ <- f1
  _ <- f2
  _ <- f3
} yield ()

Но для понимания работает последовательно, и единственная причина, по которой ваши фьючерсы работают параллельно в вашем первом примере, заключается в том, что фьючерсы начинаются с нетерпением ( «Будущее начинается сейчас» ).

Если вы хотите убедиться, что фьючерсы будут выполняться параллельно, используйте Future.sequence:

val f = Future.sequence(
  List(
    Future { someTask1() },
    Future { someTask2() },
    Future { someTask3() }
  )
)
0 голосов
/ 27 июня 2019

Оказалось, что некоторые ссылки окружности при создании объектов query вызывали эту проблему.

0 голосов
/ 12 июня 2019

Проблема не в понимании, а в ваших задачах.Потенциально существует какой-то тупик от их параллельного запуска, но сначала я бы трижды проверил, что они не попадают в бесконечный цикл.Глядя на ваш пример, это может легко произойти, если query.run.value никогда не вернется пустым, и тогда рекурсия будет продолжаться вечно.Если любой из f1, f2 и f3 не разрешаются, то f, конечно же, никогда не разрешится.

...