Как передать поток `Seq [Future [_]]` либо в `Future [Stream [_]]`, либо в "Stream [_]" так, чтобы он мог потребляться по мере его поступления? - PullRequest
1 голос
/ 04 октября 2019

В качестве первой попытки я попытался использовать Await.result на голове Seq, а затем использовать ленивый конструктор #:: Stream. Однако, похоже, что он работает не так хорошо, как ожидалось, поскольку я не нашел способа указать планировщику расставить приоритеты в порядке списка, и компилятор не распознал его как @tailrec.

  implicit class SeqOfFuture[X](seq: Seq[Future[X]]) {
    lazy val stream: Stream[X] =
      if (seq.nonEmpty) Await.result(seq.head) #:: seq.tail.stream
      else Stream.empty
  }

Я пытаюсь это сделать, поскольку Future.collect, кажется, ждет, пока все 1010 * строгие Seq не станут доступны / готовы, чтобы отобразить / отобразить / преобразовать его дальше. (И есть другие вычисления, которые я мог бы начать с потока промежуточных результатов)

(Proto) Пример использования:

val searches = [SearchParam1, SearchParam2..., SearchParam200]
// big queries that take a some 100ms each for ~20s total wait
val futureDbResult = searches.map(search => (quill)ctx.run { query(search) }).stream
// Stuff that should happen as results become available instead of blocking/waiting ~20 seconds before starting
val processedResults = futureDbResult.map(transform).filter(reduce)

// Log?
processedResults.map(result => log.info/log.trace)
//return lazy processedResults list or Future {processedResults}
???

1 Ответ

1 голос
/ 04 октября 2019

Как уже отмечали другие, вам действительно стоит взглянуть на настоящую потоковую библиотеку, такую ​​как fs2 или monix. Лично я считаю, что monix хорошо подходит, если вы взаимодействуете с Future и нуждаетесь в нем только в небольшой части вашего приложения. Он имеет отличные API и документацию для этого варианта использования.

Вот небольшая демонстрация для вашего варианта использования:

import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

import scala.concurrent.duration._
import scala.util.Random

// requires: libraryDependencies += "io.monix" %% "monix" % "3.0.0"
object Main {

  val searchParams = (1 to 200).map(n => s"Search $n")

  /**
    * Simulates a query. If your library returns a Future, you can wrap it with `Task.deferFuture`
    */
  def search(param: String): Task[String] =
    Task(s"Result for $param").delayResult(Random.between(25, 250).milliseconds)

  val results: Task[List[String]] =
    Observable
      .fromIterable(searchParams)
      .mapParallelUnordered(parallelism = 4)(param => search(param))
      .mapEval { result =>
        Task(println(result)).map(_ => result) // print intermediate results as feedback
      }
      .toListL // collect results into List

  /**
    * If you aren't going all-in on monix, you probably run the stream into a Future with `results.runToFuture`
    */
  def main(args: Array[String]): Unit = results.map(_ => ()).runSyncUnsafe()
}

Вы можете думать о Task как о ленивом и более мощномFuture. Observable - (реактивный) поток, который автоматически обратит давление, если нисходящий поток медленный. В этом примере только 4 запроса будут выполняться параллельно, а другой будет ждать, пока «слот» не станет доступным для выполнения. Помните, что в этих библиотеках побочные эффекты (например, println должны быть заключены в Task (или IO в зависимости от того, что вы используете).

Этот пример можно запустить локально, если вы предоставитезависимость от моникса и поэкспериментируйте с ней, чтобы понять, как она работает.

...