Akka StreamRefs - IllegalStateException (видел RemoteStreamCompleted в состоянии UpstreamTermination) - PullRequest
0 голосов
/ 30 марта 2020

Я пытаюсь отправить поток аудио из службы A в службу B с использованием ссылок akka stream (версия библиотеки akka-streams: 2.6.3). Все работает довольно хорошо, за исключением того факта, что раз в месяц исключение (при ежедневном использовании этого сервиса около 50 тыс. Вызовов в день или около того) выбрасывается в ссылку на поток akka, и я не могу найти причину проблема.

Трассировка стека для ошибки следующая:

        Caused by: java.lang.IllegalStateException: [SourceRef-46] Saw RemoteStreamCompleted(37) while in state UpstreamTerminated(Actor[akka://system-name@serviceA:34363/system/Materializers/StreamSupervisor-3/$$S4-SinkRef-3405#-939568637]), should never happen
            at akka.stream.impl.streamref.SourceRefStageImpl$$anon$1.$anonfun$receiveRemoteMessage$1(SourceRefImpl.scala:285)
            at akka.stream.impl.streamref.SourceRefStageImpl$$anon$1.$anonfun$receiveRemoteMessage$1$adapted(SourceRefImpl.scala:196)
            at akka.stream.stage.GraphStageLogic$StageActor.internalReceive(GraphStage.scala:243)
            at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1(GraphStage.scala:202)
            at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1$adapted(GraphStage.scala:202)
            at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:466)
            at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:497)
            at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:599)
            at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:768)
            at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:783)
            at akka.actor.Actor.aroundReceive(Actor.scala:534)
            at akka.actor.Actor.aroundReceive$(Actor.scala:532)
            at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:690)
            ... 11 common frames omitted

Код, отвечающий за передачу звука через SourceRef в службе A:

Materializer materializer = Materializer.createMaterializer(actorSystem);
AudioExtractor extractor = new AudioExtractorImpl("/path/to/audio/file"); // gets all audio bytes from audio file and puts them into chunks (byte arrays of certain length)
List<AudioChunk> audioChunkList = extractor.getChunkedBytesIntoList();
SourceRef<AudioChunk> sourceRef = Source.from(audioChunkList)
      .runWith(StreamRefs.sourceRef(), materializer);
// wrap the sourceRef into msg
serviceBActor.tell(wrappedAudioSourceRefInMsg, getSelf());

Принимая во внимание код, отвечающий Принятие аудио в сервисе B:

private final List<AudioChunk> audioChunksBuffer = new ArrayList<>();
private final Materializer materializer;

public Receive createReceive() {
      return receiveBuilder.match(WrappedAudioSourceRefInMsg.class, response -> {
            response.getSourceRef()
                  .getSource()
                  .runWith(Sink.forEach(chunk -> audioChunksBuffer.add(chunk)), materializer); 
      }).build();
} 

Я подтвердил, что эта ошибка всегда происходит после того, как весь звук был отправлен из сервиса A, и поток завершен. Я не могу понять, почему SourceRef получает RemoteStreamCompleted в состоянии UpstreamTerminated. Особенно расстраивает то, что should never happen в сообщении. : |

Любая помощь с этим будет приветствоваться.

1 Ответ

0 голосов
/ 31 марта 2020

Закрытие, ошибка в akka сообщается здесь: https://github.com/akka/akka/issues/28852

...