У меня вопрос по поводу преобразования Rx Publisher в LiveData. Я разрабатывал проект, используя в качестве образца пример Firebase Rx LiveData.
Проблема заключается в том, что при преобразовании Publisher в LiveData с помощью библиотеки LiveDataReactiveStreams подписка отменяется, когда LiveData становится неактивной, а при возврате в активное состояние создаётся новая подписка.
Код:
/**
 * A {@link LiveData} backed by a Reactive Streams {@link Publisher}.
 *
 * <p>
 * Each time the LiveData becomes active, a new subscriber is created and subscribed to the
 * Publisher. Each time the LiveData becomes inactive, that subscriber's subscription is
 * cancelled. LiveData keeps the last value emitted by the Publisher while it was active.
 *
 * <p>
 * As a consequence, with a hot RxJava Observable, a newly added LiveData {@link Observer}
 * is notified with the last value held by the LiveData, which may not be the most recent
 * value emitted by the Publisher.
 *
 * <p>
 * Errors are NOT handled here; they are expected to be modelled as states in the data that
 * is held. An error emitted by the Publisher is wrapped in a {@link RuntimeException} that
 * is thrown from a task handed to the main-thread executor.
 *
 * @param <T> The type of data held by this instance.
 */
private static class PublisherLiveData<T> extends LiveData<T> {
    private final Publisher<T> mPublisher;
    final AtomicReference<LiveDataSubscriber> mSubscriber;

    PublisherLiveData(@NonNull Publisher<T> publisher) {
        mPublisher = publisher;
        mSubscriber = new AtomicReference<>();
    }

    /** Creates a fresh subscriber, records it, and subscribes it to the Publisher. */
    @Override
    protected void onActive() {
        super.onActive();
        final LiveDataSubscriber subscriber = new LiveDataSubscriber();
        mSubscriber.set(subscriber);
        mPublisher.subscribe(subscriber);
    }

    /** Forgets the current subscriber, if any, and cancels its subscription. */
    @Override
    protected void onInactive() {
        super.onInactive();
        final LiveDataSubscriber previous = mSubscriber.getAndSet(null);
        if (previous == null) {
            return;
        }
        previous.cancelSubscription();
    }

    /**
     * Subscriber that forwards every item to the enclosing LiveData. The
     * {@link Subscription} it receives is stored in the inherited {@link AtomicReference}.
     */
    final class LiveDataSubscriber extends AtomicReference<Subscription>
            implements Subscriber<T> {

        /**
         * Stores the first subscription and requests an unbounded number of items; any
         * subscription arriving after one is already stored is cancelled immediately.
         */
        @Override
        public void onSubscribe(Subscription s) {
            if (!compareAndSet(null, s)) {
                s.cancel();
                return;
            }
            s.request(Long.MAX_VALUE);
        }

        /** Posts the item to the enclosing LiveData. */
        @Override
        public void onNext(T item) {
            postValue(item);
        }

        /**
         * Clears this subscriber from the enclosing LiveData (if it is still the current
         * one) and schedules a main-thread task that throws, wrapping {@code ex}.
         */
        @Override
        public void onError(final Throwable ex) {
            mSubscriber.compareAndSet(this, null);

            // Errors are meant to be handled upstream, so surface this one as a crash.
            final Runnable crash = new Runnable() {
                @Override
                public void run() {
                    throw new RuntimeException("LiveData does not handle errors. Errors from "
                            + "publishers should be handled upstream and propagated as "
                            + "state", ex);
                }
            };
            ArchTaskExecutor.getInstance().executeOnMainThread(crash);
        }

        /** Clears this subscriber from the enclosing LiveData if it is still the current one. */
        @Override
        public void onComplete() {
            mSubscriber.compareAndSet(this, null);
        }

        /** Cancels the stored subscription; does nothing when none has been received yet. */
        public void cancelSubscription() {
            final Subscription current = get();
            if (current == null) {
                return;
            }
            current.cancel();
        }
    }
}
В этом классе я добавил флаг boolean, который определяет, нужно ли подписываться заново:
class CustomPublisherLiveData<T> extends LiveData<T> {
private boolean mResubscribe;
private final Publisher<T> mPublisher;
private final AtomicReference<LiveDataSubscriber> mSubscriber;
CustomPublisherLiveData(@NonNull Publisher<T> publisher, boolean resubscribe) {
mPublisher = publisher;
mResubscribe = resubscribe;
mSubscriber = new AtomicReference<>();
}
@Override
protected void onActive() {
super.onActive();
if (mResubscribe){
LiveDataSubscriber s = new LiveDataSubscriber();
mSubscriber.set(s);
mPublisher.subscribe(s);
}
}
@Override
protected void onInactive() {
super.onInactive();
LiveDataSubscriber s = mSubscriber.getAndSet(null);
if (s != null && mResubscribe) {
s.cancelSubscription();
}
}
final class LiveDataSubscriber extends AtomicReference<Subscription>
implements Subscriber<T> {
@Override
public void onSubscribe(Subscription s) {
if (compareAndSet(null, s)) {
s.request(Long.MAX_VALUE);
} else {
s.cancel();
}
}
@Override
public void onNext(T item) {
postValue(item);
}
@Override
public void onError(final Throwable ex) {
mSubscriber.compareAndSet(this, null);
//TODO silence error I have to make the error go up to the main thread
throw new RuntimeException("LiveData does not handle errors. Errors from "
+ "publishers should be handled upstream and propagated as "
+ "state", ex);
ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
@Override
public void run() {
// Errors should be handled upstream, so propagate as a crash.
throw new RuntimeException("LiveData does not handle errors. Errors from "
+ "publishers should be handled upstream and propagated as "
+ "state", ex);
}
});
}
@Override
public void onComplete() {
mSubscriber.compareAndSet(this, null);
}
public void cancelSubscription() {
Subscription s = get();
if (s != null) {
s.cancel();
}
}
}
Это правильно? Нужно ли мне изменять преобразование в LiveData, или я что-то упускаю?