Я возился с переносом интерфейса слушателя старого стиля, используя Rx Java. То, что я придумала, похоже, работает, но использование Observable.using выглядит немного неловко.
Требования:
- Только одна подписка на идентификатор для базового service.
- Последнее значение для данного идентификатора должно кэшироваться и передаваться новым подписчикам.
- Мы должны отписаться от базовой службы, если ничего не прослушивается для идентификатора.
Есть ли лучший способ? Вот что у меня есть.
static class MockServiceRXAdapterImpl1 implements MockServiceRXAdapter {
PublishSubject<MockResponse> mockResponseObservable = PublishSubject.create();
MockService mockService = new MockService(mockResponse -> mockResponseObservable.onNext(mockResponse));
final ConcurrentMap<String, Observable<String>> subscriptionMap = new ConcurrentHashMap<>();
public Observable<String> getObservable(String id) {
return Observable.using(() -> subscriptionMap.computeIfAbsent(
id,
key -> mockResponseObservable.filter(mockResponse -> mockResponse.id.equals(id))
.doOnSubscribe(disposable -> mockService.subscribe(id))
.doOnDispose(() -> {
mockService.unsubscribe(id);
subscriptionMap.remove(id);
})
.map(mockResponse -> mockResponse.value)
.replay(1)
.refCount()),
observable -> observable,
observable -> {
}
);
}
}