Как получить следующий элемент из наблюдаемой monix? - PullRequest
1 голос
/ 21 февраля 2020

У меня есть поток данных, который я случайно регистрирую, но в определенном состоянии моей программы мне нужны данные из потока, не последние, наблюдаемые до этого момента (я могу это сделать), но новейшие после :

val dataStream : Observable[Data] = ...
dataStream
 .doOnNext(logger.trace(_))
 .subscribe()

// ...
// at some state of program later:
// ...
def awaitOneElement(Observable[Data]): Data = ???
val expecting: Data = awaitOneElement(dataStream.filter(...))

Как мне реализовать def awaitOneElement(Observable[Data]): Data = ????

Я понимаю, что это, вероятно, идиоматически неверно, но это грязное синхронное ожидание - именно то, что мне нужно. Я тоже в порядке с Observable[Data] => Future[Data], на следующем шаге добавлю Await.

Ответы [ 2 ]

0 голосов
/ 22 февраля 2020

Мое решение:

  implicit val scheduler                     = mx
  val pendingPromise: AtomicReference[Option[Promise[A]]] = new AtomicReference(None)
  val lastSubject: ConcurrentSubject[A, A]   = ConcurrentSubject.publish[A]

  o.doOnNext(a => if (pendingPromise.get().nonEmpty) lastSubject.onNext(a))
    .subscribe()

  lastSubject
    .filter(filter)
    .doOnNext(a => pendingPromise.getAndSet(None).map(_.success(a)))
    .subscribe()

  def getNext(duration: Duration): Try[A] = {
    val promise = Promise[A]()
    pendingPromise.set(Some(promise))
    Try(Await.result(promise.future, duration))
  }
}

Когда вызывается getNext, создается Promise и возвращается будущее. Когда в Observable происходит желаемое событие, обещание заполняется и удаляется.

0 голосов
/ 21 февраля 2020

Если вам нужен ровно один элемент, вы можете использовать что-то вроде Consumer.head или Consumer.headOption:

val first: Task[Option[Int]] =
  Observable.empty[Int].consumeWith(Consumer.headOption)]

И теперь вы можете конвертировать задачу в будущее. Если вам нужны все элементы, вы можете использовать foldLeft с аккумулятором:

Consumer.foldLeft[T,Vector[T]](Vector.empty[T])((vec, t) => vec :+ t )

или реализовать пользовательский и обратный вызов при необходимости. Взято из документов: https://monix.io/docs/3x/reactive/consumer.html

UPD: для последнего элемента вы можете использовать такой потребитель:

Consumer.foldLeft[T,Option[T]](Option.empty[T])((opt, t) => Some(t))
...