Я пишу пример приложения, которое обрабатывает растровое изображение.Процесс может управляться с помощью ползунка, поэтому при изменении положения ползунка я создаю еще одно растровое изображение.
Когда пользователь перетаскивает ползунок, он генерирует около 10-20 событий в секунду.Обработка растрового изображения занимает около 1 секунды, поэтому очередь обработки быстро застревает с запросами.
Мне кажется, это хороший пример противодавления, но я не мог понять, как использовать такие вещи, как Flowable
и BackpressureStrategy
для правильной работы.Более того, я не смог заставить этот небольшой пример работать:
val pubsub = PublishSubject.create<Int>()
pubsub
.toFlowable(BackpressureStrategy.LATEST)
.observeOn(computation())
.subscribe {
Timber.d("consume %d - %s", it, Thread.currentThread().name)
Thread.sleep(3000)
}
for (i in 0 .. 1000) {
Timber.d("emit %d - %s", i, Thread.currentThread().name)
pubsub.onNext(i)
}
Ну, я ожидаю, что этот код будет выдавать 1000 целых чисел через PublishSubject
, но, пока обработка каждого занимает 3 секунды, 999 целых чисел должныбыть отброшенным, должны быть обработаны только "0" и "1000" ...
Но в журналах я вижу, что все мои целые числа обрабатываются медленно, один за другим, и стратегия противодавления игнорируется.На самом деле выражение toFlowable(...)
, похоже, ничего не делает.С противодавлением или без него я вижу 1000 выбросов, за которыми следуют несколько минут потребления.
Что мне здесь не хватает?Как я могу отбросить промежуточные элементы и потреблять только последние доступные?
решено :
observeOn(computation())
на самом деле observeOn(computation(), delayErrors = false, bufferSize = 128)
.Чтобы увидеть реальное противодавление, уменьшите bufferSize при вызове observeOn(...)