Я пытаюсь решить классическую проблему ETL с использованием потоковой передачи.У меня есть группа сегментов, каждый сегмент содержит информацию о записях, связанных с этим сегментом, таких как количество записей, URL-адрес для извлечения и т. Д., Для выдачи http-запроса на сбор данных.Мне нужно извлечь записи из источника с размером подкачки 100 записей, объединить страницы записей для каждого сегмента, обернуть в верхний и нижний колонтитулы xml.Теперь отправьте каждую полезную нагрузку xml на сегмент целевому объекту.
{http}
page 1
/ \
seg 1 > page 2 -> merge -> wrapHeaderAndFooter -> http target
/ \ /
/ page n
/
/
batch - seg 2 " -> http target
\ seg n " -> http target
val loadSegment: Flow[Segment, Response, NotUsed] = {
Flow[Segment].mapAsync(parallelism = 5) { segment =>
val pages: Source[ByteString, NotUsed] = pagedPayload(segment).map(page => page.payload)
//Using source concatenation to prepend and append
val wrappedInXML: Source[ByteString, NotUsed] = xmlRootStartTag ++ pages ++ xmlRootEndTag
val httpEntity: HttpEntity = HttpEntity(MediaTypes.`application/octet-stream`, pages)
invokeTargetLoad(httpEntity, request, segment)
}
}
def pagedPayload(segment: Segment): Source[Payload, NotUsed] = {
val totalPages: Int = calculateTotalPages(segment.instanceCount)
Source(0 until totalPages).mapAsyncUnordered(parallelism = 5)(i => {
sendPayloadRequest(request, segment, i).mapTo[Try[Payload]].map(_.get)
})
}
val batch: Batch = someBatch
Source(batch.segments)
.via(loadSegment)
.runWith(Sink.ignore)
.andThen {
case Success(value) => log("success")
case Failure(error) => report(error)
}
Есть ли лучший подход?Я пытаюсь использовать кодировку HttpEntity.Chunked
для потоковой передачи страниц.Иногда первый запрос от источника может занять больше времени из-за прогрева, и цель усекает поток без данных.Есть ли способ отложить фактическое соединение с целью, пока у нас не будет первой страницы в потоке?
Я бы больше хотел сделать что-то вроде ниже.если это возможно, как реализовать методы wrapXMLHeader & toHttpEntity
val splitPages: Flow[BuildSequenceSegment, Seq[PageRequest], NotUsed] = ???
val requestPayload: Flow[Seq[PageRequest], Seq[PageResponse], NotUsed] = ???
val wrapXMLHeader: Flow[Seq[PageResponse], Seq[PageResponse], NotUsed] = ???
val toHttpEntity: Flow[Seq[PageResponse], HttpEntity.Chunked, NotUsed] = ???
val invokeTargetLoad: Flow[HttpEntity.Chunked, RestResponse, NotUsed] = ???
Source(batch.segments)
.via(splitPages)
.via(requestPayload)
.via(wrapXMLHeader)
.via(toHttpEntity)
.via(invokeTargetLoad)
.runWith(Sink.ignore)