Проблема во втором примере не в многоадресной рассылке.Это в операторе сбора и повторения при.
Подумайте о повторении, когда вы подключаетесь к методу onComplete как «ловушка».Этот «крючок» будет перехватывать метод onComplete и «переопределять» его, поэтому он не будет вызываться, за исключением первого раза, когда эта наблюдаемая завершена перед запуском процесса повторения.
Без вызова метода onComplete оператор сбора не знает, сколько элементов он должен собрать.Таким образом, вам придется разобраться с логикой того, как собирать предметы и где хранить их вне потока, но это будет обходной путь.
Вот пример для этого:
List<String> test = new ArrayList<>();
final String[] currentString = {""};
final int[] i = {0};
ConnectableObservable<Integer> integerObservable =
Observable.defer(() -> Observable.fromArray(i[0]++, i[0]++, i[0]++))
.doOnComplete(() -> {
test.add(currentString[0]);
currentString[0] = "";
})
.repeatWhen(o -> {
return o.concatMap(v ->
Observable.timer(3, TimeUnit.SECONDS)
.doOnComplete(() -> {
test.add(currentString[0]);
currentString[0] = "";
}));
})
.publish();
integerObservable
.observeOn(Schedulers.newThread())
.subscribe(System.out::println);
integerObservable
.observeOn(Schedulers.newThread())
.map((sa) -> {
currentString[0] = currentString[0] + sa;
System.out.println(currentString[0]);
return sa;
})
.subscribe();
В этом примере мы используем метод onComplete наблюдаемой, которую мы храним таймер для сброса нашего состояния.В этом примере не учитывается вариант, когда потребители потребляют данные медленнее, чем задержка повторения (это приведет к переполнению результата из одной цепочки данных в другую).Я бы предложил использовать другие способы обработки повторяющейся части, например:
final int [] i = {0};
ConnectableObservable<Integer> integerObservable =
Observable.defer(() -> Observable.fromArray(i[0]++, i[0]++, i[0]++)).publish();
Observable b = integerObservable.observeOn(Schedulers.newThread());
Observable a = integerObservable
.observeOn(Schedulers.newThread())
.collect(StringBuilder::new, (sb, x) -> sb.append(x).append(","))
.map(StringBuilder::toString)
.toObservable();
Observable
.interval(1, TimeUnit.SECONDS)
.doOnSubscribe((d) -> {
a.subscribe(System.out::println);
b.subscribe(System.out::println);
integerObservable.connect();
})
.doOnNext((d) -> {
a.subscribe(System.out::println);
b.subscribe(System.out::println);
integerObservable.connect();
})
.doOnComplete(() -> {})
.subscribe();
Первое подключение - это первый раз выполненияи onNext вызывается каждую 1 секунду, и он заново запускает весь конвейер с начала
Надеюсь, это поможет.