Поток элементов, каждый из которых создает {http запрос}> merge {response}> wrapInHeaderAndFooter {data}> http-запрос - PullRequest
1 голос
/ 09 мая 2019

Я пытаюсь решить классическую проблему ETL с использованием потоковой передачи.У меня есть группа сегментов, каждый сегмент содержит информацию о записях, связанных с этим сегментом, таких как количество записей, URL-адрес для извлечения и т. Д., Для выдачи http-запроса на сбор данных.Мне нужно извлечь записи из источника с размером подкачки 100 записей, объединить страницы записей для каждого сегмента, обернуть в верхний и нижний колонтитулы xml.Теперь отправьте каждую полезную нагрузку xml на сегмент целевому объекту.


                 {http}
                 page 1
                 /      \
       seg 1 >  page 2  -> merge -> wrapHeaderAndFooter -> http target 
          /      \      /
         /       page n
        / 
       /
batch -  seg 2                    "                     -> http target
       \ seg n                    "                     -> http target

val loadSegment: Flow[Segment, Response, NotUsed] = {
    Flow[Segment].mapAsync(parallelism = 5) { segment =>
      val pages: Source[ByteString, NotUsed] = pagedPayload(segment).map(page => page.payload)
//Using source concatenation to prepend and append
      val wrappedInXML: Source[ByteString, NotUsed] = xmlRootStartTag ++ pages ++ xmlRootEndTag
      val httpEntity: HttpEntity = HttpEntity(MediaTypes.`application/octet-stream`, pages)
        invokeTargetLoad(httpEntity, request, segment)
    }
  }
def pagedPayload(segment: Segment): Source[Payload, NotUsed] = {
    val totalPages: Int =   calculateTotalPages(segment.instanceCount)
      Source(0 until totalPages).mapAsyncUnordered(parallelism = 5)(i => {
        sendPayloadRequest(request, segment, i).mapTo[Try[Payload]].map(_.get)
      })
  }

val batch: Batch = someBatch
  Source(batch.segments)
    .via(loadSegment)
    .runWith(Sink.ignore)
    .andThen {
      case Success(value) => log("success")
      case Failure(error) => report(error)
    }

Есть ли лучший подход?Я пытаюсь использовать кодировку HttpEntity.Chunked для потоковой передачи страниц.Иногда первый запрос от источника может занять больше времени из-за прогрева, и цель усекает поток без данных.Есть ли способ отложить фактическое соединение с целью, пока у нас не будет первой страницы в потоке?

Я бы больше хотел сделать что-то вроде ниже.если это возможно, как реализовать методы wrapXMLHeader & toHttpEntity

val splitPages: Flow[BuildSequenceSegment, Seq[PageRequest], NotUsed] = ???
  val requestPayload: Flow[Seq[PageRequest], Seq[PageResponse], NotUsed] = ???
  val wrapXMLHeader: Flow[Seq[PageResponse], Seq[PageResponse], NotUsed] = ???
  val toHttpEntity: Flow[Seq[PageResponse], HttpEntity.Chunked, NotUsed] = ???
  val invokeTargetLoad: Flow[HttpEntity.Chunked, RestResponse, NotUsed] = ???

  Source(batch.segments)
    .via(splitPages)
    .via(requestPayload)
    .via(wrapXMLHeader)
    .via(toHttpEntity)
    .via(invokeTargetLoad)
    .runWith(Sink.ignore)
...