Остановить поток fs2 после тайм-аута - PullRequest
5 голосов
/ 16 мая 2019

Я хочу использовать функцию, аналогичную take(n: Int), но в измерении времени: consume(period: Duration. Поэтому я хочу, чтобы поток прекратился, если истекло время ожидания. Я знаю, что могу скомпилировать поток в что-то вроде IO[List[T]] и отменить его, но тогда я потеряю результат. На самом деле я хочу преобразовать бесконечный поток в ограниченный и сохранить результаты.

Подробнее о более широком масштабе проблемы. У меня есть бесконечный поток событий от брокера обмена сообщениями, но у меня также есть вращающиеся учетные данные для подключения к брокеру. Поэтому я хочу использовать поток событий в течение некоторого времени, затем остановиться, получить новые учетные данные, снова подключиться к брокеру, создав новый поток, и объединить два потока в один.

Ответы [ 2 ]

1 голос
/ 17 мая 2019

Существует метод, который делает именно это:

/**
    * Interrupts this stream after the specified duration has passed.
    */
  def interruptAfter[F2[x] >: F[x]: Concurrent: Timer](duration: FiniteDuration): Stream[F2, O]
1 голос
/ 16 мая 2019

Вам нужно что-то подобное

  import scala.util.Random
  import scala.concurrent.ExecutionContext
  import fs2._
  import fs2.concurrent.SignallingRef
  implicit val ex = ExecutionContext.global
  implicit val t: Timer[IO] = IO.timer(ex)
  implicit val cs: ContextShift[IO] = IO.contextShift(ex)

  val effect: IO[Long] = IO.sleep(1.second).flatMap(_ => IO{
  val next = Random.nextLong()
  println("NEXT: " + next)
  next
 })
 val signal = SignallingRef[IO, Boolean](false).unsafeRunSync()
 val timer = Stream.sleep(10.seconds).flatMap(_ => 
 Stream.eval(signal.set(true)).flatMap(_ => 
 Stream.emit(println("Finish")).covary[IO]))

 val stream = timer concurrently 
 Stream.repeatEval(effect).interruptWhen(signal)

 stream.compile.drain.unsafeRunSync()

Также, если вы хотите сохранить свой результат публикации данных, вам нужно иметь некоторую неограниченную очередь из fs2 для преобразования опубликованных данных в ваш результат через queue.stream

...