Я использую concatMap
для обработки потока элементов по одному с длительной операцией.В какой-то момент мне нужно «прервать» эту длительную операцию, но только для текущего элемента:
@Test
public void main() throws InterruptedException {
TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
.concatMap(this::doLongRunningOperation)
.test();
Thread.sleep(10000);
System.out.println("interrupt NOW");
// Now I need to interrupt whichever longRunningOperation in
// progress, but I don't want to interrupt the whole stream.
// In other words, I want to force it to move onto the next
// integer.
}
Observable<String> doLongRunningOperation(final Integer integer) {
return Observable
.just("\tStart working on " + integer,
"\tStill working on " + integer,
"\tAlmost done working on " + integer)
// delay each item by 2 seconds
.concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
.doOnNext(System.out::println)
.doFinally(() -> System.out.println("\tfinally for " + integer));
}
Я попытался решить эту проблему, сохранив одноразовый «внутренний» поток и утилизировав его вправильное времяНо это не сработало.Внутренний поток удаляется, но concatMap
никогда не переходит к элементу обработки 3. Тест просто зависает (поскольку внешняя наблюдаемая никогда не завершает / не завершает / не удаляет либо)
Disposable disposable = Disposables.empty();
@Test
public void main() throws InterruptedException {
TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
.concatMap(this::doLongRunningOperation)
.test();
Thread.sleep(10000);
System.out.println("interrupt NOW");
disposable.dispose();
test.awaitTerminalEvent();
System.out.println("terminal event");
}
Observable<String> doLongRunningOperation(final Integer integer) {
return Observable
.just("\tStart working on " + integer,
"\tStill working on " + integer,
"\tAlmost done working on " + integer)
// delay each item by 2 seconds
.concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
.doOnNext(System.out::println)
.doFinally(() -> System.out.println("\tfinally for " + integer))
.doOnSubscribe(disposable -> {
// save disposable so we can "interrupt" later
System.out.println("Saving disposable for " + integer);
Example.this.disposable = disposable;
});
}
Даже если это сработалоэто казалось немного взломанным, полагаясь на побочный эффект.Каков наилучший способ сделать это?