У меня проблема с попыткой реактивного понимания того, как должны работать одновременные операции с одной и той же наблюдаемой.
Сценарий следующий:
У меня есть список пользователей и кнопка удаления.
Каждый раз, когда я нажимаю remove
, я звоню в API: UsersApi.removeUser
. Можно удалить несколько пользователей одновременно. Это означает, что несколько UsersApi.removeUser
происходят одновременно.
После каждого UsersApi.removeUser
мне нужно сделать UsersApi.refreshUser
звонок
Итак, с точки зрения псевдокода, что я делаю, нажимая кнопку удаления, выглядит следующим образом:
Ведущий:
public Observable<User> removeUser(int userId) {
return UsersApi.removeUser(userId)
.flatMap(user -> UsersApi.refreshUser(userId));
}
Фрагмент:
public void removeUser() {
presenter.removeUser(userId)
.subscribe(user -> {
//remove user from ui
// update number of total users
})
}
Проблема этого подхода заключается в том, что из-за асинхронного характера удаления (допускается многократное удаление) я не могу гарантировать, что то, что достигает подписки, является самым последним. Подписка будет достигнута дважды, по одному для каждого удаления, и информация о пользователе может быть не обновлена или не последняя. Это имеет смысл?
Что я хочу, чтобы произошло:
- Параллельное / Одновременное удаление вызовов с использованием реактивного подхода (запускается несколькими нажатиями на удаление от пользователя)
- После завершения вызова на удаление начните следующий вызов на удаление
Редактировать: я хотел бы знать, как сделать / если возможно сделать решение, которое я сделал (см. Edit2), используя операторы Rx.
Edit2: Мое решение для этого состояло в том, чтобы поставить пользовательские операции в очередь (в данном случае remove
) и создать, используя PublishSubject, после завершения вызова UsersApi.refreshUser(userId)
.
Итак, в основном я сделал (псевдокод):
private final PublishSubject<UserOperation> userOperationObs;
private final ConcurrentLinkedQueue<UserOperation> pendingOperations;
private boolean executingOperation;
private void emitUserOperation(final UserOperation operation) {
if (!executingOperation) {
executingOperation = true;
userOperationObs.onNext(operation);
} else {
executingOperation.add(operation);
}
}
public Observable<User> removeUser(UserOperation operation) {
return UsersApi.removeUser(operation.getUserId)
.switchMap(user -> UsersApi.refreshUser(operation.getUserId))
.doOnNext(user -> {
executingOperation = false;
final UserOperation nextOperation = pendingOperations.poll();
if (nextOperation != null) {
userOperationObs.onNext(operation);
}
};
}