Рекавери на Акке спрашивать на основе отправленного сообщения - PullRequest
2 голосов
/ 06 мая 2020

Я отправляю актеру разные сообщения через ask. По таймауту я хотел бы указать значение по умолчанию, которое отличается для сообщений, запрашиваемых актеру.

Поскольку исключение тайм-аута всегда одно и то же, я не могу использовать его в recover для возврата другого значения по умолчанию значения Мне нужно, чтобы исходное сообщение было отправлено.

Как этого добиться.

Пример кода:

      val storageActorProxy = Flow[ByteString]
        .via(Framing.lengthField(TCPMessage.sizeFieldLength, TCPMessage.sizeFieldIndex, Int.MaxValue))
        .map(TCPMessage.decode)
        .ask[OperationResponse](storageActor)
        //TODO: looking for this recover; non-existent AFAIK
        .customRecover { 
            case Op1 => DefaultResponseA()
            case Op2 => DefaultResponseB()
        }
        .map(TCPMessage.encode(_).toByteString)

1 Ответ

3 голосов
/ 06 мая 2020

Метод ask Akka на самом деле довольно легко воссоздать - это просто mapAsync с некоторым дополнительным logi c для улучшения ошибок при смерти актера ( см. Код ). Таким образом, просто используйте mapAsync вручную, чтобы вы могли исправить ошибку запроса.

val storageActorProxy = Flow[ByteString]
  .via(Framing.lengthField(TCPMessage.sizeFieldLength, TCPMessage.sizeFieldIndex, Int.MaxValue))
  .map(TCPMessage.decode)
  .mapAsync(parallelism = 2) { decodedMessage =>
     (storageActor ? decodedMessage).recover {
       case Op1 => DefaultResponseA()
       case Op2 => DefaultResponseB()
     }
  }
  .map(TCPMessage.encode(_).toByteString)
...