scala Будущее с обратным вызовом и переключением контекста - PullRequest
0 голосов
/ 08 января 2020

Я пытаюсь избежать переключения контекста на моих обратных вызовах карты будущего. Я вижу, что у akka есть SameThreadExecutionContext, который должен иметь дело с этим типом обратного вызова, но я не уверен, что полностью его понимаю:

  val ec1 = ExecutionContext.fromExecutorService(...)
  val ec2 = ExecutionContext.fromExecutorService(...)

  println("0 " + Thread.currentThread().getName)
  def futureOnEc1 = Future {
    println(s"1 " + Thread.currentThread().getName)
  }(ec1)

  futureOnEc1.map { a =>
    println(s"2 " + Thread.currentThread().getName)
    a + 1
  }(AkkaSameThreadExecutionContext)

Я думал, что получу:

0 pool-2-thread-1
1 pool-1-thread-1
2 pool-1-thread-1

, но фактический результат -

0 pool-2-thread-1
1 pool-1-thread-1
2 pool-2-thread-1

что мне не хватает? смысл в том, чтобы запустить обратный вызов в том же потоке будущего, а не в потоке, который вызывает исходное будущее.

Ответы [ 2 ]

2 голосов
/ 08 января 2020

Аккуратный трюк, чтобы избежать переключения контекста при использовании Scala Future s, заключается в использовании parasitic в качестве ExecutionContext, который "крадет время выполнения из других потоков, поскольку его Runnables запускаются в потоке, который вызывает execute и затем возвращая управление вызывающей стороне после того, как все были выполнены его Runnables ". parasitic доступен с Scala 2.13, но вы можете легко понять его и перенести его на проекты до 2.13, посмотрев его код (здесь для версии 2.13.1) . Наивная, но работающая реализация для проектов до 2.13 просто запускает Runnable s, не заботясь о распределении их в потоке, что делает трюк, как в следующем фрагменте:

object parasitic212 extends ExecutionContext {

  override def execute(runnable: Runnable): Unit =
    runnable.run()

  // reporting failures is left as an exercise for the reader
  override def reportFailure(cause: Throwable): Unit = ???

}

The * Реализация 1013 *, конечно, более тонкая. Для более глубокого понимания рассуждений и некоторых предостережений относительно их использования я бы предложил вам сослаться на PR и введенный parasitic в качестве общедоступного API (он уже был реализован, но зарезервирован для внутреннего использования).

Цитируя оригинальное описание PR:

В рамках реализации Future долгое время использовался синхронный трамплин, ExecutionContext для запуска управляемого лога c как можно дешевле.

Я полагаю, что существует значительное количество случаев, когда для эффективности имеет смысл выполнять logi c синхронно безопасным (-i sh) способом, не имея пользователей для реализации logi c для этого ExecutionContext - это сложно реализовать, если не сказать больше.

Важно помнить, что ExecutionContext должен быть предоставлен через неявный параметр, чтобы вызывающая сторона могла решить, где должен logi c быть казненным. Использование ExecutionContext.parasiti c означает, что logi c может в конечном итоге работать в потоках / пулах, которые не были предназначены или предназначены для запуска указанных logi c. Например, вы можете в конечном итоге запустить привязанную к ЦП логи c в пуле, спроектированном в IO, или наоборот. Поэтому использование паразити c рекомендуется только тогда, когда это действительно имеет смысл. Существует также реальный риск попадания в StackOverflowErrors для определенных шаблонов вложенных вызовов, когда глубокая цепочка вызовов заканчивается в исполнителе parasiti c, что приводит к еще большему использованию стека в последующем выполнении. В настоящее время parasiti c ExecutionContext разрешает вложенную последовательность вызовов максимум 16, это может быть изменено в будущем, если будет обнаружено, что это вызывает проблемы.

Как указано в официальном документацию для parasitic, рекомендуется использовать только тогда, когда исполняемый код быстро возвращает управление вызывающей стороне. Вот документация, приведенная для версии 2.13.1:

ПРЕДУПРЕЖДЕНИЕ: всегда выполняйте только logi c, который быстро вернет управление вызывающей стороне.

Этот ExecutionContext крадет время выполнения у другие потоки, запустив свои Runnables в потоке, который вызывает execute и затем возвращает вызывающему элементу контроль после того, как all его Runnables были выполнены. Вложенные вызовы execute будут запутаны для предотвращения неконтролируемого увеличения пространства стека.

При использовании parasiti c с абстракциями, такими как Future, во многих случаях будет не определено c относительно того, какой поток будет выполняться логи c, поскольку это зависит от того, когда / если это Будущее завершено.

Не не вызывает любой код блокировки в Runnables, переданных этому ExecutionContext, так как это предотвратит прогресс других ставит в очередь Runnables и вызывающий поток.

Симптомы неправильного использования этого ExecutionContext включают, но не ограничиваются этим, взаимоблокировки и серьезные проблемы с производительностью.

Любые исключения NonFatal или InterruptedException будут сообщаться в defaultReporter.

2 голосов
/ 08 января 2020

Обратный вызов вызывается в том же пуле потоков ec1, когда будущее еще не завершено. Проверьте это с добавлением Thread.sleep(1000) в ваше тело Future.

Этот код работает так, как вы ожидаете

  println("0 " + Thread.currentThread().getName)

  val futureOnEc1 = Future {
    Thread.sleep(1000)
    println(s"1 " + Thread.currentThread().getName)
    0
  }(ec1)

  futureOnEc1.map { a =>
    println(s"2 " + Thread.currentThread().getName)
    a + 1
  }(sameThreadExecutionContext)

Отпечатки

0 main
1 pool-1-thread-1
2 pool-1-thread-1

Но если будущее завершено обратный вызов выполняется немедленно потоком, который его регистрирует.

Удалить Thread.sleep и тот же код печатает после

0 main
1 pool-1-thread-1
2 main

Редактировать:

Документы из scala.concurrent.Future#onComplete указывают это поведение.

Когда это будущее завершено, либо через исключение, либо через значение, примените предоставленную функцию. Если будущее уже завершено, оно будет применено немедленно или запланировано асинхронно.

И с scala.concurrent.impl.Promise.DefaultPromise#dispatchOrAddCallback

Пытается добавить обратный вызов, если уже завершено, отправляет ответный вызов для выполнения.

...