RxJava: обрабатывать параллельно и объединять результаты (не путая порядок) - PullRequest
1 голос
/ 06 ноября 2019

Я довольно новичок в работе с реактивными потоками и столкнулся со следующей проблемой, которая затрудняет мне решение. Цель состоит в том, чтобы получить ряд документов из базы данных MongoDB. Для каждого документа получите метаданные из БД и извлеките файл из БД (пока не в примере кода). Затем нам нужно загрузить все эти данные в s3 (объединяя все три элемента). Тем не менее, я застрял в объединении разных издателей, не испортив порядок элементов.

Publisher<Document> p = versionCollection.find();
ConnectableFlowable<Document> version = Flowable.fromPublisher(p).publish();

Observable<GridFSFile> gridFS = version
        .map(extractID())
        .flatMap(loadGridFSFile()).toObservable();

Observable c = version.toObservable()
        .zipWith(gridFS, (Document v, GridFSFile f) -> {

            // if I check here if both messages belong together, the order sometimes is messed up  
            return v;
        });
version.connect();

Итак, в основном я пытаюсь опубликовать события по двум разным путям, один путь получает метаданные из GridFS изатем я снова пытаюсь объединить оба пути (чтобы получить доступ к исходному документу вместе с метаданными). Тем не менее, я заметил, что иногда события архивируются в другом порядке (возможно, поскольку запросы к db иногда занимают разное время).

Путь выполнения для каждого события должен быть таким:

         v
         |
  /      |      \
v    query db   query db
  \      |      /
   upload aggregate
   of all 3 elements

По сути, проблема в том, что при моем подходе я получаю результаты более раннего или более позднего запроса для другого элемента v. Мне, вероятно, нужно как-то убедиться, что путь выполнения синхронизирован между всеми тремя путями. для одного элемента ввода за раз, но я не знаю, как.

РЕДАКТИРОВАТЬ

Я наконец нашел подход, который, кажется, делает то, что нужно. Тем не менее, кажется немного странным, что кажется сложным обрабатывать вещи параллельно и обеспечивать их синхронизацию

Publisher<Document> p = versionCollection.find();

Observable<Document> version = Observable.fromPublisher(p);
version.flatMap(v -> {

    ConnectableObservable<Document> connectableObservable = Observable.just(v).replay();

    Observable o = connectableObservable
        .map(extractAudioID())
        .flatMap(loadGridFSFile(audioBucket));

    Observable o3 = connectableObservable.zipWith(o, (Document a, GridFSFile f) -> {
            // now everything seems to stay in order here
            // and we can combine both results
    });
    o3.subscribe();
    o.subscribe();

    Disposable a = connectableObservable.connect();
    return connectableObservable;
}, 1).blockingSubscribe();


static Function<ObjectId, ObservableSource<GridFSFile>> loadGridFSFile(GridFSBucket audioBucket) {
    return id -> Observable.fromPublisher(audioBucket.find(new Document("_id", id)).first());
}

1 Ответ

1 голос
/ 08 ноября 2019

Несколько вещей, которые, кажется, решают проблему:

И теперь этот фрагмент кода выглядит намного более разумным:

ConnectableFlowable<Document> version = Flowable.fromPublisher(p).replay();

Flowable<GridFSFile> file = version
        .map(extractID())
        .concatMap(loadGridFSFile(audioBucket));

Flowable<GridFSDownloadStream> data = version
        .map(extractID())
        .map(loadGridFSData(audioBucket));

Flowable c = Flowable.zip(version, file, data, (v, f, d) -> {
    // so far everything seems to stay in order
    return v;
});

version.connect();

c.subscribe();
...