Мне нужно прослушать изменения в моей локальной базе данных (комнате), поэтому я использую flowable для получения списка моей таблицы и прослушивания изменений в любых строках моей таблицы, однако это не возвращает и прослушивает изменения в моей базе данных.
@Query("SELECT * FROM messages_chat")
Flowable<List<MessagesChat>> getMessagesChat();
public Flowable<List<MessagesModel>> getMessages() {
return appDataBase.messageDao().getMessagesChat().map(new Function<List<MessagesChat>, List<MessagesModel>>() {
@Override
public List<MessagesModel> apply(List<MessagesChat> messagesChats) throws Exception {
return ChatMapper.transformDataBaseToMessageModel(messagesChats);
}
});
}
public class GetMessagesChat extends UseCaseFlowable<List<MessagesModel>, Void> {
private final ChatRepository repository;
@Inject
public GetMessagesChat(ChatRepository repository) {
this.repository = repository;
}
@Override
Flowable<List<MessagesModel>> buildUseCaseObservable(Void aVoid) {
return repository.getMessages();
}
}
abstract class UseCaseFlowable<T, Params> {
private final CompositeDisposable disposables;
UseCaseFlowable() {
this.disposables = new CompositeDisposable();
}
/**
* Builds an {@link Flowable} which will be used when executing the current {@link UseCaseFlowable}.
*/
abstract Flowable<T> buildUseCaseObservable(Params params);
public void execute(DisposableSubscriber<T> observer, Params params) {
final Flowable<T> flowable = this.buildUseCaseObservable(params)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
addDisposable(flowable.subscribeWith(observer));
}
/**
* Dispose from current {@link CompositeDisposable}.
*/
public void dispose() {
if (!disposables.isDisposed()) {
disposables.dispose();
}
}
/**
* Dispose from current {@link CompositeDisposable}.
*/
private void addDisposable(Disposable disposable) {
disposables.add(disposable);
}
}
@Override
public void getMessages(String idChat) {
interactorGetMessagesChat.execute(new ObserverMessagesChat(), null);
}
public class DefaultFlowable<T> extends DisposableSubscriber<T> {
@Override
public void onNext(T t) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
}
private final class ObserverMessagesChat extends DefaultFlowable<List<MessagesModel>> {
@Override
public void onNext(List<MessagesModel> messages) {
super.onNext(messages);
ChatPresenter.this.listMessages(messages);
}
@Override
public void onComplete() {
super.onComplete();
}
@Override
public void onError(Throwable exception) {
super.onError(exception);
}
}
Но не возвращать никаких изменений при обновлении таблицы.