У меня есть Source
, который предоставляет элементы типа A.
И у меня есть Flow
, который получает элементы типа B.
То, что я хотел бы сделать, этокогда поток получает вход, следующий элемент из источника излучается как выход потока.
То, как я делаю это в настоящее время, заключается в том, что источник подключен к Sink.queue
.Затем для каждого элемента в потоке я сопоставляю его, отбрасывая входные данные и извлекая следующее значение из очереди.Как только очередь пуста, я завершаю поток.
Я чувствую, что должен быть более простой способ, который я пропускаю.Вероятно, существует некоторый встроенный механизм, позволяющий входу запускать элемент из источника.
Например:
val source = ... some akka streams source
val queue = source.grouped(limit.toInt).runWith(Sink.queue[Seq[DataFrame]])
Flow[Message]
.prepend(Source.single(TextMessage.Strict("start")))
.collect {
case TextMessage.Strict(text) => Future.successful(text)
case TextMessage.Streamed(textStream) => textStream.runFold("")(_ + _).flatMap(Future.successful)
}
// swallow the future of the incoming message's text
.mapAsync(1)(identity)
// take the next batch
.mapAsync(1)(_ => queue.pull())
// swallow the option monad, and add in an end or page-end message
.collect {
case Some(batch) if batch.size == limit => batch.toList :+ pageend
case Some(batch) => batch.toList :+ end
case None => List(end)
}
// flatten out the frames
.mapConcat(identity)
end
и pageend
- это просто специальные кадры, которыепользовательский интерфейс использует.Ключевая часть вопроса связана с этим использованием очереди.