Akka Streams - Противодавление для Source.unfoldAsync - PullRequest
4 голосов
/ 23 марта 2019

Я сейчас пытаюсь прочитать постраничный ресурс HTTP. Каждая страница является многокомпонентным документом, и ответ для этой страницы содержит ссылку next в заголовках, если есть страница с большим содержанием. Затем автоматический анализатор может начать с самой старой страницы, а затем читать страницу за страницей, используя заголовки для создания запроса на следующую страницу.

Я использую Akka Streams и Akka Http для реализации, потому что моя цель - создать потоковое решение. Я придумал это (я включу здесь только соответствующие части кода, не стесняйтесь взглянуть на эту суть для всего кода):

def read(request: HttpRequest): Source[HttpResponse, _] =
  Source.unfoldAsync[Option[HttpRequest], HttpResponse](Some(request))(Crawl.crawl)

val parse: Flow[HttpResponse, General.BodyPart, _] = Flow[HttpResponse]
  .flatMapConcat(r => Source.fromFuture(Unmarshal(r).to[Multipart.General]))
  .flatMapConcat(_.parts)

....

def crawl(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] = reqOption match {
  case Some(req) =>
    Http().singleRequest(req).map { response =>
      if (response.status.isFailure()) Some((None, response))
      else nextRequest(response, HttpMethods.GET)
    }
  case None => Future.successful(None)
}

