Куда отправляется сообщение после тайм-аута Patterns.ask? - PullRequest
0 голосов
/ 14 апреля 2020

Недавно были некоторые тайм-ауты с scala .concurrent.Future объектами, созданными в ожидании обработки в актере Akka, и мне было интересно, как обрабатывать эти события тайм-аута. Они действительно потеряны? Они повторяются и сохраняются в памяти или как это работает?

Чтобы добавить немного контекста, код выглядит следующим образом:

List<Future<MyMessage>> futureMessageList = plainMessages.stream()
    .map(this::toFuture)
    .collect(Collectors.toList());    

Futures.sequence(futureMessageList, ExecutionContexts.global())
                .onComplete(new OnComplete<Iterable<MyMessage>>() {
                    @Override
                    public void onComplete(Throwable throwable, Iterable<MyMessage> messages) {
                        ... // iterate futureMessageList list

В onComplete итерации по futureMessageList, который в основном состоит из Future объектов, которые инкапсулируют MyMessage.

Однако функция toFuture делает Patterns.ask() с данным диспетчером, и это, кажется, заняв больше времени, которое я отправил (60 секунд) . Примите во внимание, что время отклика зависит от базовой системы, которая может находиться под высокой нагрузкой или без самой быстрой сети в зависимости от среды, в которой она работает.

Future<MyMessage> message = Patterns.ask(actorSystem.getSampleDispatcher(), msg, TIMEOUT_60_SECS)

Поэтому мой вопрос таков: после onComplete выдает следующее исключение из-за того, что Future не обрабатывается вовремя ...

java.lang.NullPointerException
    at my.package.Clazz.onComplete(Clazz.java:4)
    at my.package.Clazz$1.onComplete(Clazz.java:5)
    at akka.dispatch.OnComplete.internal(Future.scala:258)
    at akka.dispatch.OnComplete.internal(Future.scala:256)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
    at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:32)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)

Сохраняются ли эти MyMessage ob js в памяти и затем повторяются? Должен ли я как-то обработать исключение и обработать эти сообщения с тайм-аутом с помощью списка в памяти или как мне обойти это?

1 Ответ

1 голос
/ 17 апреля 2020

Когда ask истекает из-за отсутствия ответа, завершается Future (или CompletionStage) с ошибкой. Сообщение все еще может быть где-то обрабатывается, и если есть ответ, оно будет заканчиваться пустыми буквами (https://doc.akka.io/docs/akka/current/general/message-delivery-reliability.html#dead -букв ). Другой сценарий ios, где может наступить тайм-аут, если субъект остановился или потерпел крах при обработке сообщения, запрос или ответ были потеряны (маловероятно, если отвечающий субъект удален).

Future.sequence либо завершить успешно, когда все переданные ему фьючерсы успешно завершены или потерпели неудачу, если какой-либо из них потерпел неудачу.

Это означает, что при превышении времени ожидания любого запроса вы получите null в качестве параметра сообщений и исключение из первого неудачного будущего в качестве параметра throwable в обратном вызове onComplete.

Если вы предпочитаете получить частичный список результатов, каждый из которых является либо успешным значением, либо исключением. Вы можете сделать это с помощью восстановления в каждом будущем, прежде чем передать их Future.sequence.

...