Я довольно новичок в работе с реактивными потоками и столкнулся со следующей проблемой, которая затрудняет мне решение. Цель состоит в том, чтобы получить ряд документов из базы данных MongoDB. Для каждого документа получите метаданные из БД и извлеките файл из БД (пока не в примере кода). Затем нам нужно загрузить все эти данные в s3 (объединяя все три элемента). Тем не менее, я застрял в объединении разных издателей, не испортив порядок элементов.
Publisher<Document> p = versionCollection.find();
ConnectableFlowable<Document> version = Flowable.fromPublisher(p).publish();
Observable<GridFSFile> gridFS = version
.map(extractID())
.flatMap(loadGridFSFile()).toObservable();
Observable c = version.toObservable()
.zipWith(gridFS, (Document v, GridFSFile f) -> {
// if I check here if both messages belong together, the order sometimes is messed up
return v;
});
version.connect();
Итак, в основном я пытаюсь опубликовать события по двум разным путям, один путь получает метаданные из GridFS изатем я снова пытаюсь объединить оба пути (чтобы получить доступ к исходному документу вместе с метаданными). Тем не менее, я заметил, что иногда события архивируются в другом порядке (возможно, поскольку запросы к db иногда занимают разное время).
Путь выполнения для каждого события должен быть таким:
v
|
/ | \
v query db query db
\ | /
upload aggregate
of all 3 elements
По сути, проблема в том, что при моем подходе я получаю результаты более раннего или более позднего запроса для другого элемента v. Мне, вероятно, нужно как-то убедиться, что путь выполнения синхронизирован между всеми тремя путями. для одного элемента ввода за раз, но я не знаю, как.
РЕДАКТИРОВАТЬ
Я наконец нашел подход, который, кажется, делает то, что нужно. Тем не менее, кажется немного странным, что кажется сложным обрабатывать вещи параллельно и обеспечивать их синхронизацию
Publisher<Document> p = versionCollection.find();
Observable<Document> version = Observable.fromPublisher(p);
version.flatMap(v -> {
ConnectableObservable<Document> connectableObservable = Observable.just(v).replay();
Observable o = connectableObservable
.map(extractAudioID())
.flatMap(loadGridFSFile(audioBucket));
Observable o3 = connectableObservable.zipWith(o, (Document a, GridFSFile f) -> {
// now everything seems to stay in order here
// and we can combine both results
});
o3.subscribe();
o.subscribe();
Disposable a = connectableObservable.connect();
return connectableObservable;
}, 1).blockingSubscribe();
static Function<ObjectId, ObservableSource<GridFSFile>> loadGridFSFile(GridFSBucket audioBucket) {
return id -> Observable.fromPublisher(audioBucket.find(new Document("_id", id)).first());
}