Драйвер Java Async MongoDB и наблюдаемые RxJava2 - PullRequest
0 голосов
/ 02 ноября 2018

Я изучаю реактивное программирование с использованием RxJava2, и у меня возник вопрос по поводу его использования с драйвером асинхронной базы данных, например MongoDB.

Если я использую блокирующий драйвер MongoDB для получения коллекции, подход будет такой:

public class MyDao{
   ...
   public Document getFirstDocument(String collectionName){
      MongoCollection<Document> collection = database.getCollection(collectionName);
      return collection.find().first();
   }
}



public class MyService {
   ...
   public Observable<Document> getFirstOf(String collectionName){
       return Observable.just(myDao.getFirstDocument(collectionName)); 
   }
}

Вместо этого, работая с асинхронным драйвером MongoDB, мой тип возврата для операции чтения - это void (а не Document или Future) с методом обратного вызова внутри, например:

collection.find().first(
        (document, throwable) -> {
            myService.myCallback(document);
        }
);

Итак, как я могу передать мои наблюдаемые документы в MyService?

public class MyDao{
   ...
   public void getFirstDocument(String collectionName){
      MongoCollection<Document> collection = database.getCollection(collectionName);
      collection.find().first(
        (document, throwable) -> {
            //SOME SORT OF CALLBACK
        }
     );
   }
}



public class MyService {
   ...
   public Observable<Document> getFirstOf(String collectionName){
       return ??????? 
   }
}

1 Ответ

0 голосов
/ 02 ноября 2018

Когда вы используете Observable.just() в

public Observable<Document> getFirstOf(String collectionName){
    return Observable.just(myDao.getFirstDocument(collectionName)); 
}

равно следующему коду

public Observable<Document> getFirstOf(String collectionName){
    Document doc = myDao.getFirstDocument(collectionName);
    return Observable.just(doc); 
}

Вы можете заметить, что это не async код, и запрос к БД выполняется в вызывающем потоке. Чтобы сделать этот код async, вам нужно переписать его вот так

public Observable<Document> getFirstOf(String collectionName){
    return Observable.fromCallable(() -> myDao.getFirstDocument(collectionName)); 
}

Если вы используете async драйвер MongoDB и хотите обернуть его в Observable, вы можете написать таким образом

public Observable<Document> getFirstDocument(String collectionName) {
    return Observable.create(emitter -> {
        MongoCollection<Document> collection = database.getCollection(collectionName);
        collection.find().first((document, throwable) -> {
            if(document != null) {
                emitter.onNext(document);
                emitter.onComplete();
            } else if(throwable != null) {
                emitter.onError(throwable);
            }
        });
    });
}
...