Akka Streams - Создание потока, который излучается из источника при получении ввода - PullRequest
0 голосов
/ 18 октября 2018

У меня есть Source, который предоставляет элементы типа A.

И у меня есть Flow, который получает элементы типа B.

То, что я хотел бы сделать, этокогда поток получает вход, следующий элемент из источника излучается как выход потока.

То, как я делаю это в настоящее время, заключается в том, что источник подключен к Sink.queue.Затем для каждого элемента в потоке я сопоставляю его, отбрасывая входные данные и извлекая следующее значение из очереди.Как только очередь пуста, я завершаю поток.

Я чувствую, что должен быть более простой способ, который я пропускаю.Вероятно, существует некоторый встроенный механизм, позволяющий входу запускать элемент из источника.

Например:

val source = ... some akka streams source

val queue = source.grouped(limit.toInt).runWith(Sink.queue[Seq[DataFrame]])

Flow[Message]
  .prepend(Source.single(TextMessage.Strict("start")))
  .collect {
    case TextMessage.Strict(text) => Future.successful(text)
    case TextMessage.Streamed(textStream) => textStream.runFold("")(_ + _).flatMap(Future.successful)
  }
  // swallow the future of the incoming message's text
  .mapAsync(1)(identity)
  // take the next batch
  .mapAsync(1)(_ => queue.pull())
  // swallow the option monad, and add in an end or page-end message
  .collect {
  case Some(batch) if batch.size == limit => batch.toList :+ pageend
  case Some(batch) => batch.toList :+ end
  case None => List(end)
}
  // flatten out the frames
  .mapConcat(identity)

end и pageend - это просто специальные кадры, которыепользовательский интерфейс использует.Ключевая часть вопроса связана с этим использованием очереди.

...