RunnableGraph для ожидания множественного ответа от источника - PullRequest
0 голосов
/ 08 января 2020

Я использую Akka в Play Controller и выполняю ask () для актера по имени publi sh, а внутренний publi sh актер выполняет запрос для нескольких акторов и передает ссылку на отправителя. Актер контроллера должен дождаться ответа от нескольких акторов и создать список ответов.

Пожалуйста, найдите код ниже. но этот код ожидает только 1 ответ и последний завершается. Пожалуйста, предложите


// Performs ask to publish actor
Source<Object,NotUsed> inAsk = Source.fromFuture(ask(publishActor,service.getOfferVerifyRequest(request).getPayloadData(),1000));

final Sink<String, CompletionStage<String>> sink = Sink.head();

        final Flow<Object, String, NotUsed> f3 = Flow.of(Object.class).map(elem -> {
            log.info("Data in Graph is " +elem.toString());
            return elem.toString();
        });
        RunnableGraph<CompletionStage<String>> result = RunnableGraph.fromGraph(
                GraphDSL.create(
                        sink , (builder , out) ->{
                            final Outlet<Object> source = builder.add(inAsk).out();
                            builder
                                    .from(source)
                                    .via(builder.add(f3))
                                    .to(out); // to() expects a SinkShape
                            return ClosedShape.getInstance();
                        }
                ));

        ActorMaterializer mat = ActorMaterializer.create(aSystem);

        CompletionStage<String> fin = result.run(mat);


        fin.toCompletableFuture().thenApply(a->{
                log.info("Data is "+a);
                return true;
        });

        log.info("COMPLETED CONTROLLER ");

1 Ответ

0 голосов
/ 10 января 2020

Если у вас есть несколько ответов, ask не будет сокращать его, то есть только для одного запроса-ответа, где ответ заканчивается Future / CompletionStage.

. Несколько разных стратегий для ожидания всех ответов:

Одна из них - создать промежуточного субъекта, единственной задачей которого является сбор всех ответов, а затем, когда все частичные ответы получены, отвечают исходному запрашивающему, таким образом, вы можете использовать ask чтобы вернуть один агрегированный ответ.

Другой вариант - использовать Source.actorRef, чтобы получить ActorRef, который можно использовать как sender вместе с tell (и пропустить с помощью ask) , Внутри потока вы должны брать элементы до тех пор, пока не будут выполнены некоторые критерии (прошло время или элементы были замечены) Возможно, вам придется добавить оператор в mimi c тайм-аут ответа на запрос, чтобы убедиться, что поток не работает, если субъект никогда не отвечает.

Есть некоторые другие проблемы с общим кодом, один из которых создает материализатор на каждый запрос имеет жизненный цикл и будет заполнять вашу кучу с течением времени, вам лучше вводить материализатор из игры.

С данным логом c нет необходимости использовать GraphDSL, что необходим только для сложных потоков с несколькими входами и выходами или циклами. Вы должны иметь возможность составлять операторы, используя только API потока (см., Например, https://doc.akka.io/docs/akka/current/stream/stream-flows-and-basics.html#defining -and-running-streams )

...