Вы можете позвонить dispose
на Disposable
, возвращаемом connect
, но учтите, что это может оставить наблюдателей в активном состоянии, поскольку они не получат дальнейшие события.В любом случае вы также должны утилизировать их.
ConnectableObservable co = source.publish();
Disposable d = co.connect();
Disposable d1 = co.subscribe();
Disposable d2 = co.subscribe();
d.dispose();
d1.dispose();
d2.dispose();
Если вы хотите избежать зависания, используйте takeUntil
с темой:
PublishSubject terminate = PublishSubject.create();
ConnectableObservable co = source.publish();
Disposable d = co.connect();
terminate.doOnComplete(d::dispose).subscribe();
Observable observable = co.takeUntil(terminate);
observable .subscribe(System.out::println, Throwable::printStackTrace,
() -> System.out.println("Done 1"));
observable .subscribe(System.out::println, Throwable::printStackTrace,
() -> System.out.println("Done 2"));
terminate.onComplete();