У меня есть поток данных, который я случайно регистрирую, но в определенном состоянии моей программы мне нужны данные из потока, не последние, наблюдаемые до этого момента (я могу это сделать), но новейшие после :
val dataStream : Observable[Data] = ...
dataStream
.doOnNext(logger.trace(_))
.subscribe()
// ...
// at some state of program later:
// ...
def awaitOneElement(Observable[Data]): Data = ???
val expecting: Data = awaitOneElement(dataStream.filter(...))
Как мне реализовать def awaitOneElement(Observable[Data]): Data = ???
?
Я понимаю, что это, вероятно, идиоматически неверно, но это грязное синхронное ожидание - именно то, что мне нужно. Я тоже в порядке с Observable[Data] => Future[Data]
, на следующем шаге добавлю Await
.