Почему этот код Scala выполняет два Futures в одном потоке? - PullRequest
9 голосов
/ 01 июля 2019

Я давно использую несколько потоков, но не могу объяснить такой простой случай.

import java.util.concurrent.Executors
import scala.concurrent._
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))

def addOne(x: Int) = Future(x + 1)
def addTwo(x: Int) = Future {addOne(x + 1)}

addTwo(1)
// res5: Future[Future[Int]] = Future(Success(Future(Success(3))))

К моему удивлению, это работает. И я не знаю почему.

Вопрос:
Почему данный один поток может выполнять два фьючерса одновременно?

Мои ожидания :
Первый Future (addTwo) занимает один-единственный поток (newFixedThreadPool(1)), затем он вызывает другой Future (addOne), которому снова нужен другой поток.
Таким образом, программа должна в конечном итоге испытывать недостаток в потоках и застрять.

Ответы [ 2 ]

9 голосов
/ 01 июля 2019

Причина того, что ваш код работает, заключается в том, что оба фьючерса будут выполняться одним и тем же потоком.Создаваемый вами ExecutionContext не будет использовать Thread напрямую для каждого Future, а вместо этого будет планировать выполнение задач (Runnable экземпляров).Если в пуле больше нет доступных потоков, эти задачи будут помещены в BlockingQueue в ожидании выполнения.(Подробнее см. API ThreadPoolExecutor )

Если вы посмотрите на реализацию Executors.newFixedThreadPool(1), вы увидите, что создается Исполнитель с неограниченной очередью:

new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable])

Чтобы получить эффект истощения потока, который вы искали, вы можете самостоятельно создать исполнителя с ограниченной очередью:

 implicit val ec = ExecutionContext.fromExecutor(new ThreadPoolExecutor(1, 1, 0L, 
                     TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](1)))

Поскольку минимальная емкость ArrayBlockingQueue равна 1, вам потребуется три фьючерса.чтобы достичь предела, и вам также необходимо добавить некоторый код, который будет выполняться в результате будущего, чтобы не допустить их завершения (в приведенном ниже примере я делаю это путем добавления .map(identity))

.В следующем примере

import scala.concurrent._
implicit val ec = ExecutionContext.fromExecutor(new ThreadPoolExecutor(1, 1, 0L, 
                      TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](1)))

def addOne(x: Int) = Future {
  x + 1
}
def addTwo(x: Int) = Future {
  addOne(x + 1) .map(identity)
}
def addThree(x: Int) = Future {
  addTwo(x + 1).map(identity)
}

println(addThree(1))

завершается с

java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@65a264b6 rejected from java.util.concurrent.ThreadPoolExecutor@10d078f4[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 1]
3 голосов
/ 01 июля 2019

расширить до Promise легко понять

val p1 = Promise[Future[Int]]
ec.execute(() => {
  // the fist task is start run
  val p2 = Promise[Int]
  //the second task is submit , but no run
  ec.execute(() => {
    p2.complete(Success(1))
    println(s"task 2 -> p1:${p1},p2:${p2}")
  })
  //here the p1 is completed, not wait p2.future finish
  p1.complete(Success(p2.future))
  println(s"task 1 -> p1:${p1},p2:${p2}")// you can see the p1 is completed but the p2 have not
  //first task is finish, will run second task
})
val result: Future[Future[Int]] = p1.future

Thread.sleep(1000)
println(result)
...