У меня есть издатель, который выполняет длительный и большой запрос к MongoDB и возвращает данные в Flux . Объекты, помеченные в базе данных как «обработанные», будут отфильтрованы, а затем объекты будут буферизованы и переданы оператору concatMap (чтобы все буферизованные элементы ≤ обрабатывались до обработки элементов в следующем буфере ). Это выглядит примерно так:
Flux<Entity> entitiesFromMongoDb = myMongoRepository.executeLargeQuery();
entitiesFromMongoDb.filter(entity -> !entity.isProcessed())
.buffer(10)
.concatMap(bufferedEntityList ->
Flux.fromIterable(bufferedEntityList)
.flatMap(makeExternalCall)
.then()));
Где makeExternalCall
вызывает сторонний удаленный сервер , а устанавливает объект на processed
после выполнения вызова. В большинстве случаев это работает нормально, но когда удаленный сервер действительно работает медленно или имеет ошибку, makeExternalCall
будет повторять (с экспоненциальным откатом) операцию на удаленном сервере. В некоторых случаях может пройти довольно много времени, прежде чем все 10 внешних вызовов будут обработаны. Фактически, это может занять так много времени, что издатель myMongoRepository.executeLargeQuery()
перезапускается и запрос выполняется снова. Теперь мы столкнулись с проблемой, которую я попытаюсь описать здесь:
- Объект A считывается из базы данных (то есть возвращается в потоке, генерируемом
myMongoRepository.executeLargeQuery()
). Он еще не помечен как «обработанный», что означает, что entity.isProcessed()
вернет false
и будет сохранен в потоке. - Внешний сервер действительно работает медленно или не работает, так что
makeExternalCall
принудительно чтобы повторить операцию до того, как сущность A будет помечена как "обработанная" в БД. myMongoRepository.executeLargeQuery()
перезапускается и запрос выполняется снова. - Объект A снова считывается из базы данных. Но проблема в том, что уже есть другой экземпляр объекта A в полете, так как он еще не был помечен как «обработанный» предыдущим вызовом
myMongoRepository.executeLargeQuery()
. - Это означает, что будет вызван
makeExternalCall
дважды для объекта A, что не является оптимальным!
Я мог бы сделать дополнительный запрос к БД и проверить состояние processed
для каждого объекта в методе makeExternalCall
, но это приведет к дополнительная нагрузка (поскольку для каждого объекта необходим дополнительный запрос) в БД, что не является оптимальным.
Итак, мой вопрос:
Есть ли способ как-то «перезапустить» весь поток и, таким образом, очистить промежуточные буферы (т.е. удалить объект A, находящийся в полете, из текущего потока) когда запрос MongoDB, вызванный myMongoRepository.executeLargeQuery()
, перезапускается / перезапускается? Или есть лучший способ справиться с этим?
Я использую Spring Boot 2.2.4.RELEASE
, проектный реактор 3.3.2.RELEASE
и spring-boot-starter-data-mongodb-reactive
2.2.4.RELEASE
.