Я сейчас пытаюсь прочитать постраничный ресурс HTTP. Каждая страница является многокомпонентным документом, и ответ для этой страницы содержит ссылку next
в заголовках, если есть страница с большим содержанием. Затем автоматический анализатор может начать с самой старой страницы, а затем читать страницу за страницей, используя заголовки для создания запроса на следующую страницу.
Я использую Akka Streams и Akka Http для реализации, потому что моя цель - создать потоковое решение. Я придумал это (я включу здесь только соответствующие части кода, не стесняйтесь взглянуть на эту суть для всего кода):
def read(request: HttpRequest): Source[HttpResponse, _] =
Source.unfoldAsync[Option[HttpRequest], HttpResponse](Some(request))(Crawl.crawl)
val parse: Flow[HttpResponse, General.BodyPart, _] = Flow[HttpResponse]
.flatMapConcat(r => Source.fromFuture(Unmarshal(r).to[Multipart.General]))
.flatMapConcat(_.parts)
....
def crawl(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] = reqOption match {
case Some(req) =>
Http().singleRequest(req).map { response =>
if (response.status.isFailure()) Some((None, response))
else nextRequest(response, HttpMethods.GET)
}
case None => Future.successful(None)
}
Таким образом, общая идея состоит в том, чтобы использовать Source.unfoldAsync
для обхода страниц и выполнения HTTP-запросов (Идея и реализация очень близки к тому, что описано в этом ответе . Это создаст Source[HttpResponse, _]
, который затем можно использовать (Unmarshal to Multipart, разделить на отдельные части, ...).
Моя проблема сейчас заключается в том, что потребление HttpResponse
может занять некоторое время (Разборка занимает некоторое время, если страницы большие, возможно, в конце будут некоторые запросы к базе данных, чтобы сохранить некоторые данные, ...) , Поэтому я бы хотел, чтобы Source.unfoldAsync
давил обратное давление, если нисходящий поток медленнее. По умолчанию следующий HTTP-запрос будет запущен сразу после завершения предыдущего.
Итак, мой вопрос: есть ли какой-нибудь способ создать Source.unfoldAsync
противодавление на медленном спуске? Если нет, есть ли альтернатива, которая делает возможным противодавление?
Я могу представить себе решение, которое использует клиентский API на уровне хоста, который предоставляет akka-http, как описано здесь вместе с циклическим графом, где ответ на первый запрос будет использоваться как вход для генерации второго запроса, но я еще не пробовал, и я не уверен, может ли это сработать.
РЕДАКТИРОВАТЬ: После нескольких дней игры и чтения документации и некоторых блогов, я не уверен, был ли я на правильном пути с моим предположением, что поведение противодавления Source.unfoldAsync
является первопричина. Чтобы добавить еще несколько замечаний:
- Когда поток запускается, я вижу, как выходит несколько запросов. Во-первых, это не проблема, если полученный
HttpResponse
своевременно потребляется (см. здесь для описания)
- Если я не изменю значение по умолчанию
response-entity-subscription-timeout
, я столкнусь со следующей ошибкой (я удалил URL):
[WARN] [03/30/2019 13:44:58.984] [default-akka.actor.default-dispatcher-16] [default/Pool(shared->http://....)] [1 (WaitingForResponseEntitySubscription)] Response entity was not subscribed after 1 seconds. Make sure to read the response entity body or call discardBytes() on it. GET ... Empty -> 200 OK Chunked
Это приводит к IllegalStateException
, который завершает поток: java.lang.IllegalStateException: Substream Source cannot be materialized more than once
- Я заметил, что демаршаллинг ответа является самой медленной частью в потоке, что может иметь смысл, потому что тело ответа является многокомпонентным документом и, следовательно, относительно большим. Тем не менее, я ожидал бы, что эта часть потока будет сигнализировать о меньшем спросе в восходящем направлении (что является частью
Source.unfoldAsync
в моем случае). Это должно привести к тому, что будет сделано меньше запросов.
- Гугл привел меня к дискуссии о проблеме, которая, кажется, описывает похожую проблему . Они также обсуждают проблемы, возникающие, когда ответ обрабатывается недостаточно быстро. связанный запрос на слияние принесет изменения документации, предлагающие полностью использовать
HttpResponse
перед продолжением потока. При обсуждении вопроса также возникают сомнения относительно , является ли хорошей идеей объединить Akka Http с Akka Streams . Поэтому, возможно, мне придется изменить реализацию, чтобы напрямую выполнять демаршаллинг внутри функции, вызываемой unfoldAsync
.