Как уже отмечали другие, вам действительно стоит взглянуть на настоящую потоковую библиотеку, такую как fs2
или monix
. Лично я считаю, что monix хорошо подходит, если вы взаимодействуете с Future
и нуждаетесь в нем только в небольшой части вашего приложения. Он имеет отличные API и документацию для этого варианта использования.
Вот небольшая демонстрация для вашего варианта использования:
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.duration._
import scala.util.Random
// requires: libraryDependencies += "io.monix" %% "monix" % "3.0.0"
object Main {
val searchParams = (1 to 200).map(n => s"Search $n")
/**
* Simulates a query. If your library returns a Future, you can wrap it with `Task.deferFuture`
*/
def search(param: String): Task[String] =
Task(s"Result for $param").delayResult(Random.between(25, 250).milliseconds)
val results: Task[List[String]] =
Observable
.fromIterable(searchParams)
.mapParallelUnordered(parallelism = 4)(param => search(param))
.mapEval { result =>
Task(println(result)).map(_ => result) // print intermediate results as feedback
}
.toListL // collect results into List
/**
* If you aren't going all-in on monix, you probably run the stream into a Future with `results.runToFuture`
*/
def main(args: Array[String]): Unit = results.map(_ => ()).runSyncUnsafe()
}
Вы можете думать о Task
как о ленивом и более мощномFuture
. Observable
- (реактивный) поток, который автоматически обратит давление, если нисходящий поток медленный. В этом примере только 4 запроса будут выполняться параллельно, а другой будет ждать, пока «слот» не станет доступным для выполнения. Помните, что в этих библиотеках побочные эффекты (например, println
должны быть заключены в Task
(или IO
в зависимости от того, что вы используете).
Этот пример можно запустить локально, если вы предоставитезависимость от моникса и поэкспериментируйте с ней, чтобы понять, как она работает.