RxJava - как увидеть текучесть и противодавление в действии? - PullRequest
0 голосов
/ 22 января 2019

Я пишу пример приложения, которое обрабатывает растровое изображение.Процесс может управляться с помощью ползунка, поэтому при изменении положения ползунка я создаю еще одно растровое изображение.

Когда пользователь перетаскивает ползунок, он генерирует около 10-20 событий в секунду.Обработка растрового изображения занимает около 1 секунды, поэтому очередь обработки быстро застревает с запросами.

Мне кажется, это хороший пример противодавления, но я не мог понять, как использовать такие вещи, как Flowable и BackpressureStrategy для правильной работы.Более того, я не смог заставить этот небольшой пример работать:

val pubsub = PublishSubject.create<Int>()

pubsub
.toFlowable(BackpressureStrategy.LATEST)
.observeOn(computation())
.subscribe {
      Timber.d("consume %d - %s", it, Thread.currentThread().name)
      Thread.sleep(3000)
}

for (i in 0 .. 1000) {
      Timber.d("emit %d - %s", i, Thread.currentThread().name)
      pubsub.onNext(i)
}

Ну, я ожидаю, что этот код будет выдавать 1000 целых чисел через PublishSubject, но, пока обработка каждого занимает 3 секунды, 999 целых чисел должныбыть отброшенным, должны быть обработаны только "0" и "1000" ...

Но в журналах я вижу, что все мои целые числа обрабатываются медленно, один за другим, и стратегия противодавления игнорируется.На самом деле выражение toFlowable(...), похоже, ничего не делает.С противодавлением или без него я вижу 1000 выбросов, за которыми следуют несколько минут потребления.

Что мне здесь не хватает?Как я могу отбросить промежуточные элементы и потреблять только последние доступные?

решено :

observeOn(computation()) на самом деле observeOn(computation(), delayErrors = false, bufferSize = 128).Чтобы увидеть реальное противодавление, уменьшите bufferSize при вызове observeOn(...)

1 Ответ

0 голосов
/ 22 января 2019

Это может быть связано с observeOn(computation()). В зависимости от вспомогательной резьбы это может быть автоматически уменьшено. Эмиссия предметов ставится в очередь. Поэтому на Flowable.

обратное давление отсутствует.

Попробуйте поместить эти изменения темы до toFlowable(LATEST) или используйте другой Scheduler, который не так прост, или поместите еще больше элементов в pubsub.

Также вы можете использовать observeOn(Scheduler scheduler, boolean, int) для принудительного применения bufferSize.

...