Как работает I / O в Akka? - PullRequest
28 голосов
/ 30 июня 2011

Как работает модель актера (в Akka), когда вам нужно выполнить ввод / вывод (т. Е. Операцию с базой данных)?

Насколько я понимаю, операция блокировки вызовет исключение (и, по сути, разрушит весь параллелизм из-за четкой природы Netty, которую использует Akka). Следовательно, мне пришлось бы использовать Future или что-то подобное - однако я не понимаю модель параллелизма.

  1. Может ли 1 субъект обрабатывать несколько сообщений одновременно?
  2. Если актер делает блокирующий вызов в future (т. Е. future.get()), это блокирует только выполнение текущего актера; или это предотвратит выполнение на всех актерах, пока не завершится блокирующий вызов?
  3. Если он блокирует все выполнение, как использование будущего помогает параллелизму (т. Е. Не вызовет ли блокирующие вызовы в будущем по-прежнему равносильно созданию актера и выполнению блокирующего вызова)?
  4. Каков наилучший способ справиться с многоступенчатым процессом (т. Е. Чтение из базы данных; вызов веб-службы блокировки; чтение из базы данных; запись в базу данных), где каждый шаг зависит от последнего?

Основной контекст таков:

  • Я использую сервер Websocket, который будет поддерживать тысячи сеансов.
  • Каждый сеанс имеет некоторое состояние (например, данные аутентификации и т. Д.);
  • Клиент Javascript отправит на сервер сообщение JSON-RPC, которое передаст его соответствующему субъекту сеанса, который выполнит его и вернет результат.
  • Выполнение вызова RPC потребует некоторых операций ввода-вывода и блокировки вызовов.
  • Будет большое количество одновременных запросов (каждый пользователь будет делать значительное количество запросов через соединение WebSocket, и будет много пользователей).

Есть ли лучший способ добиться этого?

Ответы [ 3 ]

28 голосов
/ 30 июня 2011

Операции блокировки не выдают исключения в Akka. Вы можете блокировать вызовы от актера (что вы, вероятно, хотите минимизировать, но это уже другая история).

  1. нет, 1 экземпляр актера не может.
  2. Это не будет блокировать других актеров. Вы можете повлиять на это, используя определенный Диспетчер. Фьючерсы используют диспетчер по умолчанию (обычно глобальный, управляемый событиями), поэтому он работает в потоке в пуле. Вы можете выбрать, какой диспетчер вы хотите использовать для своих актеров (для каждого актера или для всех). Я думаю, что если вы действительно хотите создать проблему, вы можете передать точно такой же (основанный на потоках) диспетчер фьючерсам и актерам, но это потребует некоторых намерений с вашей стороны. Я предполагаю, что если у вас огромное количество блокируемых фьючерсов на неопределенный срок, а служба executorservice настроена на фиксированное количество потоков, вы можете взорвать службу executorservice. Так много «если». f.get блокируется, только если будущее еще не завершено. Он заблокирует «текущий поток» актера, из которого вы его вызываете (если вы вызываете его из актера, что, кстати, не обязательно)
  3. вам не обязательно блокировать. Вы можете использовать обратный вызов вместо f.get. Вы даже можете сочинять фьючерсы без блокировки. зацените разговор Виктора о «многообещающем будущем Акки» для более подробной информации: http://skillsmatter.com/podcast/scala/talk-by-viktor-klang
  4. Я бы использовал асинхронную связь между шагами (если шаги сами по себе являются значимыми процессами), поэтому используйте актера для каждого шага, где каждый актер отправляет одностороннее сообщение следующему, возможно, также одностороннее сообщение другому актеру это не будет блокировать, который может контролировать процесс. Таким образом, вы могли бы создавать цепочки актеров, из которых вы могли бы создать много, перед ним вы могли бы поместить актера с балансировкой нагрузки, чтобы, если один актер блокировал в одной цепочке, другой такого же типа мог не быть в другой цепочке. Это также сработало бы для вашего «контекстного» вопроса, передачи рабочей нагрузки местным субъектам, связывания их за субъектом балансировки нагрузки.

Что касается netty (и я предполагаю, что вы имеете в виду Remote Actors, потому что это единственное, для чего netty используется в Akka), передайте вашу работу как можно скорее локальному субъекту или будущему (с обратным вызовом), если Вы беспокоитесь о сроках или о том, как нетти может каким-то образом выполнять свою работу.

10 голосов
/ 30 июня 2011

Операции блокировки обычно не генерируют исключения, но ожидание в будущем (например, с помощью методов отправки !! или !!!) может вызвать исключение тайм-аута.Вот почему вы должны как можно больше придерживаться принципа «забей и забудь», использовать значимое значение времени ожидания и отдавать предпочтение обратным вызовам, когда это возможно.

  1. Актер akka не может явно обрабатывает несколько сообщений подряд, но вы можете играть со значением throughput через файл конфигурации.Затем субъект будет обрабатывать несколько сообщений (т.е. его метод приема будет вызываться несколько раз последовательно), если его очередь сообщений не пуста: http://akka.io/docs/akka/1.1.3/scala/dispatchers.html#id5

  2. Операции блокировки внутри субъекта не будут "блокировать" всеактеры, но если вы разделяете потоки между актерами (рекомендуемое использование), один из потоков диспетчера будет заблокирован до возобновления операций.Поэтому постарайтесь составить как можно больше фьючерсов и остерегайтесь значения времени ожидания).

3 и 4. Я согласен с ответами Раймонда.

1 голос
/ 27 января 2014

Как сказал Рэймонд и парадигматик, но также, если вы хотите избежать истощения пула потоков, вы должны заключить любые операции блокировки в scala.concurrent.blocking.

Конечно, лучше избегать блокирующих операций, но иногда вам нужно использовать библиотеку, которая блокирует. Если вы закроете указанный код в blocking, это позволит контексту выполнения знать, что вы, возможно, блокируете этот поток, чтобы он мог выделить другой, если это необходимо.

Проблема хуже, чем описывает парадигма, поскольку, если у вас есть несколько операций блокировки, вы можете в конечном итоге заблокировать все потоки в пуле потоков и не иметь свободных потоков. Вы можете оказаться в тупике, если все ваши потоки заблокированы из-за чего-то, что не произойдет, пока не будет назначен другой актер / будущее.

Вот пример:

import scala.concurrent.blocking
...

Future {
  val image = blocking { load_image_from_potentially_slow_media() }
  val enhanced = image.enhance()
  blocking {
    if (oracle.queryBetter(image, enhanced)) {
      write_new_image(enhanced)
    }
  }
  enhanced
}

Документация здесь .

...