как использовать ActorSink.actorRefWithBackpressure в потоке akka, используя типизированный akka - PullRequest
0 голосов
/ 18 июня 2020

Я пытаюсь изучить потоки akka, используя набранный akka, документация немного абстрактна, когда дело доходит до набранного akka

Пример Sink.actorRefWithBackpressure довольно прост и легко понять, где как ActorSink.actorRefWithBackpressure пример абстрактный

в первом примере у нас есть AckingReceiver актер, который выполняет требуемую работу, но когда дело доходит до второго примера

нет реализации классов case, как это было в AckingReceiver

val actor: ActorRef[Protocol] = targetActor()

, я видел этот код где-то, но я тоже не могу его понять

def targetActor(): ActorRef[Protocol] = ???

как мы можем предоставить реализацию целевого актора, который имеет дело с классами кейсов, любая помощь будет оценена

1 Ответ

1 голос
/ 18 июня 2020

ActorRef[Protocol] типизированный актер, как и любой другой. Получение ActorRef вне ActorSystem в типизированном виде несколько сложнее, чем в classi c, вероятно, поэтому в документации это опущено (поскольку это не важно для объяснения того, как использовать ActorSink.actorRefWithBackpressure).

Обычно вы устанавливаете набранный ActorSystem и спрашиваете, что ActorSystem для ActorRef:

import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl._

object MainSystem {
  sealed trait Command
  case class ObtainProtocolActor(replyTo: ActorRef[ProtocolActorIs])

  sealed trait Reply
  case class ProtocolActorIs(actor: ActorRef[Protocol])

  def apply(): Behavior[Command] =
    Behaviors.receive { (context, msg) =>
      case ObtainProtocolActor(replyTo) =>
        val protocolActor: ActorRef[Protocol] = context.spawnAnonymous(
          // Define the protocol actor
          Behaviors.receive[Protocol] { (context, msg) =>
            case Init(ackTo) =>
              println(s"Actor ${context.self.path} initializing")
              ackTo ! Ack
              Behaviors.same
            case Message(ackTo, msg) =>
              println(s"Actor ${context.self.path} received $msg")
              ackTo ! Ack
              Behaviors.same
            case Complete =>
              context.stop()  // Delayed until the message is processed
              ackTo ! Ack
              Behaviors.same
            case Fail(ex) =>
              println(s"Actor ${context.self.path} got failure from stream: ${ex.getMessage}")
              Behaviors.same
          })
        context.watch(protocolActor)
        replyTo ! ProtocolActorIs(protocolActor)
    }.receiveSignal {
      case (context, Terminated(ref)) =>
        println(s"Actor ${ref.path} terminated")
    }
}

val actorSystem = ActorSystem(MainSystem(), "main")

def targetActor(): ActorRef[Protocol] = Await.result(
  actorSystem.ask(MainSystem.ObtainProtocolActor(_)).map(_.replyTo),
  15.second
)

Это, вероятно, показывает два самых больших практических, но, возможно, неочевидных различия между классами. c и набран:

  • ActorSystem в набранном виде - это актер (на самом деле возможно, чтобы ActorRef[Protocol] в этом примере было ActorSystem, хотя маловероятно, что вы на самом деле хочу сделать это)
  • шаблон запроса изменен довольно драматично c способом
...