• 1000 Я переключаюсь с akka classi c на актеров akka и делаю что-то вроде этого
object FlowActor {
val log = LoggerFactory.getLogger(this.getClass)
val FlowServiceKey = ServiceKey[FlowActor.TransformText]("FlowActor")
sealed trait Command
final case class TransformText(text: String, replyTo: ActorRef[TextTransformed]) extends Command with CborSerializable
final case class TextTransformed(text: String) extends CborSerializable
def apply(): Behavior[Command] =
Behaviors.setup { ctx =>
// each worker registers themselves with the receptionist
ctx.log.info("Registering myself with receptionist")
ctx.system.receptionist ! Receptionist.Register(FlowServiceKey, ctx.self)
Behaviors.receiveMessage {
case TransformText(text, replyTo) =>
replyTo ! TextTransformed(text.capitalize)
Behaviors.same
}
}
}
object SinkActor {
sealed trait Event
case object StartWork extends Event
trait Ack
object Ack extends Ack
trait Protocol
case class Init(ackTo: ActorRef[Ack]) extends Protocol
case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Throwable) extends Protocol
def apply(actorMaterializer: ActorMaterializer): Behavior[Event] = Behaviors.setup { ctx =>
implicit val mat = actorMaterializer
implicit val timeout=Timeout(5, SECONDS)
Behaviors.receiveMessage {
case STartWork=>
val ref = ctx
.spawn( (FlowActor()) ,"flowActor"
)
import FlowActor._
val askFlow: Flow[String, TextTransformed, NotUsed] =
ActorFlow.ask(ref)(TransformText.apply)
def targetActor(): ActorRef[Protocol] = ???
val actor: ActorRef[Protocol] = targetActor()
val source = Source(List("hello","from","akka","streams!"))
val sink: Sink[String, NotUsed] = ActorSink.actorRefWithBackpressure(
ref = actor,
onCompleteMessage = Complete,
onFailureMessage = Fail.apply,
messageAdapter = Message.apply,
onInitMessage = Init.apply,
ackMessage = Ack)
val stream = source.via(askFlow).runWith(sink)
Behaviors.same
}
}
}
в последней строке я получаю ошибку времени компиляции
found : akka.stream.scaladsl.Sink[String,akka.NotUsed]
[error]
required: akka.stream.Graph[akka.stream.SinkShape[sample.cluster.akkastreams.FlowActor.TextTransformed],?]
[error] val stream = source.via(askFlow).runWith(sink)
what i
расскажите, пожалуйста, что мне здесь не хватает?