Akka-http обработка исключений (Scala) - PullRequest
0 голосов
/ 10 октября 2019

Я использую Akka-hhtp (scala) для асинхронной отправки нескольких пакетных HTTP-запросов к API и задаюсь вопросом, как правильно обрабатывать исключения, когда код ответа отличается от 200 OK.

Ниже приведен псевдокод, демонстрирующий мою точку зрения.


/* Using For comprehension here because the server API has restriction on the amount of data we can send and the time it takes them to process each request. So they require us to send multiple mini requests instead. If one of those fails, then our entire job should fail.*/

val eventuallyResponses = for {
    batches <- postBatch(payload)
} yield batches

val eventualResponses = Future.sequence(eventuallyResponses)

/* Do I need to recover here? If I don't, will the actor system terminate? */
eventualResponses.recover { case es =>
   log.warn("some message")
   List()
}

/* As I said I need to wait for all mini batch requests to complete. If one response is different than 200, then the entire job should fail. */
val result = Await.result(eventualResponses, 10.minutes)


actorSystem.terminate().oncomplete{
  case Success(_) =>
      if (result.isEmpty) =>
          /* This doesn't seem to interrupt the program */
          throw new RuntimeException("POST failed")
      } else {
          log.info("POST Successful")
      }
   case Failure(ex) =>
      log.error("error message $ex")
      throw ex
}

def postBatch(payload) = {
    val responseFuture: Future[HttpResponse] = httpClient.post(payload)

     responseFuture.flatMap{ res =>
       res.status match {
         case StatusCodes.OK => Future.successful(res)
         case _ => Future.failed(new RuntimeException("error message"))
       }
      }
}

Приведенный выше код вызывает исключение, когда мы получаем коды состояния, отличные от OK. Он проходит через ветку result.isEmpty true, но, похоже, не останавливает / не прерывает выполнение программы. Мне нужно это сделать, поскольку это запланировано как задание Autosys, и мне нужно, чтобы задание не выполнялось, если хотя бы один из пакетных запросов возвращает ответ, отличный от 200 OK.

Если я не recover и разрешу генерировать исключение, то (когда мы получим код состояния, отличный от 200), будет ли система актеров завершена должным образом?

Знаете ли вы охороший способ сделать выше?

Спасибо:)

1 Ответ

1 голос
/ 10 октября 2019

Насколько я понимаю ваш вопрос, вам нужно выбросить исключение из main тела, если некоторые ответы не имеют статуса 200.

def postBatch(payload: HttpRequest)(implicit system: ActorSystem, ec: ExecutionContext): Future[HttpResponse] = {
    Http().singleRequest(payload).flatMap(response => response.status match {
        case StatusCodes.OK => Future.successful(response)
        case _ => Future.failed(new RuntimeException("error message"))
    })
}

val reuests: List[HttpRequest] = List(...)
/*
You don't need for comprehension here because
val eventuallyResponses = for {
  batches <- postBatch(payload)
} yield batches

is equal to
val eventuallyResponses = postBatch(payload)

For comprehension doesn't process recursive sending. If you need it you should write it yourself by flatMap on each request future.
*/
val eventualResponses: Future[List[HttpResponse]] =
    Future.sequence(reuests.map(postBatch)) //also, its better to add some throttling logic here

//as far as i understand you need to wait for all responses and stop the actor system after that
Await.ready(eventualResponses, 10 minutes) //wait for all responses
Await.ready(actorSystem.terminate(), Duration.Inf) //wait for actor system termination

//because of Await.ready(eventualResponses, 10 minutes) we can match on the future value and expect that it should be completed  
eventualResponses.value match {
    case Some(Success(responses)) =>
        log.info("All requests completed")
    case Some(Failure(exception)) =>
        log.error("Some request failed")
        throw exception //rethrow this exception
    case None =>
        log.error("Requests timed out")
        throw RuntimeException("Requests timed out")
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...