Я экспериментирую с fs2.Stream
одновременными функциями и получил некоторое недопонимание о том, как это работает. Я хотел бы отправить потоковое содержимое через какой-то приемник параллельно. Вот что я попробовал:
object TestParallelStream extends App {
val secondsOnStart = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())
val stream = fs2.Stream.emits(List(1, 2, 3, 4, 5, 6, 7, 8, 9)).covary[IO]
val sink: fs2.Sink[IO, Int] = _.evalMap(i => IO {
println(s"[${TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) - secondsOnStart} second]: $i")
Thread.sleep(5000)
})
val executor = Executors.newFixedThreadPool(4)
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.fromExecutor(executor))
stream.observeAsync(3)(sink).compile.drain.unsafeRunSync() //1
executor.shutdown()
}
//1
печатает следующее содержимое:
[1 second]: 1
[6 second]: 2
[11 second]: 3
[16 second]: 4
[21 second]: 5
[26 second]: 6
[31 second]: 7
[36 second]: 8
[41 second]: 9
Как видно из вывода, каждый элемент отправляется через sink
последовательно.
Но если я изменю приемник следующим образом:
// 5 limit and parEvalMap
val sink: fs2.Sink[IO, Int] = _.parEvalMap(5)(i => IO {
println(s"[${TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) - secondsOnStart} second]: $i")
Thread.sleep(5000)
})
Вывод:
[1 second]: 3
[1 second]: 2
[1 second]: 4
[1 second]: 1
[6 second]: 5
[6 second]: 6
[6 second]: 7
[6 second]: 8
[11 second]: 9
Теперь у нас есть 4 элемента, которые отправляются через приемник параллельно одновременно (несмотря на установку 3
в качестве ограничения observerAsync
).
Даже если я заменим observerAsync
на observe
, я получу тот же эффект распараллеливания.
Не могли бы вы уточнить, как на самом деле работают раковины?