Nested Future.sequence последовательно выполняет включенные фьючерсы - PullRequest
5 голосов
/ 24 апреля 2020

У меня есть будущее (doFour), которое выполняется и результаты передаются в плоскую карту. Внутри плоской карты я выполняю еще две будущие ( doOne и doTwo ) функции, ожидая, что они будут работать параллельно, но я вижу, что они работают последовательно (2.13). Scast ie

Почему doOne и doTwo не выполняются параллельно?

Как я могу запустить их параллельно?

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object Test {
  def doOne(): Future[Unit] = Future {
    println("startFirst");      Thread.sleep(3000);     println("stopFirst")
  }

  def doTwo(): Future[Unit] = Future {
    println("startSecond");      Thread.sleep(1000);      println("stopSecond")
  }


  def doFour(): Future[Unit] = Future {
    println("do 4");     Thread.sleep(1000);     println("done 4")

  }


  def main(args: Array[String]) {


    val resOut = doFour().flatMap { a =>

      val futureOperations = Seq(doOne(), doTwo())

      val res = Future.sequence(futureOperations)
      res
    }

    val stream = Await.result(resOut, Duration.Inf)
  }
}

1 Ответ

7 голосов
/ 24 апреля 2020

A Future получает право на выполнение, как только оно создано. Таким образом, эта строка создает два Futures, которые потенциально могут быть выполнены:

val futureOperations = Seq(doOne(), doTwo())

Вызов Future.sequence создаст новый Future, который ждет для каждого из фьючерсов на завершить по очереди, но они оба уже будут доступны для выполнения к этому моменту в коде.

val res = Future.sequence(futureOperations)

Если вы хотите, чтобы Future с запускали последовательно, вам нужно использовать map/flatMap:

val res = doOne().map( _ => doTwo())

С этим кодом doTwo не будет вызываться до тех пор, пока не завершится doOne (и вовсе не произойдет сбой doOne)

Причина, по которой этого не происходит похоже, что в вашем примере это то, что вы вызываете блокирующую операцию в Future, которая блокирует поток, который в противном случае использовался бы для выполнения других Future s. Таким образом, хотя для выполнения доступно два Future, на самом деле выполняется только один.

Если вы пометите код как blocking, он будет работать правильно:

import scala.concurrent.blocking

def doOne(): Future[Unit] = Future {
  blocking{println("startFirst");      Thread.sleep(3000);     println("stop First")}
}

def doTwo(): Future[Unit] = Future {
  blocking{println("startSecond");      Thread.sleep(1000);      println("stop Second")}
}

См. Раздел комментариев для подробностей того, почему поведение по умолчанию отличается в разных версиях, и почему вы никогда не должны делать предположений относительно относительного порядка выполнения независимых Future s.

...