Простой Akka целочисленный список с актерами - PullRequest
1 голос
/ 01 мая 2020

Я очень стараюсь выполнить это упражнение, но не могу найти способ заставить его работать, даже после копирования примеров с сайта Akka (https://doc.akka.io/docs/akka/current/typed/actors.html).

I нужно просто разделить список целых чисел на n разделов, дать каждому разделу каждый n актер, позволить актору суммировать все их значения и, наконец, вернуть сообщение родительскому актору с результатом, который добавит его в переменную-накопитель. Мне не нужно делать что-то необычное, например, протоколирование, отказоустойчивость, оптимизацию производительности и т. Д. c.

import AdderMain.StartJob
import AdderWorker.ProcessDone
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, LoggerOps}
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}

object AdderWorker {
    final case class Process(replyTo: ActorRef[ProcessDone], payloadPartition: List[Int])
    final case class ProcessDone(result: Int)

    def apply(): Behaviors.Receive[Process] =
        Behaviors.receiveMessage[Process] {
            case Process(replyTo, payloadPartition) =>
                println(s"Worker is processing data...")
                val result = sumListElements(payloadPartition)
                replyTo ! ProcessDone(result)
                Behaviors.same
        }

    private def sumListElements(list: List[Int]): Int = list match {
        case Nil => 0
        case x :: xs => x + sumListElements(xs)
    }
}

object AdderMain extends App {

    final case class StartJob(payload: List[Int])

    def apply(payload: List[Int]): Behavior[ProcessDone] =
        Behaviors.setup { context =>
            val payloadPartitions: List[List[Int]] = payload.grouped(4).toList
            payloadPartitions.foreach { l =>
                val worker = context.spawn(AdderWorker(), "Stuff")
                worker ! AdderWorker.Process(context.self, l)
            }

            Behaviors.receiveMessage { message =>
                println(s"Done ${message}")
                Behaviors.same
            }
        }

    val payload = List(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)

    val system: ActorSystem[AdderMain.StartJob] = ActorSystem(AdderMain(payload), "AdderMain")
}

Как видите, AdderMain порождает AdderWorker для каждого раздела списка и отправляет сообщение «Обработка», содержащее список. Я получаю ошибку

Error:(45, 72) type mismatch;
 found   : akka.actor.typed.Behavior[AdderWorker.ProcessDone]
 required: akka.actor.typed.Behavior[Product with java.io.Serializable]
Note: AdderWorker.ProcessDone <: Product with java.io.Serializable, but class Behavior is invariant in type T.
You may wish to define T as +T instead. (SLS 4.5)
    val system: ActorSystem[AdderMain.StartJob] = ActorSystem(AdderMain(payload), "AdderMain")

, но похоже, что это связано с большими проблемами в моем коде ... не стесняйтесь исправлять меня столько, сколько хотите, я только начал с Scala, Акка и функциональное программирование.

PS: я еще не накопил результатов ни одной переменной

РЕДАКТИРОВАТЬ: Вот рабочее решение

import java.util.UUID

import AdderWorker.ProcessDone
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, LoggerOps}


object AdderWorker {
    final case class Process(replyTo: ActorRef[ProcessDone], payloadPartition: List[Int])
    final case class ProcessDone(result: Int)

    def apply(): Behaviors.Receive[Process] =
        Behaviors.receiveMessage[Process] {
            case Process(replyTo, payloadPartition) =>
                println(s"Worker is processing data...")
                val result = sumListElements(payloadPartition)
                replyTo ! ProcessDone(result)
                Behaviors.stopped
        }

    private def sumListElements(list: List[Int]): Int = list match {
        case Nil => 0
        case x :: xs => x + sumListElements(xs)
    }
}

object BasicAdder extends App {
    var counter = 0
    var max = 0
    var sum = 0

    final case class StartJob(payload: List[Int])

    def apply(payload: List[Int]): Behavior[ProcessDone] =
        Behaviors.setup { context =>
            val payloadPartitions: List[List[Int]] = payload.grouped(4).toList
            max = payloadPartitions.size

            payloadPartitions.foreach { l =>
                val worker = context.spawn(AdderWorker(), UUID.randomUUID.toString)

                // Send the Process message to the newly spawned worker
                // providing a reference to the guardian actor and a partition of the payload
                worker ! AdderWorker.Process(context.self, l)
            }

            Behaviors.receiveMessage { message =>
                // this is thread-safe
                sum += message.result
                counter += 1

                if (counter < max) {
                    println(s"Current sum: ${sum}")
                    Behaviors.same
                } else {
                    println(s"Final sum: ${sum}")
                    Behaviors.stopped
                }
            }
        }

        val payload = List(10, 20, 30, 40, 50, 60, 70, 80, 90, 100) // result should be 550

    val system: ActorSystem[AdderWorker.ProcessDone] = ActorSystem(BasicAdder(payload), "AdderMain")
}
...