У меня есть наблюдаемая, которая непрерывно испускает элементы, и мне нужно обрабатывать каждый из них (функция процесса занимает некоторое время).Таким образом, в то же время при обработке элемента, если другой элемент излучает с тем же значением, я могу игнорировать его, поскольку он уже выполняется.Но как только текущий элемент обработан (и вызван onNext).и позже, если придет тот же запрос, я должен это разрешить.Я использовал оператор distinctUntildChanged
, но я вижу, что он не позволит, если текущий элемент совпадает с последним, даже если последний элемент завершил обработку и вызвал onNext
.
IУ меня есть образец, чтобы продемонстрировать проблему
У меня есть класс
class User {
String id;
String name;
public User(String id, String name) {
this.id = id;
this.name = name;
}
@Override
public boolean equals(Object obj) {
User obj1 = (User) obj;
return id.equals(obj1.id);
}
@Override
public String toString() {
return name;
}
}
И наблюдаемый (субъект)
Subject<User> mSubject = PublishSubject.create();
И моя цепочка наблюдаемых -
mSubject
.doOnNext(i -> Log.d(TAG, "emitted: " + i))
.observeOn(Schedulers.io())
.distinctUntilChanged()
.map(user -> {
Log.d(TAG, "processing " + user);
Thread.sleep(5000); // processing takes 5 seconds
return user;
}).subscribe(user -> Log.d(TAG, "onNext: " + user.name));
И я выдаю такие значения
for (int i = 0; i < 20; i++) {
Thread.sleep(1000);
mSubject.onNext(new User(String.valueOf(1), "User " + i)); // all `User`s have same id
}
В результате получается
emitted: User 0
processing User 0
emitted: User 1
emitted: User 2
emitted: User 3
emitted: User 4
onNext: User 0
emitted: User 5
emitted: User 6
emitted: User 7
emitted: User 8
emitted: User 9
emitted: User 10
emitted: User 11
emitted: User 12
emitted: User 13
emitted: User 14
emitted: User 15
emitted: User 16
emitted: User 17
emitted: User 18
emitted: User 19
Все объекты здесь одинаковые (метод проверки идентификаторов равен id).Как вы можете видеть, это заняло user0
в первый раз и заняло 5 секунд для обработки, в течение этого времени я могу игнорировать входящие элементы, но после этого onNext: User 0
я должен разрешить тот же запрос пользователя, но distinctUntilChanged
не позволяеттак как его кодирование последнего значения равно тому же пользователю, как я могу это сделать?Надеюсь, мой вопрос ясен.