Запуск Future.map внутри Future с помощью одного потока ForkJoinPool - PullRequest
0 голосов
/ 13 сентября 2018

У меня был код, который использовал сборку ExecutionContext (EC) с akka (ActorSystem). Этот код делает что-то довольно своеобразное: он использует AkkaForkJoinPool с parallelism-max = 1 и выполняет что-то вроде:

implicit ec = // akka EC backed by AkkaForkJoinPool with parallelism=1

Future{ // (1)
  // (2) get data from DB which uses a separate ExecutionContext for IO
  val data: Future[Data] = getData()

  // (3) use the data
  data.map{ whatEver }

  // etc ...
}

[Редактировать: я знаю, говоря таким образом, это странно иметь верхнее будущее (1). Но на самом деле код не мой, он охватывает несколько функций и использует более сложные операции, такие как несколько заключенных для понимания. Так что я не буду это менять]

Теперь я переместил этот код и заменил неявный ExecutionContext (EC), предоставленный akka, своим собственным по тому же правилу: я использую (java) ForkJoinPool с параллелизмом = 1.

Как следствие, этот код застревает на карте (3). Насколько я понимаю, когда вызывается карта (3), ей требуется поток, но ЕК не может его предоставить, потому что только его доступная берется Future (1).

Мне не ясно, как ForkJoinPool должен работать. Итак, мой вопрос: правильно ли я понял, и:

  1. если нет, то я неправильно использую java ForkJoinPool. То есть Есть ли способ сделать эту работу?
  2. Если да, как Акка управляет этим?

Я использую Акка 2.3.15, Скала 2.11.12 и Ява 8

Ответы [ 2 ]

0 голосов
/ 14 сентября 2018

Глядя в код Акки, я думаю, что нашел то, что он делает. Я не совсем уверен, но почти: akka ActorSystem создает Dispatchers, который создает MessageDispatcherConfigurator, который создает Dispatcher, который создает ExecutorService (я передаю иерархию классов этого). Существует несколько возможных реализаций, но я думаю, что это наиболее распространенная ситуация, и именно это происходит при использовании ForkJoinPool.

Теперь Dispatcher расширяет BatchingExecutor, который может объединять внутреннюю задачу, такую ​​как карта в вопросе (для запуска которой требуется поток), в текущий поток.

Еще раз, код слишком сложен для меня, чтобы быть уверенным, и я не буду исследовать больше. Но действительно, akka EC может обернуть внутренний вызов карты в родительский поток, чего не происходит со стандартным (то есть java) ForkJoinPool.

Я думаю, что это умный трюк от akka, а не типичная реализация. Документ BatchingExecutor говорит:

/**
 * Mixin trait for an Executor
 * which groups multiple nested `Runnable.run()` calls
 * into a single Runnable passed to the original
 * Executor. This can be a useful optimization
 * because it bypasses the original context's task
 * queue and keeps related (nested) code on a single
 * thread which may improve CPU affinity. However,
 * if tasks passed to the Executor are blocking
 * or expensive, this optimization can prevent work-stealing
 * and make performance worse. Also, some ExecutionContext
 * may be fast enough natively that this optimization just
 * adds overhead.
 * The default ExecutionContext.global is already batching
 * or fast enough not to benefit from it; while
 * `fromExecutor` and `fromExecutorService` do NOT add
 * this optimization since they don't know whether the underlying
 * executor will benefit from it.
 * A batching executor can create deadlocks if code does
 * not use `scala.concurrent.blocking` when it should,
 * because tasks created within other tasks will block
 * on the outer task completing.
 * This executor may run tasks in any order, including LIFO order.
 * There are no ordering guarantees.
 *
 * WARNING: The underlying Executor's execute-method must not execute the submitted Runnable
 * in the calling thread synchronously. It must enqueue/handoff the Runnable.
 */
0 голосов
/ 13 сентября 2018

Вместо того, чтобы обернуть все в будущем, используйте для понимания результат первого будущего, поскольку все зависит от него.

for {
  data <- getData()
} yield data.map( whatEver )

или

getData().map { data =>
  data.map { whatEver }
}
...