Альпакка продюсерская сцена завершает и закрывает производителя - PullRequest
0 голосов
/ 02 апреля 2019

Я написал программу akka, которая использует комбинацию akka stream и alpakka.

У меня есть actorRef для Producer.plainSink.

    ActorRef outTopicWriterActor = Source.actorRef(Integer.MAX_VALUE,OverflowStrategy.fail())
            .idleTimeout(Duration.ofMinutes(time_out))
            .map(eval -> Jsonizer.getMessageFromKafkaJSON(getJSON((Evaluation)eval),applicationName,instance,computerName,"_doc",((Evaluation) eval).stopTime))
            .map(json -> new ProducerRecord<String,String>(outtopic,json.toJSONString()))
            .to(Producer.plainSink(producerSettings))
            .run(materializer);

В коде актера я отправляю сообщение на сцену с приемником продюсера с актером.

  outTopicWriterActor.tell(output,getSelf());

Кажется, это работает нормально, но через некоторое время «этап завершен» и «производитель закрыт», после чего все сообщения становятся пустыми буквами.

2019-04-01 12:30:52.920UTC DEBUG[Analyzer-akka.actor.default-dispatcher-29] a.k.i.DefaultProducerStage DefaultProducerStage(akka://Analyzer) - Stage completed
2019-04-01 12:30:52.920UTC DEBUG[Analyzer-akka.actor.default-dispatcher-29] a.k.i.DefaultProducerStage DefaultProducerStage(akka://Analyzer) - Producer closed
2019-04-01 12:30:52.934UTC DEBUG[Analyzer-akka.actor.default-dispatcher-29] a.s.i.ActorRefSourceActor akka://Analyzer/system/StreamSupervisor-0/flow-0-2-actorRefSource - stopped
2019-04-01 12:30:53.090UTC INFO [Analyzer-akka.actor.default-dispatcher-24] a.a.RepointableActorRef akka://Analyzer/system/StreamSupervisor-0/flow-0-2-actorRefSource - Message [Evaluation] from Actor[akka://RTCPAnalyzer/system/sharding/nl.phact.rtcp.actors.AnalyzerActor/38/1683109215-939#1340984343] to Actor[akka://Analyzer/system/StreamSupervisor-0/flow-0-2-actorRefSource#-195168168] was not delivered. [3] dead letters encountered. If this is not an expected behavior, then [Actor[akka://Analyzer/system/StreamSupervisor-0/flow-0-2-actorRefSource#-195168168]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Как я могу помешать "стадии продюсера" "завершить" и закрыть продюсера?

...