Возможно, мне что-то не хватает (добавлен комментарий о том, что моя версия интерфейса Publisher
отличается).
Но ... вот как я бы сделал что-то подобное концептуально.
I Я собираюсь упростить интерфейсы для работы с Integers
:
// remote resource
interface Resource {
ConnectableObservable<Integer> fetch();
}
// cache
interface Cache {
Completable save(Integer data);
}
// client response connection
interface Connection {
Completable send(Integer data);
}
Я бы использовал Observable::publish
, чтобы создать ConnectableObservable
и установить sh две подписки:
@Test
public void testProxy()
{
// Override schedulers:
TestScheduler s = new TestScheduler();
RxJavaPlugins.setIoSchedulerHandler(
scheduler -> s );
RxJavaPlugins.setComputationSchedulerHandler(
scheduler -> s );
// Mock interfaces:
Resource resource = () -> Observable.range( 1, 100 )
.publish();
Cache cache = data -> Completable.fromObservable( Observable.just( data )
.delay( 100, TimeUnit.MILLISECONDS )
.doOnNext( __ -> System.out.println( String.format( "Caching %d", data ))));
Connection connection = data -> Completable.fromObservable( Observable.just( data )
.delay( 500, TimeUnit.MILLISECONDS )
.doOnNext( __ -> System.out.println( String.format( "Sending %d", data ))));
// Subscribe to resource:
ConnectableObservable<Integer> observable = resource.fetch();
observable
.observeOn( Schedulers.io() )
.concatMapCompletable( data -> connection.send( data ))
.subscribe();
observable
.observeOn( Schedulers.computation() )
.concatMapCompletable( data -> cache.save( data ))
.subscribe();
observable.connect();
// Simulate passage of time:
s.advanceTimeBy( 10, TimeUnit.SECONDS );
}
Вывод:
Caching 1
Caching 2
Caching 3
Caching 4
Sending 1
Caching 5
Caching 6
Caching 7
Caching 8
Caching 9
Sending 2
Caching 10
. . .
Обновление
Судя по вашим комментариям, похоже, что в вашем случае важно соблюдать противодавление.
Допустим, у вас есть Publisher
где-то, где учитывается противодавление , вы можете преобразовать его в Flowable
следующим образом:
Flowable<T> flowable = Flowable.fromPublisher( publisher );
Если у вас есть Flowable
, вы можете разрешить использование нескольких подписчиков, не беспокоясь о том, что каждому подписчику придется запрашивать значения из Publisher
( или любой из подписчиков от пропуска каких-либо событий при создании подписок). Вы делаете это, вызывая flowable.publish()
, чтобы создать ConnectableFlowable
.
введите описание изображения здесь
ConnectableFlowable<T> flowable = Flowable.fromPublisher( publisher ).publish();
out.send(flowable); // calls flowable.subscribe()
cache.save(flowable); // calls flowable.subscribe()
flowable.connect(); // begins emitting values