RxJava одновременные операции удаления - PullRequest
0 голосов
/ 04 мая 2018

У меня проблема с попыткой реактивного понимания того, как должны работать одновременные операции с одной и той же наблюдаемой.

Сценарий следующий:

У меня есть список пользователей и кнопка удаления. Каждый раз, когда я нажимаю remove, я звоню в API: UsersApi.removeUser. Можно удалить несколько пользователей одновременно. Это означает, что несколько UsersApi.removeUser происходят одновременно.

После каждого UsersApi.removeUser мне нужно сделать UsersApi.refreshUser звонок

Итак, с точки зрения псевдокода, что я делаю, нажимая кнопку удаления, выглядит следующим образом:

Ведущий:

public Observable<User> removeUser(int userId) {
        return UsersApi.removeUser(userId)
                .flatMap(user -> UsersApi.refreshUser(userId));
    }

Фрагмент:

public void removeUser() {
   presenter.removeUser(userId)
     .subscribe(user -> {
        //remove user from ui
        // update number of total users
   })
}

Проблема этого подхода заключается в том, что из-за асинхронного характера удаления (допускается многократное удаление) я не могу гарантировать, что то, что достигает подписки, является самым последним. Подписка будет достигнута дважды, по одному для каждого удаления, и информация о пользователе может быть не обновлена ​​или не последняя. Это имеет смысл?

Что я хочу, чтобы произошло:

  1. Параллельное / Одновременное удаление вызовов с использованием реактивного подхода (запускается несколькими нажатиями на удаление от пользователя)
  2. После завершения вызова на удаление начните следующий вызов на удаление

Редактировать: я хотел бы знать, как сделать / если возможно сделать решение, которое я сделал (см. Edit2), используя операторы Rx.

Edit2: Мое решение для этого состояло в том, чтобы поставить пользовательские операции в очередь (в данном случае remove) и создать, используя PublishSubject, после завершения вызова UsersApi.refreshUser(userId).

Итак, в основном я сделал (псевдокод):

private final PublishSubject<UserOperation> userOperationObs;
private final ConcurrentLinkedQueue<UserOperation> pendingOperations;
private boolean executingOperation;

private void emitUserOperation(final UserOperation operation) {
      if (!executingOperation) {
            executingOperation = true;
            userOperationObs.onNext(operation);
        } else {
            executingOperation.add(operation);
        }
    }

public Observable<User> removeUser(UserOperation operation) {
        return UsersApi.removeUser(operation.getUserId)
                .switchMap(user -> UsersApi.refreshUser(operation.getUserId))
                .doOnNext(user -> {
                   executingOperation = false;
                   final UserOperation nextOperation = pendingOperations.poll();
                   if (nextOperation != null) {
                       userOperationObs.onNext(operation);
                   }
};
    }

1 Ответ

0 голосов
/ 11 июля 2018

Вы можете превратить ваш пользовательский интерфейс в Observable (например, используя RxBinding ). После этого вы можете использовать оператор concatMap для выполнения вызова API, поэтому он начнет следующий сетевой вызов после завершения текущего вызова API.

// emit clicks as stream
Observable<?> clicks = RxView.clicks(removeView)

// listen clicks then perform network call in sequence
clicks.concatMap(ignored -> usersApi.refreshUser(userId))
...