Текучие не вернуть изменения - PullRequest
0 голосов
/ 03 октября 2019

Мне нужно прослушать изменения в моей локальной базе данных (комнате), поэтому я использую flowable для получения списка моей таблицы и прослушивания изменений в любых строках моей таблицы, однако это не возвращает и прослушивает изменения в моей базе данных.

@Query("SELECT * FROM messages_chat")
Flowable<List<MessagesChat>> getMessagesChat();



public Flowable<List<MessagesModel>> getMessages() {

    return appDataBase.messageDao().getMessagesChat().map(new Function<List<MessagesChat>, List<MessagesModel>>() {
        @Override
        public List<MessagesModel> apply(List<MessagesChat> messagesChats) throws Exception {
            return ChatMapper.transformDataBaseToMessageModel(messagesChats);
        }
    });

}



public class GetMessagesChat extends UseCaseFlowable<List<MessagesModel>, Void> {

    private final ChatRepository repository;

    @Inject
    public GetMessagesChat(ChatRepository repository) {
        this.repository = repository;
    }


    @Override
    Flowable<List<MessagesModel>> buildUseCaseObservable(Void aVoid) {
        return repository.getMessages();
    }
}


abstract class UseCaseFlowable<T, Params> {

    private final CompositeDisposable disposables;

    UseCaseFlowable() {
        this.disposables = new CompositeDisposable();
    }

    /**
     * Builds an {@link Flowable} which will be used when executing the current {@link UseCaseFlowable}.
     */
    abstract Flowable<T> buildUseCaseObservable(Params params);


    public void execute(DisposableSubscriber<T> observer, Params params) {

        final Flowable<T> flowable = this.buildUseCaseObservable(params)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());

        addDisposable(flowable.subscribeWith(observer));
    }


    /**
     * Dispose from current {@link CompositeDisposable}.
     */
    public void dispose() {
        if (!disposables.isDisposed()) {
            disposables.dispose();
        }
    }

    /**
     * Dispose from current {@link CompositeDisposable}.
     */
    private void addDisposable(Disposable disposable) {
        disposables.add(disposable);
    }
}


@Override
public void getMessages(String idChat) {
    interactorGetMessagesChat.execute(new ObserverMessagesChat(), null);
}



public class DefaultFlowable<T> extends DisposableSubscriber<T> {



    @Override
    public void onNext(T t) {

    }

    @Override
    public void onError(Throwable t) {

    }

    @Override
    public void onComplete() {

    }
}




private final class ObserverMessagesChat extends DefaultFlowable<List<MessagesModel>> {




    @Override
    public void onNext(List<MessagesModel> messages) {
        super.onNext(messages);
        ChatPresenter.this.listMessages(messages);
    }

    @Override
    public void onComplete() {
        super.onComplete();
    }

    @Override
    public void onError(Throwable exception) {
        super.onError(exception);
    }
}

Но не возвращать никаких изменений при обновлении таблицы.

...