Spring WebFlux + MongoDB: настраиваемый курсор и агрегация - PullRequest
0 голосов
/ 27 апреля 2020

Я новичок в WebFlux и MongoDB. Я пытаюсь использовать агрегацию в закрытой коллекции с настраиваемым курсором, но ничего не получается. Я хотел бы выполнить этот запрос mongoDB:

db.structures.aggregate(
   [
     {
        $match: {
            id: { $in: [8244, 8052]}
        }    
     },  
     { $sort: { id: 1, lastUpdate: 1} },
     {
       $group:
         {
           _id: {id: "$id"},
           lastUpdate: { $last: "$lastUpdate" }
         }
     }
   ]
)

ReactiveMongoOperations дает мне возможность "хвост" или "агрегация".

Я могу выполнить агрегацию:

    MatchOperation match = new MatchOperation(Criteria.where("id").in(8244, 8052));
    GroupOperation group = Aggregation.group("id", "$id").last("$lastUpdate").as("lastUpdate");
    Aggregation aggregate = Aggregation.newAggregation(match, group);

    Flux<Structure> result = mongoOperation.aggregate(aggregate,
            "structures", Structure.class);

Или хвостовой курсор

    Query query = new Query();
    query.addCriteria(Criteria.where("id").in(8244, 8052));
    Flux<Structure> result = mongoOperation.tail(query, Structure.class);

Возможно ли это? Хвост и агрегация вместе?

Использование агрегации было способом, который я нашел, чтобы получить только последний вставленный документ для каждого идентификатора.

Без агрегации я получаю:

запрос без агрегации

С агрегацией:

запрос с агрегацией

Ткс заранее

1 Ответ

0 голосов
/ 28 апреля 2020

Запрос настраиваемого курсора создает Flux, который никогда не завершается (никогда не генерирует событие onComplete) и что Flux генерирует записи при их вставке в базу данных. Из-за этого факта я думаю, что агрегация базы данных с настраиваемым курсором не допускает агрегации.

Таким образом, агрегация не имеет смысла, потому что для каждой вновь вставленной записи агрегирование необходимо пересчитать , Технически вы можете выполнить текущую агрегацию, в которой для каждой возвращенной записи вы вычисляете требуемую запись агрегирования и отправляете ее в нисходящем направлении.

Одним из возможных решений было бы выполнение агрегации программным способом для возвращенного "бесконечного" Flux:

mongoOperation.tail(query, Structure.class)
  .groupBy(Structure::id) // create independent Fluxes based on id
  .flatMap(groupedFlux -> 
    groupedFlux.scan((result, nextStructure) -> { // scan is like reduce but emits intermediate results
    log.info("intermediate result is: {}", result);
    if (result.getLastUpdate() > nextStructure.getLastUpdate()) {
        return result;
    } else {
        result.setLastUpdate(nextStructure.getLastUpdate());
        return result;
    }
}));

С другой стороны, вам, вероятно, следует пересмотреть ваш вариант использования и то, что вам нужно для выполнения sh здесь, и посмотреть, следует ли использовать что-то, отличное от ограниченного сбора, или, возможно, часть агрегирования избыточна (т.е. если новейшие вставленные записи всегда имеют свойство lastUpdate больше, чем предыдущая запись).

...