Есть ли более простой способ адаптировать стандартную службу наблюдателей / слушателей, используя Rx Java? - PullRequest
0 голосов
/ 09 апреля 2020

Я возился с переносом интерфейса слушателя старого стиля, используя Rx Java. То, что я придумала, похоже, работает, но использование Observable.using выглядит немного неловко.

Требования:

  1. Только одна подписка на идентификатор для базового service.
  2. Последнее значение для данного идентификатора должно кэшироваться и передаваться новым подписчикам.
  3. Мы должны отписаться от базовой службы, если ничего не прослушивается для идентификатора.

Есть ли лучший способ? Вот что у меня есть.

static class MockServiceRXAdapterImpl1 implements MockServiceRXAdapter {
    PublishSubject<MockResponse> mockResponseObservable = PublishSubject.create();
    MockService mockService = new MockService(mockResponse -> mockResponseObservable.onNext(mockResponse));
    final ConcurrentMap<String, Observable<String>> subscriptionMap = new ConcurrentHashMap<>();

    public Observable<String> getObservable(String id) {
        return Observable.using(() -> subscriptionMap.computeIfAbsent(
                id,
                key -> mockResponseObservable.filter(mockResponse -> mockResponse.id.equals(id))
                        .doOnSubscribe(disposable -> mockService.subscribe(id))
                        .doOnDispose(() -> {
                            mockService.unsubscribe(id);
                            subscriptionMap.remove(id);
                        })
                        .map(mockResponse -> mockResponse.value)
                        .replay(1)
                        .refCount()),
                observable -> observable,
                observable -> {
                }
        );
    }
}

Ответы [ 2 ]

1 голос
/ 09 апреля 2020

Вы можете использовать Observable.create

Таким образом, код может выглядеть следующим образом

final Map<String, Observable<String>> subscriptionMap = new HashMap<>();
MockService mockService = new MockService();

public Observable<String> getObservable(String id) {
    log.info("looking for root observable");
    if (subscriptionMap.containsKey(id)) {
        log.info("found root observable");
        return subscriptionMap.get(id);
    } else {
        synchronized (subscriptionMap) {
            if (!subscriptionMap.containsKey(id)) {
                log.info("creating new root observable");
                final Observable<String> responseObservable = Observable.create(emitter -> {
                    MockServiceListener listener = emitter::onNext;
                    mockService.addListener(listener);
                    emitter.setCancellable(() -> {
                        mockServices.removeListener(listener);
                        mockService.unsubscribe(id);
                        synchronized (subscriptionMap) {
                            subscriptionMap.remove(id);
                        }
                    });
                    mockService.subscribe(id);
                })
                        .filter(mockResponse -> mockResponse.id.equals(id))
                        .map(mockResponse -> mockResponse.value)
                        .replay(1)
                        .refCount();

                subscriptionMap.put(id, responseObservable);
            } else {
                log.info("Another thread created the observable for us");
            }
            return subscriptionMap.get(id);
        }
    }
}
0 голосов
/ 11 апреля 2020

Я думаю, что заставил его работать, используя .groupBy(...).

В моем случае Response.getValue() возвращает int, но вы понимаете:

class Adapter
{
    Subject<Response> msgSubject;
    ThirdPartyService service;
    Map<String, Observable<Integer>> observables;
    Observable<GroupedObservable<String, Response>> groupedObservables;

    public Adapter()
    {
        msgSubject = PublishSubject.<Response>create().toSerialized();
        service = new MockThirdPartyService( msgSubject::onNext );
        groupedObservables = msgSubject.groupBy( Response::getId );
        observables = Collections.synchronizedMap( new HashMap<>() );
    }

    public Observable<Integer> getObservable( String id )
    {
        return observables.computeIfAbsent( id, this::doCreateObservable );
    }

    private Observable<Integer> doCreateObservable( String id )
    {
        service.subscribe( id );

        return groupedObservables
                .filter( group -> group.getKey().equals( id ))
                .doOnDispose( () -> {
                    synchronized ( observables )
                    {
                        service.unsubscribe( id );
                        observables.remove( id );
                    }
                } )
                .concatMap( Functions.identity() )
                .map( Response::getValue )
                .replay( 1 )
                .refCount();
    }
}
...