Как настроить конкретный диспетчер для Akka HTTP? - PullRequest
0 голосов
/ 11 февраля 2019

У меня есть простое приложение Akka HTTP, в котором используется websocket.Мой обработчик запросов имеет блокирующие вызовы (например, JDBC).Итак, мне нужно использовать некоторый пул потоков фиксированного размера для работы с таким кодом.

Итак, как я понимаю, я должен использовать application.conf (вот так - https://github.com/mkuthan/example-akka-http/blob/master/src/main/resources/application.conf). Но я понятия не имею, как настроить пользовательский пул потоков с постоянно фиксированными потоками.

Когда я запускаю свое приложение и выполняю дамп потока, я вижу два имени потоков:

  • akka-system-akka.actor.default-dispatcher-62
  • Routes-akka.actor.default-dispatcher-79

Я не понимаю, что такоеэто эти пулы потоков.

Я попытался настроить пул потоков по умолчанию, например:

akka {
  actor {
    default-dispatcher {
      type = Dispatcher
      executor = "thread-pool-executor"
      thread-pool-executor {
        fixed-pool-size = 40
      }
    }
  }
}

это работает очень странно:

  • Каждый пул потоков имеетУ меня 40 потоков, поэтому у меня 80 потоков. Как я понимаю, у каждого запущенного диспетчера будет 40 собственных потоков. Это плохо.
  • Это не FixedThreadPool - https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool(int). Когда приложение запускается. потоков нет. Затем, когда приложение обработало несколько запросов, потоки создаются.поступающие запросы на некоторое время, темы умерли.

1 Ответ

0 голосов
/ 12 февраля 2019

Вы, вероятно, запустили две отдельные ActorSystems с двумя именами akka-system-akka и Routes-akka, потому что в журналах вы видите два типа имен потоков.

thread-pool-executor - это ThreadPoolExecutor , определяемый средой выполнения Java, и он не FixedThreadPool.

ThreadPoolExecutor автоматически корректирует размер пула (см. GetPoolSize ()) в соответствии сдо границ, установленных corePoolSize (см. getCorePoolSize ()) и MaximumPoolSize (см. getMaximumPoolSize ()).Когда новая задача отправляется в метод execute (Runnable) и выполняется меньше потоков corePoolSize, создается новый поток для обработки запроса, даже если другие рабочие потоки неактивны.Если запущено больше потоков corePoolSize, но меньше MaximumPoolSize, новый поток будет создан, только если очередь заполнена.Установив одинаковые значения corePoolSize и MaximumPoolSize, вы создаете пул потоков фиксированного размера.Устанавливая в MaximumPoolSize практически неограниченное значение, такое как Integer.MAX_VALUE, вы разрешаете пулу размещать произвольное количество одновременных задач.Чаще всего размер ядра и максимальный пул устанавливаются только при создании, но они также могут изменяться динамически с помощью setCorePoolSize (int) и setMaximumPoolSize (int).

Таким образом, вы можете видеть, что он начинается с меньшего количествачем определенный размер и увеличивает его количество потоков в зависимости от входящих задач.

Прочтите раздел для Keep-alive times, который объясняет, как завершаются потоки.

Переходя к рассматриваемой проблеме:

Не переопределяйте default-dispatcher, так как он используется akka для других целей, то есть для передачи сообщений для исполнения всем акторам, и это может быть легко неправильно настроено.,И, самое главное, не запускайте задачи блокировки в диспетчере по умолчанию (для получения более подробной информации прочитайте akka docs ).Вместо этого введите отдельного диспетчера и выполните там свой код блокировки.

Вот пример, как это сделать с обычным http-запросом (извините, я раньше не использовал веб-сокеты)

  val asyncHandler: HttpRequest => Future[HttpResponse] = { req =>
    val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
    Future {
      // blocking call 
      HttpRespone(???)
    }(blockingExecutionContext) // this can be passed implicitly too
  }
  Http().bindAndHandleAsync(asyncHandler, "localhost")

blocking-dispatcher должен быть настроен в application.conf аналогичнок тому, что вы сделали с default-dispatcher, но это определено в корне файла конфигурации.

blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    core-pool-size-min = 2
    core-pool-size-factor = 2.0
    core-pool-size-max = 10
  }
}

В общем, изолируйте выполнение блокировки в отдельном контексте выполнения, который вы можете настроить специально для этого типа операции блокировки.Такой подход легче настроить, так как типы задач выполнения ограничены и контролируются вами.

...