IO вычисляется лениво - чтобы что-то было выполнено, оно должно быть частью выражения, которое создало окончательное значение IO.
Здесь:
def storeInQueue: Stream[IO, Unit] = {
scheduledStream ... // no side effects are run when we create this!
q1.dequeue ... // not using scheduledStream
}
value scheduledStream
is вообще не используется, поэтому это не «часть» значения, возвращаемого из storeInQueue
, поэтому, когда IOApp
превращает значение ввода-вывода в вычисления, рецепт вашей программы не содержит части, в которой сообщения помещаются в очередь, поэтому очередь всегда пуста.
Часть, которая подписывается на очередь, работает, но поскольку ничего никогда не попадает в очередь, она продолжает ждать элементов, которые никогда не будут доставлены.
Вам придется начать оба потока, «сделав их частью одного значения ввода-вывода», например:
class Tst(q1: Queue[IO, (Double, IO[Long])])(implicit timer: Timer[IO]) {
val streamData = Stream.emit(1)
val scheduledStream = Stream.fixedDelay[IO](10.seconds) >> streamData
def storeInQueue =
scheduledStream
.map { n =>
val entryTime =
timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
(n.toDouble, entryTime)
}
.through(q1.enqueue)
.evalTap(n => IO.delay(println(s"Pushing $n to Queue")))
def takeFromQueue =
q1.dequeue
.evalTap(_ => timer.sleep(Random.between(10, 30).seconds))
.map { n =>
val exitTime =
timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
(n._1, exitTime)
}
.evalMap(n => IO.delay(println(s"Pulling from queue $n")))
}
}
object Five2 extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val program = for {
q <- Queue.bounded[IO, (Double, IO[Long])](1)
b = new Tst(q)
pushFiber <- b.storeInQueue.compile.drain.start // run as fiber
pullFiber <- b.takeFromQueue.compile.drain.start // run as fiber
} yield ()
program.as(ExitCode.Success)
}
}