Scala / Akka pull pattern - менеджер получает больше работы от рабочих - PullRequest
0 голосов
/ 10 мая 2018

Я ищу помощь в моем хобби-проекте. Я пытаюсь найти лучший способ, как менеджер в моем шаблоне извлечения будет обрабатывать обновления списка задач (список работ, которые необходимо выполнить)

Я реализовал шаблон извлечения, где есть Фабрика заданий, Менеджер и Рабочий. Менеджер получает новые задания от фабрики заданий с помощью планировщика akka. Каждый раз, когда появляется новая работа, Рабочий получает уведомление и начинает жевать.

Что может быть странным в моей реализации, так это то, что Worker порождает новые задачи, которые также должны быть выполнены. На данный момент я решил это с помощью рекурсивных сообщений от Рабочего к себе.

Вот очень упрощенное представление об этом:

class Worker extends Actor {
  def receive = {
    case Work0 ⇒ self ! Work1
    case Work1 ⇒ // ...
  }
}

Однако мне не нравится этот подход. Я хотел бы, чтобы уже существующий список задач был расширен на стороне менеджера и позволил работнику отправлять новые задания менеджеру, если появятся новые задачи.

class Worker extends Actor {
  def receive = {
    case Work0 ⇒ Manager ! Work1
    case Work1 ⇒ // ...
  }
}

Вот решение, как создать рабочий буфер внутри диспетчера, который может динамически изменяться. Всякий раз, когда работник отправляет работу менеджеру, он будет добавлен поверх буфера. В конце концов Worker будет проходить через все задачи, которые менеджер имеет внутри буфера.

В примере не использовались какие-либо параметры самого шаблона извлечения, идея состояла в том, чтобы просто проверить, как заставить работать буфер.

import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

class Manager extends Actor {
  var iterator: Iterator[Int] = Iterator.empty
  var buffer: Option[mutable.ArrayBuffer[Int]] = None
  var iteratorCounter: Int = 0

  def receive = {
    case MyBuffer(workBuffer: mutable.ArrayBuffer[Int]) ⇒
      buffer = Some(workBuffer)
      iterator = workBuffer.iterator
    case "iterate" ⇒
      if (iterator.hasNext) {
        iterator.next() // This will be sent to worker as a Task to process
        iteratorCounter += 1
      } else if (buffer.get.length > iteratorCounter) {
        iterator = buffer.get.iterator
        iterator = iterator.drop(iteratorCounter)
      } else {
        iteratorCounter = 0
      }
    case "add" ⇒ // "Worker adds" new stuff to Manager
      val random = scala.util.Random
      val newVal = random.nextInt(100)
      val append = (buf: Option[ArrayBuffer[Int]], element: Int) => Some(buf.get += element)
      buffer = append(buffer, newVal)

    case _ => println("huh?")
  }
}

case class MyBuffer(workBuffer: ArrayBuffer[Int])
val buffer = mutable.ArrayBuffer(1,2,3,4,5,6,7,8,9,10)

val system = ActorSystem("PullPattern")
val helloActor = system.actorOf(Props(new Manager))
helloActor ! MyBuffer(buffer)
var a = 0

for (a <- 1 to 5) {
  helloActor ! "iterate"  
}

a = 0
for (a <- 1 to 3) {
  helloActor ! "add"  
}
a = 0
for (a <- 1 to 10) {
  helloActor ! "iterate"  
}

Ссылка Scalafiddle.

Мои вопросы:

  1. Это как создать динамически изменяющийся рабочий буфер внутри актера akka? Возможно, есть лучший способ решить эту проблему?
  2. Если это не самая плохая идея, тогда как насчет реализации. Наличие iteratorCounter кажется мне немного странным. Запрашиваемая длина итератора перемещает его указатель. Есть ли другой способ обойти это?
  3. Может быть, у Менеджера вообще не должно быть изменяющегося буфера в отношениях «работник-тяга», где работник способен создавать новые задачи? В настоящее время я не могу видеть никаких проблем с этим решением, но возможно, что я не задаю правильные вопросы.

1 Ответ

0 голосов
/ 10 мая 2018
  1. Вместо mutable.ArrayBuffer и Iterator я бы предложил использовать immutable.Queue для постановки в очередь и enqueue/dequeue для отправки / извлечения работы в / из очереди. Из вашего примера кода видно, что вы работаете в режиме FIFO, поэтому очередь лучше всего вам подойдет.

  2. Я бы не рекомендовал использовать Iterator (который является изменяемым) в качестве указателя. В частности, кроме next/hasNext небезопасно использовать итератор после вызова метода для него (см. Scala doc ). В очереди FIFO итератор не понадобится.

  3. Обычно актер-менеджер ведет внутреннюю очередь / карту / и т. Д., Чтобы отслеживать задачи для рабочих-актеров. Когда это применимо, я бы выбрал неизменную коллекцию поверх изменяемой. Если необходимо, сделайте неизменную коллекцию private var, хотя вы можете hot-swap внутреннее состояние актера через context.become , если хотите избежать var.

Re: рабочие системы с моделью вытягивания с использованием актеров Akka, вот хорошая статья . Существует также пример приложения распределенной рабочей системы от Lightbend, основанного на модели pull.

...