Я пытаюсь отправить поток аудио из службы A в службу B с использованием ссылок akka stream (версия библиотеки akka-streams: 2.6.3). Все работает довольно хорошо, за исключением того факта, что раз в месяц исключение (при ежедневном использовании этого сервиса около 50 тыс. Вызовов в день или около того) выбрасывается в ссылку на поток akka, и я не могу найти причину проблема.
Трассировка стека для ошибки следующая:
Caused by: java.lang.IllegalStateException: [SourceRef-46] Saw RemoteStreamCompleted(37) while in state UpstreamTerminated(Actor[akka://system-name@serviceA:34363/system/Materializers/StreamSupervisor-3/$$S4-SinkRef-3405#-939568637]), should never happen
at akka.stream.impl.streamref.SourceRefStageImpl$$anon$1.$anonfun$receiveRemoteMessage$1(SourceRefImpl.scala:285)
at akka.stream.impl.streamref.SourceRefStageImpl$$anon$1.$anonfun$receiveRemoteMessage$1$adapted(SourceRefImpl.scala:196)
at akka.stream.stage.GraphStageLogic$StageActor.internalReceive(GraphStage.scala:243)
at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1(GraphStage.scala:202)
at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1$adapted(GraphStage.scala:202)
at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:466)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:497)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:599)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:768)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:783)
at akka.actor.Actor.aroundReceive(Actor.scala:534)
at akka.actor.Actor.aroundReceive$(Actor.scala:532)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:690)
... 11 common frames omitted
Код, отвечающий за передачу звука через SourceRef в службе A:
Materializer materializer = Materializer.createMaterializer(actorSystem);
AudioExtractor extractor = new AudioExtractorImpl("/path/to/audio/file"); // gets all audio bytes from audio file and puts them into chunks (byte arrays of certain length)
List<AudioChunk> audioChunkList = extractor.getChunkedBytesIntoList();
SourceRef<AudioChunk> sourceRef = Source.from(audioChunkList)
.runWith(StreamRefs.sourceRef(), materializer);
// wrap the sourceRef into msg
serviceBActor.tell(wrappedAudioSourceRefInMsg, getSelf());
Принимая во внимание код, отвечающий Принятие аудио в сервисе B:
private final List<AudioChunk> audioChunksBuffer = new ArrayList<>();
private final Materializer materializer;
public Receive createReceive() {
return receiveBuilder.match(WrappedAudioSourceRefInMsg.class, response -> {
response.getSourceRef()
.getSource()
.runWith(Sink.forEach(chunk -> audioChunksBuffer.add(chunk)), materializer);
}).build();
}
Я подтвердил, что эта ошибка всегда происходит после того, как весь звук был отправлен из сервиса A, и поток завершен. Я не могу понять, почему SourceRef получает RemoteStreamCompleted
в состоянии UpstreamTerminated
. Особенно расстраивает то, что should never happen
в сообщении. : |
Любая помощь с этим будет приветствоваться.