Последовательно обрабатывать отправку элементов в очередь с помощью fs2 Stream Scala - PullRequest
0 голосов
/ 16 июня 2020

Последовательная обработка в очереди. В fs2.Stream есть различные методы для управления испусканием элементов. fixedRate, fixedDelay, awakeEvery, awakeDelay. Я хочу последовательную обработку. например, никакие элементы не помещаются в очередь до тех пор, пока не будет извлечен элемент внутри нее.

Я обнаружил, что методы fixedDelay позволяют это делать, но когда я их использовал, это не работает так, как ожидалось. Вот что я пробовал:

import cats.effect.{ExitCode, IO, IOApp, Timer}
import fs2._
import fs2.concurrent.Queue

import scala.concurrent.duration._
import scala.util.Random
class Tst(q1: Queue[IO, (Double, IO[Long])])(
  implicit timer: Timer[IO]
) {

  val streamData = Stream.emit(1)
  val scheduledStream = Stream.fixedDelay[IO](10.seconds) >> streamData

  def storeInQueue: Stream[IO, Unit] = {
    scheduledStream
      .evalTap(n => IO.delay(println(s"Pushing $n to Queue")))
      .map { n =>
        val entryTime =
          timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
        (n.toDouble, entryTime)
      }
      .through(q1.enqueue)
  }
  def getFromQueue: Stream[IO, Unit] = {
    q1.dequeue
      .evalTap(_ => timer.sleep(Random.between(10, 30).seconds))
      .map { n =>
        val exitTime =
          timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
        (n._1, exitTime)
      }
      .evalMap(n => IO.delay(println(s"Pulling from queue $n")))
  }
}

object Five2 extends IOApp {

  override def run(args: List[String]): IO[ExitCode] = {
    val program = for {
      q <- Queue.bounded[IO, (Double, IO[Long])](1)
      b = new Tst(q)
      _ <- b.storeInQueue.compile.drain.start

      _ <- b.getFromQueue.compile.drain

    } yield ()
    program.as(ExitCode.Success)
  }
}

Получаю:

Pushing 1 to Queue
Pushing 1 to Queue
Pulling from queue (1.0,IO$301748227)
Pushing 1 to Queue
Pulling from queue (1.0,IO$914911384)
Pushing 1 to Queue
Pulling from queue (1.0,IO$1022209005)
Pushing 1 to Queue
Pushing 1 to Queue
Pulling from queue (1.0,IO$1801897948)

Ожидаемый результат:

Pushing 1 to Queue
Pulling from queue (1.0,IO$1022209005)
Pushing 1 to Queue
Pulling from queue (1.0,IO$1801897948)
Pushing 1 to Queue
Pulling from queue (1.0,IO$1801897948)
...