Я пытаюсь реализовать пейджинг с использованием Akka Streams. В настоящее время у меня есть
case class SomeObject(id:Long, next_page:Option[Map[String,String]])
def chainRequests(uri: Uri): Future[Option[(Uri, T)]] = {
if (uri.isEmpty) return Future.successful(None)
val response: Future[Response[T]] = sendWithRetry(prepareRequest(HttpMethods.GET, uri)).flatMap(unmarshal)
response.map { resp =>
resp.next_page match {
case Some(next_page) => Some(next_page("uri"), resp.data)
case _ => Some(Uri.Empty, resp.data)
}
}
}
Source.single(SomeObject).map(Uri(s"object/${_.id}")).map(uri => Source.unfoldAsync(url)(chainRequest)).map(...some processing goes here)
Проблема заключается в том, что если я делаю source.take (1000), а подкачка страниц содержит много элементов (страниц), то нисходящий поток не получает новые элементы, пока не завершится Source.unfoldAsync.
Я пытался использовать циклы в потоках, как
val in = builder.add(Flow[Uri])
val out = builder.add[Flow[T]]
val partition = b.add(Partition[Response[T]](2,r => r.next_page match {case Some(_)=>1; case None => 0}))
val merge = b.add(Merge[Response[T]],2)
in ~> mergeUri ~> sendRequest ~> partition
mergeUri.preferred <~ extractNextUri <~ partition.out(1)
partition.out(0) ~> Flow[Response[T]].map(_.data) ~> out
FlowShape(in.in, out.out)
Но приведенный выше код не работает.
Я застрял в создании собственного GraphStage. UnfoldAsync берет первый элемент, но в решении Flow у меня нет «первого» элемента. Какие-либо предложения?
Спасибо