как разрешить ошибку akka.stream.Graph приемника потока akka, используя набранный akka - PullRequest
1 голос
/ 16 июня 2020
• 1000 Я переключаюсь с akka classi c на актеров akka и делаю что-то вроде этого
object FlowActor {
  val log = LoggerFactory.getLogger(this.getClass)
  val FlowServiceKey = ServiceKey[FlowActor.TransformText]("FlowActor")

  sealed trait Command
  final case class TransformText(text: String, replyTo: ActorRef[TextTransformed]) extends Command with CborSerializable
  final case class TextTransformed(text: String) extends CborSerializable

  def apply(): Behavior[Command] =
    Behaviors.setup { ctx =>
      // each worker registers themselves with the receptionist
      ctx.log.info("Registering myself with receptionist")
      ctx.system.receptionist ! Receptionist.Register(FlowServiceKey, ctx.self)

      Behaviors.receiveMessage {
        case TransformText(text, replyTo) =>
          replyTo ! TextTransformed(text.capitalize)
          Behaviors.same
      }
    }
}

object SinkActor {

  sealed trait Event
  case object StartWork extends Event
  trait Ack
  object Ack extends Ack

  trait Protocol
  case class Init(ackTo: ActorRef[Ack]) extends Protocol
  case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol
  case object Complete extends Protocol
  case class Fail(ex: Throwable) extends Protocol
def apply(actorMaterializer: ActorMaterializer): Behavior[Event] = Behaviors.setup { ctx =>
    implicit val mat = actorMaterializer
    implicit val timeout=Timeout(5, SECONDS)
  Behaviors.receiveMessage {
    case STartWork=>
  val ref = ctx
        .spawn( (FlowActor()) ,"flowActor"
        )
        import FlowActor._
        val askFlow: Flow[String, TextTransformed, NotUsed] =
          ActorFlow.ask(ref)(TransformText.apply)
def targetActor(): ActorRef[Protocol] = ???

        val actor: ActorRef[Protocol] = targetActor()

        val source = Source(List("hello","from","akka","streams!"))

        val sink: Sink[String, NotUsed] = ActorSink.actorRefWithBackpressure(
          ref = actor,
          onCompleteMessage = Complete,
          onFailureMessage = Fail.apply,
          messageAdapter = Message.apply,
          onInitMessage = Init.apply,
          ackMessage = Ack)

        val stream = source.via(askFlow).runWith(sink)
Behaviors.same
    }
  }
  }

в последней строке я получаю ошибку времени компиляции

found   : akka.stream.scaladsl.Sink[String,akka.NotUsed]
[error]  
required: akka.stream.Graph[akka.stream.SinkShape[sample.cluster.akkastreams.FlowActor.TextTransformed],?]
[error]         val stream = source.via(askFlow).runWith(sink)

what i

расскажите, пожалуйста, что мне здесь не хватает?

1 Ответ

0 голосов
/ 19 июня 2020

Это происходит потому, что код пытается соединить Flow, который испускает FlowActor.TextTransformed элементов, с Sink, который ожидает String элементов. Ему необходимо преобразовать данные TextTransformed в объекты String:

val stream = source.via(askFlow)
    .map(element => element.text) // or equivalently: .map(_.text)
    .runWith(sink)

Есть как минимум два других возможных способа сделать это, но я думаю, что приведенный выше пример является лучшим подходом, по крайней мере, для более реалистично c и сложные приложения.

  1. FlowActor может ответить String напрямую вместо TextTransformed, изменив тип поля replyTo в TransformText на ActorRef[String]. Это было бы ближе к тому, как работает исходный пример classi c, но рекомендуется использовать типы оболочки, такие как TextTransformed, в протоколах субъектов, поскольку это повышает безопасность типов и может упростить развитие протокола.
  2. Приемник мог ожидать TextTransformed элементов, а не String, либо с помощью дескриптора функции messageAdaptor, разворачивающего String из объекта TextTransformed, либо за счет использования класса Message TextTransformed вместо String в поле msg. Недостатком этого подхода является то, что он плотно соединяет сток с потоком.
...