Проблема LiveData + Rx (горячая наблюдаемая Firebase) - PullRequest
0 голосов
/ 28 декабря 2018

У меня вопрос по поводу преобразования rx Publisher в livingata, я программировал проект, использующий в качестве примера firebase rx livingata.

Проблема заключается в том, что при преобразовании издателя в LiveData с библиотекой LiveDataReactiveStreams, когда подписка неактивна, преобразователь очищает подписку, а при активном состоянии создает новую подписку.

Код:

   /**
 * Defines a {@link LiveData} object that wraps a {@link Publisher}.
 *
 * <p>
 * When the LiveData becomes active, it subscribes to the emissions from the Publisher.
 *
 * <p>
 * When the LiveData becomes inactive, the subscription is cleared.
 * LiveData holds the last value emitted by the Publisher when the LiveData was active.
 * <p>
 * Therefore, in the case of a hot RxJava Observable, when a new LiveData {@link Observer} is
 * added, it will automatically notify with the last value held in LiveData,
 * which might not be the last value emitted by the Publisher.
 *
 * <p>
 * Note that LiveData does NOT handle errors and it expects that errors are treated as states
 * in the data that's held. In case of an error being emitted by the publisher, an error will
 * be propagated to the main thread and the app will crash.
 *
 * @param <T> The type of data hold by this instance.
 */
private static class PublisherLiveData<T> extends LiveData<T> {
    private final Publisher<T> mPublisher;
    final AtomicReference<LiveDataSubscriber> mSubscriber;

    PublisherLiveData(@NonNull Publisher<T> publisher) {
        mPublisher = publisher;
        mSubscriber = new AtomicReference<>();
    }

    @Override
    protected void onActive() { // Problem
        super.onActive();
        LiveDataSubscriber s = new LiveDataSubscriber();
        mSubscriber.set(s);
        mPublisher.subscribe(s);
    }

    @Override
    protected void onInactive() { // Problem
        super.onInactive();
        LiveDataSubscriber s = mSubscriber.getAndSet(null);
        if (s != null) {
            s.cancelSubscription();
        }
    }

    final class LiveDataSubscriber extends AtomicReference<Subscription>
            implements Subscriber<T> {

        @Override
        public void onSubscribe(Subscription s) {
            if (compareAndSet(null, s)) {
                s.request(Long.MAX_VALUE);
            } else {
                s.cancel();
            }
        }

        @Override
        public void onNext(T item) {
            postValue(item);
        }

        @Override
        public void onError(final Throwable ex) {
            mSubscriber.compareAndSet(this, null);

            ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
                @Override
                public void run() {
                    // Errors should be handled upstream, so propagate as a crash.
                    throw new RuntimeException("LiveData does not handle errors. Errors from "
                            + "publishers should be handled upstream and propagated as "
                            + "state", ex);
                }
            });
        }

        @Override
        public void onComplete() {
            mSubscriber.compareAndSet(this, null);
        }

        public void cancelSubscription() {
            Subscription s = get();
            if (s != null) {
                s.cancel();
            }
        }
    }
}

В этом классе я добавляю boolen для решения, должен ли он подписываться или нет

class CustomPublisherLiveData<T> extends LiveData<T> {

private boolean mResubscribe;
private final Publisher<T> mPublisher;
private final AtomicReference<LiveDataSubscriber> mSubscriber;

CustomPublisherLiveData(@NonNull Publisher<T> publisher, boolean resubscribe) {
    mPublisher = publisher;
    mResubscribe = resubscribe;
    mSubscriber = new AtomicReference<>();
}

@Override
protected void onActive() {
    super.onActive();
    if (mResubscribe){
        LiveDataSubscriber s = new LiveDataSubscriber();
        mSubscriber.set(s);
        mPublisher.subscribe(s);
    }
}

@Override
protected void onInactive() {
    super.onInactive();
    LiveDataSubscriber s = mSubscriber.getAndSet(null);
    if (s != null && mResubscribe) {
        s.cancelSubscription();
    }
}

final class LiveDataSubscriber extends AtomicReference<Subscription>
        implements Subscriber<T> {

    @Override
    public void onSubscribe(Subscription s) {
        if (compareAndSet(null, s)) {
            s.request(Long.MAX_VALUE);
        } else {
            s.cancel();
        }
    }

    @Override
    public void onNext(T item) {
        postValue(item);
    }

    @Override
    public void onError(final Throwable ex) {
        mSubscriber.compareAndSet(this, null);

        //TODO silence error I have to make the error go up to the main thread
        throw new RuntimeException("LiveData does not handle errors. Errors from "
                + "publishers should be handled upstream and propagated as "
                + "state", ex);

        ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
            @Override
            public void run() {
                // Errors should be handled upstream, so propagate as a crash.
                throw new RuntimeException("LiveData does not handle errors. Errors from "
                        + "publishers should be handled upstream and propagated as "
                        + "state", ex);
            }
        });
    }

    @Override
    public void onComplete() {
        mSubscriber.compareAndSet(this, null);
    }

    public void cancelSubscription() {
        Subscription s = get();
        if (s != null) {
            s.cancel();
        }
    }
}

Это правильно?Я должен изменить «преобразование liveata», или я что-то упускаю.

...