Использование `onBackpressureLatest` для отбрасывания промежуточных сообщений при блокировке Flowable - PullRequest
0 голосов
/ 10 апреля 2019

У меня есть цепочка, где я делаю некоторые блокирующие вызовы ввода-вывода (например, HTTP-вызов).Я хочу, чтобы блокирующий вызов потреблял значение, продолжал без прерывания, но отбрасывал все накопившееся время, а затем использовал следующее значение таким же образом.

Рассмотрим следующий пример:

fun main() {
  Flowable.interval(100, TimeUnit.MILLISECONDS).onBackpressureLatest().map {
    Thread.sleep(1000)
    it
  }.blockingForEach { println(it) }
}

С наивной точки зрения, я бы ожидал напечатать что-то вроде 0, 10, 20, ..., но он печатает 0, 1, 2, ....

Что я делаю не так?

РЕДАКТИРОВАТЬ:

Я думал о наивном добавлении debounce, чтобы съесть входящий поток:

fun main() {
  Flowable.interval(100, TimeUnit.MILLISECONDS)
    .debounce(0, TimeUnit.MILLISECONDS)
    .map {
      Thread.sleep(1000)
      it
    }
    .blockingForEach { println(it) }
}

Но теперь я получаю java.lang.InterruptedException: sleep interrupted.

РЕДАКТИРОВАТЬ:

Кажется, что работает следующее:

fun main() {
  Flowable.interval(100, TimeUnit.MILLISECONDS)
    .throttleLast(0, TimeUnit.MILLISECONDS)
    .map {
      Thread.sleep(1000)
      it
    }
    .blockingForEach { println(it) }
}

Вывод, как и ожидалось 0, 10, 20, ... !!

Это правильный путь?

Iотметил, что throttleLast переключится на Computation-Scheduler.Есть ли способ вернуться к исходному планировщику?

РЕДАКТИРОВАТЬ:

Я также иногда получаю java.lang.InterruptedException: sleep interrupted с этим вариантом.

1 Ответ

0 голосов
/ 10 апреля 2019

Самый простой подход к решению проблемы:

fun <T> Flowable<T>.lossy() : Flowable<T> {
  return onBackpressureLatest().observeOn(Schedulers.io(), false, 1)
}

При вызове lossy на Flowable он начинает сбрасывать все элементы, которые поступают быстрее, чем может обработать нижестоящий потребитель.

...