Таким образом, общая идея состоит в том, чтобы использовать Source.unfoldAsync для обхода страниц и выполнения HTTP-запросов (Идея и реализация очень близки к тому, что описано в этом ответе . Это создаст Source[HttpResponse, _], который затем можно использовать (Unmarshal to Multipart, разделить на отдельные части, ...).

Моя проблема сейчас заключается в том, что потребление HttpResponse может занять некоторое время (Разборка занимает некоторое время, если страницы большие, возможно, в конце будут некоторые запросы к базе данных, чтобы сохранить некоторые данные, ...) , Поэтому я бы хотел, чтобы Source.unfoldAsync давил обратное давление, если нисходящий поток медленнее. По умолчанию следующий HTTP-запрос будет запущен сразу после завершения предыдущего.

Итак, мой вопрос: есть ли какой-нибудь способ создать Source.unfoldAsync противодавление на медленном спуске? Если нет, есть ли альтернатива, которая делает возможным противодавление?

Я могу представить себе решение, которое использует клиентский API на уровне хоста, который предоставляет akka-http, как описано здесь вместе с циклическим графом, где ответ на первый запрос будет использоваться как вход для генерации второго запроса, но я еще не пробовал, и я не уверен, может ли это сработать.


РЕДАКТИРОВАТЬ: После нескольких дней игры и чтения документации и некоторых блогов, я не уверен, был ли я на правильном пути с моим предположением, что поведение противодавления Source.unfoldAsync является первопричина. Чтобы добавить еще несколько замечаний:

  • Когда поток запускается, я вижу, как выходит несколько запросов. Во-первых, это не проблема, если полученный HttpResponse своевременно потребляется (см. здесь для описания)
  • Если я не изменю значение по умолчанию response-entity-subscription-timeout, я столкнусь со следующей ошибкой (я удалил URL):
    [WARN] [03/30/2019 13:44:58.984] [default-akka.actor.default-dispatcher-16] [default/Pool(shared->http://....)] [1 (WaitingForResponseEntitySubscription)] Response entity was not subscribed after 1 seconds. Make sure to read the response entity body or call discardBytes() on it. GET ... Empty -> 200 OK Chunked
    Это приводит к IllegalStateException, который завершает поток: java.lang.IllegalStateException: Substream Source cannot be materialized more than once
  • Я заметил, что демаршаллинг ответа является самой медленной частью в потоке, что может иметь смысл, потому что тело ответа является многокомпонентным документом и, следовательно, относительно большим. Тем не менее, я ожидал бы, что эта часть потока будет сигнализировать о меньшем спросе в восходящем направлении (что является частью Source.unfoldAsync в моем случае). Это должно привести к тому, что будет сделано меньше запросов.
  • Гугл привел меня к дискуссии о проблеме, которая, кажется, описывает похожую проблему . Они также обсуждают проблемы, возникающие, когда ответ обрабатывается недостаточно быстро. связанный запрос на слияние принесет изменения документации, предлагающие полностью использовать HttpResponse перед продолжением потока. При обсуждении вопроса также возникают сомнения относительно , является ли хорошей идеей объединить Akka Http с Akka Streams . Поэтому, возможно, мне придется изменить реализацию, чтобы напрямую выполнять демаршаллинг внутри функции, вызываемой unfoldAsync.

Ответы [ 2 ]

1 голос
/ 29 марта 2019

В соответствии с реализацией из Source.unfoldAsync переданная функция вызывается только при извлечении источника:

def onPull(): Unit = f(state).onComplete(asyncHandler)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)

Так что, если нисходящий поток не тянет (обратное давление), функция, переданная источнику, не вызывается.

В вашем гистре вы используете runForeach (что совпадает с runWith(Sink.foreach)), который тянет вверх по течению, как только println закончится. Поэтому здесь трудно заметить противодавление.

Попробуйте изменить пример на runWith(Sink.queue), что даст вам SinkQueueWithCancel в качестве материализованного значения. Тогда, если вы не вызовете pull в очереди, поток будет подвергнут обратному давлению и не будет выдавать запросы.

Обратите внимание, что может быть один или несколько начальных запросов, пока обратное давление не распространится по всему потоку.

0 голосов
/ 01 апреля 2019

Думаю, я понял это. Как я уже упоминал при редактировании моего вопроса, я нашел этот комментарий к проблеме в Akka HTTP, где автор говорит:

... просто не рекомендуется смешивать Akka http в более крупном потоке обработки. Вместо этого вам нужна граница вокруг частей потока Akka http, которая гарантирует, что они всегда используют свой ответ, прежде чем разрешить продолжить поток внешней обработки.

Итак, я попытался это сделать: вместо того, чтобы выполнять HTTP-запрос и демаршалинг на разных этапах потока, я напрямую отменял маршализацию ответа, flatMap вставляя Future[HttpResponse] в Future[Multipart.General]. Это гарантирует, что HttpResponse используется напрямую, и позволяет избежать ошибок Response entity was not subscribed after 1 second. Функция crawl теперь выглядит немного иначе, потому что она должна возвращать немаршалированный объект Multipart.General (для дальнейшей обработки), а также исходный HttpResponse (чтобы иметь возможность построить следующий запрос из заголовков):

def crawl(reqOption: Option[HttpRequest])(implicit actorSystem: ActorSystem, materializer: Materializer, executionContext: ExecutionContext): Future[Option[(Option[HttpRequest], (HttpResponse, Multipart.General))]] = {
  reqOption match {
    case Some(request) =>
      Http().singleRequest(request)
        .flatMap(response => Unmarshal(response).to[Multipart.General].map(multipart => (response, multipart)))
        .map {
          case tuple@(response, multipart) =>
            if (response.status.isFailure()) Some((None, tuple))
            else nextRequest(response, HttpMethods.GET).map { case (req, res) => (req, (res, multipart)) }
        }
    case None => Future.successful(None)
  }
}

Из-за этого остальная часть кода должна измениться. Я создал еще одну сущность , которая содержит эквивалентный код, подобный сущности из исходного вопроса.

Я ожидал, что два проекта Akka будут лучше интегрированы (документы не упоминают об этом ограничении в данный момент, и вместо этого HTTP API, кажется, побуждает пользователя использовать Akka HTTP и Akka Streams вместе), так что это выглядит как немного как обходной путь, но это решает мою проблему на данный момент. Мне все еще нужно выяснить некоторые другие проблемы, с которыми я сталкиваюсь при интеграции этой части в мой более широкий вариант использования, но здесь это не является частью этого вопроса.

...