Я ищу помощь в моем хобби-проекте. Я пытаюсь найти лучший способ, как менеджер в моем шаблоне извлечения будет обрабатывать обновления списка задач (список работ, которые необходимо выполнить)
Я реализовал шаблон извлечения, где есть Фабрика заданий, Менеджер и Рабочий. Менеджер получает новые задания от фабрики заданий с помощью планировщика akka. Каждый раз, когда появляется новая работа, Рабочий получает уведомление и начинает жевать.
Что может быть странным в моей реализации, так это то, что Worker порождает новые задачи, которые также должны быть выполнены. На данный момент я решил это с помощью рекурсивных сообщений от Рабочего к себе.
Вот очень упрощенное представление об этом:
class Worker extends Actor {
def receive = {
case Work0 ⇒ self ! Work1
case Work1 ⇒ // ...
}
}
Однако мне не нравится этот подход. Я хотел бы, чтобы уже существующий список задач был расширен на стороне менеджера и позволил работнику отправлять новые задания менеджеру, если появятся новые задачи.
class Worker extends Actor {
def receive = {
case Work0 ⇒ Manager ! Work1
case Work1 ⇒ // ...
}
}
Вот решение, как создать рабочий буфер внутри диспетчера, который может динамически изменяться. Всякий раз, когда работник отправляет работу менеджеру, он будет добавлен поверх буфера. В конце концов Worker будет проходить через все задачи, которые менеджер имеет внутри буфера.
В примере не использовались какие-либо параметры самого шаблона извлечения, идея состояла в том, чтобы просто проверить, как заставить работать буфер.
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
class Manager extends Actor {
var iterator: Iterator[Int] = Iterator.empty
var buffer: Option[mutable.ArrayBuffer[Int]] = None
var iteratorCounter: Int = 0
def receive = {
case MyBuffer(workBuffer: mutable.ArrayBuffer[Int]) ⇒
buffer = Some(workBuffer)
iterator = workBuffer.iterator
case "iterate" ⇒
if (iterator.hasNext) {
iterator.next() // This will be sent to worker as a Task to process
iteratorCounter += 1
} else if (buffer.get.length > iteratorCounter) {
iterator = buffer.get.iterator
iterator = iterator.drop(iteratorCounter)
} else {
iteratorCounter = 0
}
case "add" ⇒ // "Worker adds" new stuff to Manager
val random = scala.util.Random
val newVal = random.nextInt(100)
val append = (buf: Option[ArrayBuffer[Int]], element: Int) => Some(buf.get += element)
buffer = append(buffer, newVal)
case _ => println("huh?")
}
}
case class MyBuffer(workBuffer: ArrayBuffer[Int])
val buffer = mutable.ArrayBuffer(1,2,3,4,5,6,7,8,9,10)
val system = ActorSystem("PullPattern")
val helloActor = system.actorOf(Props(new Manager))
helloActor ! MyBuffer(buffer)
var a = 0
for (a <- 1 to 5) {
helloActor ! "iterate"
}
a = 0
for (a <- 1 to 3) {
helloActor ! "add"
}
a = 0
for (a <- 1 to 10) {
helloActor ! "iterate"
}
Ссылка Scalafiddle.
Мои вопросы:
- Это как создать динамически изменяющийся рабочий буфер внутри актера akka? Возможно, есть лучший способ решить эту проблему?
- Если это не самая плохая идея, тогда как насчет реализации. Наличие iteratorCounter кажется мне немного странным. Запрашиваемая длина итератора перемещает его указатель. Есть ли другой способ обойти это?
- Может быть, у Менеджера вообще не должно быть изменяющегося буфера в отношениях «работник-тяга», где работник способен создавать новые задачи? В настоящее время я не могу видеть никаких проблем с этим решением, но возможно, что я не задаю правильные вопросы.