`DifferentUntilChanged`, как разрешить один и тот же элемент при вызове onNext - PullRequest
0 голосов
/ 06 декабря 2018

У меня есть наблюдаемая, которая непрерывно испускает элементы, и мне нужно обрабатывать каждый из них (функция процесса занимает некоторое время).Таким образом, в то же время при обработке элемента, если другой элемент излучает с тем же значением, я могу игнорировать его, поскольку он уже выполняется.Но как только текущий элемент обработан (и вызван onNext).и позже, если придет тот же запрос, я должен это разрешить.Я использовал оператор distinctUntildChanged, но я вижу, что он не позволит, если текущий элемент совпадает с последним, даже если последний элемент завершил обработку и вызвал onNext.

IУ меня есть образец, чтобы продемонстрировать проблему

У меня есть класс

class User {
    String id;
    String name;

    public User(String id, String name) {
        this.id = id;
        this.name = name;
    }

    @Override
    public boolean equals(Object obj) {
        User obj1 = (User) obj;
        return id.equals(obj1.id);
    }

    @Override
    public String toString() {
        return name;
    }
}

И наблюдаемый (субъект)

Subject<User> mSubject = PublishSubject.create();

И моя цепочка наблюдаемых -

 mSubject
.doOnNext(i -> Log.d(TAG, "emitted: " + i))
.observeOn(Schedulers.io())
.distinctUntilChanged()
.map(user -> {
    Log.d(TAG, "processing " + user);
    Thread.sleep(5000); // processing takes 5 seconds
    return user;
}).subscribe(user -> Log.d(TAG, "onNext: " + user.name));    

И я выдаю такие значения

    for (int i = 0; i < 20; i++) {
        Thread.sleep(1000);            
        mSubject.onNext(new User(String.valueOf(1), "User " + i)); // all `User`s have same id
    }

В результате получается

emitted: User 0
processing User 0
emitted: User 1
emitted: User 2
emitted: User 3
emitted: User 4
onNext: User 0
emitted: User 5
emitted: User 6
emitted: User 7
emitted: User 8
emitted: User 9
emitted: User 10
emitted: User 11
emitted: User 12
emitted: User 13
emitted: User 14
emitted: User 15
emitted: User 16
emitted: User 17
emitted: User 18
emitted: User 19

Все объекты здесь одинаковые (метод проверки идентификаторов равен id).Как вы можете видеть, это заняло user0 в первый раз и заняло 5 секунд для обработки, в течение этого времени я могу игнорировать входящие элементы, но после этого onNext: User 0 я должен разрешить тот же запрос пользователя, но distinctUntilChanged не позволяеттак как его кодирование последнего значения равно тому же пользователю, как я могу это сделать?Надеюсь, мой вопрос ясен.

Ответы [ 2 ]

0 голосов
/ 06 декабря 2018

Таким образом, вы можете достичь этого с Flowable и правильным BackpressureStrategy.Проблема в том, что вы не устанавливаете размер буфера при выполнении observeOn.Вы можете попробовать это (хотя Kotlin):

Observable.interval(100, TimeUnit.MILLISECONDS)
    .doOnNext { println("emitting $it") }
    .toFlowable(BackpressureStrategy.LATEST)
    .observeOn(Schedulers.io(), false,1)
    .subscribeOn(Schedulers.io())
    .subscribe {
        println("consuming $it")
        Thread.sleep(500)
    }

Вывод будет выглядеть следующим образом:

emitting 0
consuming 0
emitting 1
emitting 2
emitting 3
emitting 4
emitting 5
consuming 5
emitting 6
emitting 7
emitting 8
emitting 9
emitting 10
consuming 10
emitting 11
emitting 12
emitting 13
emitting 14

При вызове observeOn(Scheduler) размер буфера по умолчанию для противодавления долженбыть 128, если я не ошибаюсь.

Вы можете попробовать изменить размер буфера в приведенном выше примере, скажем, на 3. Вы получите:

emitting 0
consuming 0
emitting 1
...
emitting 5
consuming 1
emitting 6
...
emitting 10
consuming 2
emitting 11
...
emitting 15
consuming 15
emitting 16
...
emitting 20
consuming 16
emitting 21
...
0 голосов
/ 06 декабря 2018

Вы можете использовать оператор groupBy() для разделения запросов по User.В каждой наблюдаемой вы можете организовать обработку только самой последней эмиссии.

mSubject
  .doOnNext(i -> Log.d(TAG, "emitted: " + i))
  .observeOn(Schedulers.io())
  .groupBy( user -> user )
  .flatMap( userObserverable -> userObservable
                                  .onBackpressureDrop()
                                  .map(user -> {
                                    Log.d(TAG, "processing " + user);
                                    Thread.sleep(5000); // processing takes 5 seconds
                                    return user;
                                  })
  .subscribe(user -> Log.d(TAG, "onNext: " + user.name));

Оператор groupBy() создает одну наблюдаемую для каждого пользователя.Для нового пользователя будет создана новая наблюдаемая.Каждый пользователь будет излучаться по своему собственному наблюдаемому значению, и onBackpressureDrop() удалит пользователя, если нисходящий поток не принимает выбросы.

...