почему мой код ничего не возвращает? Scala фс2 - PullRequest
0 голосов
/ 19 июня 2020

Программа позволяет сдвигать Mapping Ints в Double и определять время выхода из очереди. Программа не показывает ошибок, но ничего не печатает. Что мне не хватает?

import cats.effect.{ExitCode, IO, IOApp, Timer}
import fs2._
import fs2.concurrent.Queue

import scala.concurrent.duration._
import scala.util.Random
class Tst(q1: Queue[IO, (Double, IO[Long])])(implicit timer: Timer[IO]) {

  val streamData = Stream.emit(1)
  val scheduledStream = Stream.fixedDelay[IO](10.seconds) >> streamData

  def storeInQueue: Stream[IO, Unit] = {
    scheduledStream
      .map { n =>
        val entryTime =
          timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
        (n.toDouble, entryTime)
      }
      .through(q1.enqueue)
      .evalTap(n => IO.delay(println(s"Pushing $n to Queue")))

    q1.dequeue
      .evalTap(_ => timer.sleep(Random.between(10, 30).seconds))
      .map { n =>
        val exitTime =
          timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
        (n._1, exitTime)
      }
      .evalMap(n => IO.delay(println(s"Pulling from queue $n")))
  }
}

object Five2 extends IOApp {

  override def run(args: List[String]): IO[ExitCode] = {
    val program = for {
      q <- Queue.bounded[IO, (Double, IO[Long])](1)
      b = new Tst(q)
      _ <- b.storeInQueue.compile.drain

    } yield ()
    program.as(ExitCode.Success)
  }
}

1 Ответ

2 голосов
/ 19 июня 2020

IO вычисляется лениво - чтобы что-то было выполнено, оно должно быть частью выражения, которое создало окончательное значение IO.

Здесь:

  def storeInQueue: Stream[IO, Unit] = {
    scheduledStream ... // no side effects are run when we create this!

    q1.dequeue ... // not using scheduledStream
  }

value scheduledStream is вообще не используется, поэтому это не «часть» значения, возвращаемого из storeInQueue, поэтому, когда IOApp превращает значение ввода-вывода в вычисления, рецепт вашей программы не содержит части, в которой сообщения помещаются в очередь, поэтому очередь всегда пуста.

Часть, которая подписывается на очередь, работает, но поскольку ничего никогда не попадает в очередь, она продолжает ждать элементов, которые никогда не будут доставлены.

Вам придется начать оба потока, «сделав их частью одного значения ввода-вывода», например:

class Tst(q1: Queue[IO, (Double, IO[Long])])(implicit timer: Timer[IO]) {

  val streamData = Stream.emit(1)
  val scheduledStream = Stream.fixedDelay[IO](10.seconds) >> streamData

  def storeInQueue =
    scheduledStream
      .map { n =>
        val entryTime =
          timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
        (n.toDouble, entryTime)
      }
      .through(q1.enqueue)
      .evalTap(n => IO.delay(println(s"Pushing $n to Queue")))

  def takeFromQueue =
    q1.dequeue
      .evalTap(_ => timer.sleep(Random.between(10, 30).seconds))
      .map { n =>
        val exitTime =
          timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
        (n._1, exitTime)
      }
      .evalMap(n => IO.delay(println(s"Pulling from queue $n")))
  }
}

object Five2 extends IOApp {

  override def run(args: List[String]): IO[ExitCode] = {
    val program = for {
      q <- Queue.bounded[IO, (Double, IO[Long])](1)
      b = new Tst(q)
      pushFiber <- b.storeInQueue.compile.drain.start // run as fiber
      pullFiber <- b.takeFromQueue.compile.drain.start // run as fiber
    } yield ()
    program.as(ExitCode.Success)
  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...