Я очень стараюсь выполнить это упражнение, но не могу найти способ заставить его работать, даже после копирования примеров с сайта Akka (https://doc.akka.io/docs/akka/current/typed/actors.html).
I нужно просто разделить список целых чисел на n разделов, дать каждому разделу каждый n актер, позволить актору суммировать все их значения и, наконец, вернуть сообщение родительскому актору с результатом, который добавит его в переменную-накопитель. Мне не нужно делать что-то необычное, например, протоколирование, отказоустойчивость, оптимизацию производительности и т. Д. c.
import AdderMain.StartJob
import AdderWorker.ProcessDone
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, LoggerOps}
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
object AdderWorker {
final case class Process(replyTo: ActorRef[ProcessDone], payloadPartition: List[Int])
final case class ProcessDone(result: Int)
def apply(): Behaviors.Receive[Process] =
Behaviors.receiveMessage[Process] {
case Process(replyTo, payloadPartition) =>
println(s"Worker is processing data...")
val result = sumListElements(payloadPartition)
replyTo ! ProcessDone(result)
Behaviors.same
}
private def sumListElements(list: List[Int]): Int = list match {
case Nil => 0
case x :: xs => x + sumListElements(xs)
}
}
object AdderMain extends App {
final case class StartJob(payload: List[Int])
def apply(payload: List[Int]): Behavior[ProcessDone] =
Behaviors.setup { context =>
val payloadPartitions: List[List[Int]] = payload.grouped(4).toList
payloadPartitions.foreach { l =>
val worker = context.spawn(AdderWorker(), "Stuff")
worker ! AdderWorker.Process(context.self, l)
}
Behaviors.receiveMessage { message =>
println(s"Done ${message}")
Behaviors.same
}
}
val payload = List(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)
val system: ActorSystem[AdderMain.StartJob] = ActorSystem(AdderMain(payload), "AdderMain")
}
Как видите, AdderMain
порождает AdderWorker
для каждого раздела списка и отправляет сообщение «Обработка», содержащее список. Я получаю ошибку
Error:(45, 72) type mismatch;
found : akka.actor.typed.Behavior[AdderWorker.ProcessDone]
required: akka.actor.typed.Behavior[Product with java.io.Serializable]
Note: AdderWorker.ProcessDone <: Product with java.io.Serializable, but class Behavior is invariant in type T.
You may wish to define T as +T instead. (SLS 4.5)
val system: ActorSystem[AdderMain.StartJob] = ActorSystem(AdderMain(payload), "AdderMain")
, но похоже, что это связано с большими проблемами в моем коде ... не стесняйтесь исправлять меня столько, сколько хотите, я только начал с Scala, Акка и функциональное программирование.
PS: я еще не накопил результатов ни одной переменной
РЕДАКТИРОВАТЬ: Вот рабочее решение
import java.util.UUID
import AdderWorker.ProcessDone
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, LoggerOps}
object AdderWorker {
final case class Process(replyTo: ActorRef[ProcessDone], payloadPartition: List[Int])
final case class ProcessDone(result: Int)
def apply(): Behaviors.Receive[Process] =
Behaviors.receiveMessage[Process] {
case Process(replyTo, payloadPartition) =>
println(s"Worker is processing data...")
val result = sumListElements(payloadPartition)
replyTo ! ProcessDone(result)
Behaviors.stopped
}
private def sumListElements(list: List[Int]): Int = list match {
case Nil => 0
case x :: xs => x + sumListElements(xs)
}
}
object BasicAdder extends App {
var counter = 0
var max = 0
var sum = 0
final case class StartJob(payload: List[Int])
def apply(payload: List[Int]): Behavior[ProcessDone] =
Behaviors.setup { context =>
val payloadPartitions: List[List[Int]] = payload.grouped(4).toList
max = payloadPartitions.size
payloadPartitions.foreach { l =>
val worker = context.spawn(AdderWorker(), UUID.randomUUID.toString)
// Send the Process message to the newly spawned worker
// providing a reference to the guardian actor and a partition of the payload
worker ! AdderWorker.Process(context.self, l)
}
Behaviors.receiveMessage { message =>
// this is thread-safe
sum += message.result
counter += 1
if (counter < max) {
println(s"Current sum: ${sum}")
Behaviors.same
} else {
println(s"Final sum: ${sum}")
Behaviors.stopped
}
}
}
val payload = List(10, 20, 30, 40, 50, 60, 70, 80, 90, 100) // result should be 550
val system: ActorSystem[AdderWorker.ProcessDone] = ActorSystem(BasicAdder(payload), "AdderMain")
}