Актерская "очередь"? - PullRequest
       6

Актерская "очередь"?

3 голосов
/ 08 июня 2010

В Java, чтобы написать библиотеку, которая делает запросы к серверу, я обычно реализую своего рода диспетчер (не похожий на тот, который находится здесь в библиотеке Twitter4J: http://github.com/yusuke/twitter4j/blob/master/twitter4j-core/src/main/java/twitter4j/internal/async/DispatcherImpl.java), чтобы ограничить количество соединений, чтобывыполнять асинхронные задачи и т. д.

Идея состоит в том, что создается число потоков N. "Задача" ставится в очередь, и все потоки уведомляются, и один из потоков, когда он будет готов, извлечет элемент изочереди, выполните работу, а затем вернитесь в состояние ожидания. Если все потоки заняты работой над задачей, то задача просто ставится в очередь, и следующий доступный поток будет ее принимать.

Это сохраняетмаксимальное количество подключений к N и позволяет одновременно работать не более N задач.

Мне интересно, какую систему я могу создать с актерами, которые будут выполнять то же самое?способ иметь N акторов, а когда новое сообщение готово, передать его актору для обработки - и если все актеры заняты, просто поставьте в очередь сообщениее

Ответы [ 2 ]

4 голосов
/ 08 июня 2010

Akka Framework предназначен для решения подобных проблем и является именно тем, что вы ищете.

Посмотрите это document - есть много настраиваемых диспетчера (на основе событий, на основе потоков, с балансировкой нагрузки, кражей работ и т. Д.), Которые управляют почтовыми ящиками участников и позволяют им работать в конъюнкции. Вы также можете найти интересную эту запись в блоге .

Например. этот код создает экземпляр Work Stealing Dispatcher на основе фиксированного пула потоков, который выполняет балансировку нагрузки между субъектами, которых он контролирует:

  val workStealingDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher")
  workStealingDispatcher
  .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
  .setCorePoolSize(16)
  .buildThreadPool

Актер, использующий диспетчера:

class MyActor extends Actor {

    messageDispatcher = workStealingDispatcher

    def receive = {
      case _ =>
    }
  }

Теперь, если вы запустите 2+ экземпляра субъекта, диспетчер уравновесит нагрузку между почтовыми ящиками (очередями) субъектов (субъект, у которого слишком много сообщений в почтовом ящике, "пожертвует" часть субъектам, у которых ничего нет делать).

1 голос
/ 08 июня 2010

Что ж, вы должны посмотреть на планировщик актёров, так как актёры обычно не 1-к-1 с потоками. Идея, стоящая за актерами, заключается в том, что их может быть много, но фактическое количество потоков будет ограничено чем-то разумным. Они также не должны работать долго, а скорее быстро отвечать на полученные сообщения. Короче говоря, архитектура этого кода, кажется, полностью расходится с тем, как можно проектировать систему актера.

Тем не менее, каждый работающий субъект может отправить сообщение субъекту очереди с просьбой о следующем задании, а затем вернуться к началу реакции. Этот субъект очереди должен получать либо сообщения об очередях, либо сообщения об исключении из очереди. Это может быть оформлено так:

val q: Queue[AnyRef] = new Queue[AnyRef]
loop {
  react {
    case Enqueue(d) => q enqueue d
    case Dequeue(a) if q.nonEmpty => a ! (q dequeue)
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...