Запрос настраиваемого курсора создает Flux
, который никогда не завершается (никогда не генерирует событие onComplete
) и что Flux
генерирует записи при их вставке в базу данных. Из-за этого факта я думаю, что агрегация базы данных с настраиваемым курсором не допускает агрегации.
Таким образом, агрегация не имеет смысла, потому что для каждой вновь вставленной записи агрегирование необходимо пересчитать , Технически вы можете выполнить текущую агрегацию, в которой для каждой возвращенной записи вы вычисляете требуемую запись агрегирования и отправляете ее в нисходящем направлении.
Одним из возможных решений было бы выполнение агрегации программным способом для возвращенного "бесконечного" Flux
:
mongoOperation.tail(query, Structure.class)
.groupBy(Structure::id) // create independent Fluxes based on id
.flatMap(groupedFlux ->
groupedFlux.scan((result, nextStructure) -> { // scan is like reduce but emits intermediate results
log.info("intermediate result is: {}", result);
if (result.getLastUpdate() > nextStructure.getLastUpdate()) {
return result;
} else {
result.setLastUpdate(nextStructure.getLastUpdate());
return result;
}
}));
С другой стороны, вам, вероятно, следует пересмотреть ваш вариант использования и то, что вам нужно для выполнения sh здесь, и посмотреть, следует ли использовать что-то, отличное от ограниченного сбора, или, возможно, часть агрегирования избыточна (т.е. если новейшие вставленные записи всегда имеют свойство lastUpdate
больше, чем предыдущая запись).