Rx Java повторить с задержкой только часть потока - PullRequest
0 голосов
/ 20 февраля 2020

У меня есть поток Rx, который выполняет два действия последовательно, когда происходит определенное событие:

  1. отправляет SMS на данный набор номеров - который возвращает Single<Holster>
  2. сохранить событие в локальной БД - которая возвращает Completable

вот мой код

private void saveBluetoothAlarm(@NonNull Alarm alarm, int type) {
    disposable.add( dbManager.getHolsterDAO().getCurrentHolster()
        .map(holsters -> holsters.get(0))
        .observeOn(AndroidSchedulers.mainThread())
        .flatMap(holster -> sendSmsToAll(holster, alarm.type))
        .observeOn(Schedulers.io())
        .flatMapCompletable(holster -> {
            switch (alarm.type) {
                case StatisticsEventType.EXTRACTION:
                    if (something)
                        return Completable.complete();
                    else
                        return Completable.fromAction(() -> dbManager.getAlarmDAO().insert(alarm))
                                .andThen(saveAlarmOnServer(holster.getId(), alarm));
                case StatisticsEventType.MOVEMENT:
                    if (somethingMore)
                        return Completable.complete();
                    else
                        return Completable.fromAction(() -> dbManager.getAlarmDAO().insert(alarm))
                                .andThen(saveAlarmOnServer(holster.getId(), alarm));
            }
            return Completable.complete();
        })
        .subscribe(() -> {}, Timber::e)
    );
}

все работает, теперь мне нужно повторить первое действие sendSmsToAll(holster, alarm.type) определенное количество раз, каждое из которых задерживается на определенное количество секунд, эти настройки определены в моем Holster объекте.

Я попытался отредактировать flatMap(), как показано ниже, получив sendSmsToAll() return Holster:

.flatMapObservable(holster -> Observable.just(sendSmsToAll(holster, alarm.type))
            .repeat(holster.sms_settings.repetitions_count)
            .delaySubscription(holster.sms_settings.interval, TimeUnit.SECONDS)
)

но смс отправляется только один раз, я даже пробовал много других "комбинаций" (в основном потому, что я нуб с Rx Java), но ничего не работает.

1 Ответ

0 голосов
/ 03 марта 2020

Вы пробовали что-то подобное:

.flatMapObservable(holster -> Observable.zip(Observable.defer(() -> sendSmsToAll(holster, alarm.type)),
                                             Flowable.timer(holster.sms_settings.interval, SECONDS),
                                             (x, y) -> x)
                                        .repeat(holster.sms_settings.repetitions_count))

?

...