Наконец, с помощью @akarnokd я нашел решение этой проблемы.Вот код:
Subscriber sub1=new Subscriber<Integer>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
s.request(1);
System.out.println("onSubscribe done");
}
@Override
public void onNext(Integer t) {
System.out.println("Sub 1 Processing: "+t);
sleep(1000);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) { }
@Override
public void onComplete() { }
};
Subscriber sub2=new Subscriber<Integer>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
s.request(1);
System.out.println("onSubscribe done");
}
@Override
public void onNext(Integer t) {
System.out.println("Sub2 Processing: "+t);
sleep(500);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) { }
@Override
public void onComplete() { }
};
// ***** Magic happens here!! *****
DispatchWorkProcessor<Integer> dwp = DispatchWorkProcessor.create(Schedulers.io());
Flowable.range(1, 20).subscribe(dwp);
dwp.subscribe(sub1);
dwp.subscribe(sub2);
// ********************************
sleep(Integer.MAX_VALUE);
}
Хитрость заключалась в использовании request(1)
для реализации подхода извлечения и DispatchWorkProcessor
для использования элементов один раз из потока.Несмотря на то, что скорость одного потребителя вдвое больше, чем на выходе, ожидается:
onSubscribe done
onSubscribe done
Sub 1 Processing: 1
Sub2 Processing: 2
Sub2 Processing: 3
Sub 1 Processing: 4
Sub2 Processing: 5
Sub2 Processing: 6
Sub 1 Processing: 7
Sub2 Processing: 8
Sub2 Processing: 9
Sub 1 Processing: 10
Sub2 Processing: 11
Sub2 Processing: 12
Sub 1 Processing: 13
Sub2 Processing: 14
Sub2 Processing: 15
Sub 1 Processing: 16
Sub2 Processing: 17
Sub2 Processing: 18
Sub 1 Processing: 19
Sub2 Processing: 20
Все элементы потока обрабатываются один раз, а Sub 2 обрабатывает двойное число элементов, что приятно!