Rx Java несколько потребителей одного издателя - PullRequest
4 голосов
/ 16 июня 2020

Пишу какой-то промежуточный HTTP-прокси с кешем. Рабочий процесс:

  1. Клиент запрашивает этот прокси-сервер для ресурса
  2. Если resurce существует в кеше, прокси возвращает его
  3. Если ресурс не был найден, удаленная загрузка прокси ресурс и возвращается пользователю. Прокси сохраняет этот ресурс в кеш при загрузке данных.

Мои интерфейсы имеют поток Publisher<ByteBuffer> для удаленного ресурса, кеш, который принимает Publisher<ByteBuffer> для сохранения, и соединение клиентов, которое принимает Publisher<ByteBuffer> как ответ:

// remote resource
interface Resource {
  Publisher<ByteBuffer> fetch();
}

// cache
interface Cache {
  Completable save(Publisher<ByteBuffer> data);
}

// clien response connection
interface Connection {
  Completable send(Publisher<ByteBuffer> data);
}

Моя проблема в том, что мне нужно лениво сохранять этот поток байтовых буферов в кеш при отправке ответа клиенту, поэтому клиент должен отвечать за запрос ByteByffer фрагменты из удаленного ресурса, не кеш .

Я пытался использовать метод Publisher::cache, но для меня это не лучший выбор, потому что он сохраняет все полученные данные в памяти, это неприемлемо, поскольку размер кешированных данных может составлять несколько ГБ.

В качестве обходного пути я создал Subject, заполненный следующими элементами, полученными от Resource:

private final Cache cache;
private final Connection out;

Completable proxy(Resource res) {
  Subject<ByteBuffer> mirror = PublishSUbject.create();
  return Completable.mergeArray(
    out.send(res.fetch().doOnNext(mirror::onNext),
    cache.save(mirror.toFlowable(BackpressureStrategy.BUFFER))
  );
}

Это Возможно ли повторное использование одного и того же Publisher без кэширования элементов в памяти, и где только один подписчик будет отвечать за запросы элементов у издателя?

1 Ответ

1 голос
/ 25 июня 2020

Возможно, мне что-то не хватает (добавлен комментарий о том, что моя версия интерфейса Publisher отличается).

Но ... вот как я бы сделал что-то подобное концептуально.

I Я собираюсь упростить интерфейсы для работы с Integers:

// remote resource
interface Resource {
  ConnectableObservable<Integer> fetch();
}

// cache
interface Cache {
  Completable save(Integer data);
}

// client response connection
interface Connection {
  Completable send(Integer data);
}

Я бы использовал Observable::publish, чтобы создать ConnectableObservable и установить sh две подписки:

@Test
public void testProxy()
{
    // Override schedulers:
    TestScheduler s = new TestScheduler();
    
    RxJavaPlugins.setIoSchedulerHandler(
            scheduler -> s );
    RxJavaPlugins.setComputationSchedulerHandler(
            scheduler -> s );
    
    // Mock interfaces:
    Resource resource = () -> Observable.range( 1, 100 )
            .publish();
    
    Cache cache = data -> Completable.fromObservable( Observable.just( data )
                .delay( 100, TimeUnit.MILLISECONDS )
                .doOnNext( __ -> System.out.println( String.format( "Caching %d", data ))));
    
    Connection connection = data -> Completable.fromObservable( Observable.just( data )
                .delay( 500, TimeUnit.MILLISECONDS )
                .doOnNext( __ -> System.out.println( String.format( "Sending %d", data ))));
    
    // Subscribe to resource:
    ConnectableObservable<Integer> observable = resource.fetch();
    
    observable
        .observeOn( Schedulers.io() )
        .concatMapCompletable( data -> connection.send( data ))
        .subscribe();
    
    observable
        .observeOn( Schedulers.computation() )
        .concatMapCompletable( data -> cache.save( data ))
        .subscribe();
    
    observable.connect();
    
    // Simulate passage of time:
    s.advanceTimeBy( 10, TimeUnit.SECONDS );
}

Вывод:

Caching 1
Caching 2
Caching 3
Caching 4
Sending 1
Caching 5
Caching 6
Caching 7
Caching 8
Caching 9
Sending 2
Caching 10
. . . 

Обновление

Судя по вашим комментариям, похоже, что в вашем случае важно соблюдать противодавление.

Допустим, у вас есть Publisher где-то, где учитывается противодавление , вы можете преобразовать его в Flowable следующим образом:

Flowable<T> flowable = Flowable.fromPublisher( publisher );

Если у вас есть Flowable, вы можете разрешить использование нескольких подписчиков, не беспокоясь о том, что каждому подписчику придется запрашивать значения из Publisher ( или любой из подписчиков от пропуска каких-либо событий при создании подписок). Вы делаете это, вызывая flowable.publish(), чтобы создать ConnectableFlowable.

введите описание изображения здесь

ConnectableFlowable<T> flowable = Flowable.fromPublisher( publisher ).publish();
out.send(flowable);   // calls flowable.subscribe()
cache.save(flowable); // calls flowable.subscribe()
flowable.connect();   // begins emitting values
...