Запланировать вычисление одновременно для всех элементов fs2.Stream - PullRequest
2 голосов
/ 14 июля 2020

У меня есть fs2.Stream, состоящий из некоторых элементов (вероятно, бесконечных), и я хочу запланировать некоторые вычисления для всех элементов потока одновременно друг с другом. Вот что я пробовал

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO]     = IO.timer(ExecutionContext.global)

val stream = for {
  id <- fs2.Stream.emits(List(1, 2)).covary[IO]
  _ <- fs2.Stream.awakeEvery[IO](1.second)
  _ <- fs2.Stream.eval(IO(println(id)))
} yield ()

stream.compile.drain.unsafeRunSync()

Результат программы выглядит как

1
1
1
etc...

, что не соответствует ожиданиям. Я хотел бы чередовать запланированные вычисления для всех элементов исходного потока, но не дожидаться завершения первого потока (чего никогда не происходит из-за бесконечного планирования).

Ответы [ 2 ]

1 голос
/ 14 июля 2020
val str = for {
  id <- Stream.emits(List(1, 5, 7)).covary[IO]
  res = timer.sleep(id.second) >> IO(println(id))
} yield res

val stream =  str.parEvalMapUnordered(5)(identity)

stream.compile.drain.unsafeRunSync()

или

 val stream = Stream.emits(List(1, 5, 7))
   .map { id => 
     Stream.eval(timer.sleep(id.second) >> IO(println(id))) }
   .parJoinUnbounded

stream.compile.drain.unsafeRunSync()
1 голос
/ 14 июля 2020

Согласно подсказкам @ KrzysztofAtłasik и @ LuisMiguelMejíaSuárez, вот решение, которое я только что придумал:

val originalStream = fs2.Stream.emits(List(1, 2))

val scheduledComputation = originalStream.covary[IO].map({ id =>
        fs2.Stream.awakeEvery[IO](1.second).evalMap(_ => IO.delay(println(id)))
}).fold(fs2.Stream.empty.covaryAll[IO, Unit])((result, stream) => result.merge(stream)).flatten

Решение, которое @ KrzysztofAtłasik предложил в комментарии с чередованием id <- fs2.Stream.emits(List(1, 2)).covary[IO] и _ <- fs2.Stream.awakeEvery[IO](1.second) также работает, но не позволяет планировать каждый элемент по-своему.

Чтобы запланировать элементы одновременно на elementValue секунд, можно сделать следующее:

val scheduleEachElementIndividually = originalStream.covary[IO].map({ id =>
                                 //id.seconds
        fs2.Stream.awakeEvery[IO](id.second).evalMap(_ => IO.delay(println(id)))
}).fold(fs2.Stream.empty.covaryAll[IO, Unit])((result, stream) => result.merge(stream)).flatten
...