Прервать одну наблюдаемую в concatMap - PullRequest
6 голосов
/ 07 апреля 2019

Я использую concatMap для обработки потока элементов по одному с длительной операцией.В какой-то момент мне нужно «прервать» эту длительную операцию, но только для текущего элемента:

@Test
public void main() throws InterruptedException {
    TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
            .concatMap(this::doLongRunningOperation)
            .test();

    Thread.sleep(10000);
    System.out.println("interrupt NOW");
    // Now I need to interrupt whichever longRunningOperation in
    // progress, but I don't want to interrupt the whole stream.
    // In other words, I want to force it to move onto the next
    // integer.
}

Observable<String> doLongRunningOperation(final Integer integer) {
    return Observable
            .just("\tStart working on " + integer,
                    "\tStill working on " + integer,
                    "\tAlmost done working on " + integer)

            // delay each item by 2 seconds
            .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
            .doOnNext(System.out::println)
            .doFinally(() -> System.out.println("\tfinally for " + integer));
}

Я попытался решить эту проблему, сохранив одноразовый «внутренний» поток и утилизировав его вправильное времяНо это не сработало.Внутренний поток удаляется, но concatMap никогда не переходит к элементу обработки 3. Тест просто зависает (поскольку внешняя наблюдаемая никогда не завершает / не завершает / не удаляет либо)

Disposable disposable = Disposables.empty();

@Test
public void main() throws InterruptedException {
    TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
            .concatMap(this::doLongRunningOperation)
            .test();

    Thread.sleep(10000);
    System.out.println("interrupt NOW");
    disposable.dispose();

    test.awaitTerminalEvent();
    System.out.println("terminal event");
}

Observable<String> doLongRunningOperation(final Integer integer) {
    return Observable
            .just("\tStart working on " + integer,
                    "\tStill working on " + integer,
                    "\tAlmost done working on " + integer)

            // delay each item by 2 seconds
            .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
            .doOnNext(System.out::println)
            .doFinally(() -> System.out.println("\tfinally for " + integer))
            .doOnSubscribe(disposable -> {
                // save disposable so we can "interrupt" later
                System.out.println("Saving disposable for " + integer);
                Example.this.disposable = disposable;
            });
}

Даже если это сработалоэто казалось немного взломанным, полагаясь на побочный эффект.Каков наилучший способ сделать это?

Ответы [ 2 ]

0 голосов
/ 27 июня 2019

Вам просто нужно реализовать Observable типа семафоров, который может вызывать события остановки.

Реализуйте этот семафор, например:

PublishSubject<Boolean> semaphore = PublishSubject.create()

Изменить Observable, созданный в .concatMap (), следующим образом:

.concatMap(string ->
                        Observable.just(string)
                                .delay(2, TimeUnit.SECONDS)
                                .takeUntil(semaphore)) //Here is the stop trigger

Когда вам нужно отменить текущий элемент обработки - просто позвоните:

   semaphore.onNext(true);
0 голосов
/ 14 мая 2019

У меня был почти такой же вопрос, как и Как отменить отдельный сетевой запрос в Retrofit с помощью RxJava? .Я могу использовать PublishSubject, чтобы "прервать"

private PublishSubject interrupter;

@Test
public void main() throws InterruptedException {
    TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
            .concatMap(this::doLongRunningOperation)
            .test();

    Thread.sleep(10000);
    System.out.println("interrupt NOW");
    interrupter.onComplete();

    test.awaitTerminalEvent();
    System.out.println("terminal event");
}

Observable<String> doLongRunningOperation(final Integer integer) {
    interrupter = PublishSubject.create();

    return Observable
            .just("Start working on " + integer,
                    "Still working on " + integer,
                    "Almost done working on " + integer)
            // delay each item by 2 seconds
            .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
            .doOnNext(System.out::println)
            .doFinally(() -> System.out.println("Finally for " + integer))
            .takeUntil(interrupter);
}
...