Очистить летные элементы в потоке при перезапуске вышестоящего издателя в Spring Project Reactor? - PullRequest
2 голосов
/ 09 марта 2020

У меня есть издатель, который выполняет длительный и большой запрос к MongoDB и возвращает данные в Flux . Объекты, помеченные в базе данных как «обработанные», будут отфильтрованы, а затем объекты будут буферизованы и переданы оператору concatMap (чтобы все буферизованные элементы ≤ обрабатывались до обработки элементов в следующем буфере ). Это выглядит примерно так:

Flux<Entity> entitiesFromMongoDb = myMongoRepository.executeLargeQuery();
entitiesFromMongoDb.filter(entity -> !entity.isProcessed())
                   .buffer(10)
                   .concatMap(bufferedEntityList ->  
                                    Flux.fromIterable(bufferedEntityList)
                                        .flatMap(makeExternalCall)
                                        .then()));

Где makeExternalCall вызывает сторонний удаленный сервер , а устанавливает объект на processed после выполнения вызова. В большинстве случаев это работает нормально, но когда удаленный сервер действительно работает медленно или имеет ошибку, makeExternalCall будет повторять (с экспоненциальным откатом) операцию на удаленном сервере. В некоторых случаях может пройти довольно много времени, прежде чем все 10 внешних вызовов будут обработаны. Фактически, это может занять так много времени, что издатель myMongoRepository.executeLargeQuery() перезапускается и запрос выполняется снова. Теперь мы столкнулись с проблемой, которую я попытаюсь описать здесь:

  1. Объект A считывается из базы данных (то есть возвращается в потоке, генерируемом myMongoRepository.executeLargeQuery()). Он еще не помечен как «обработанный», что означает, что entity.isProcessed() вернет false и будет сохранен в потоке.
  2. Внешний сервер действительно работает медленно или не работает, так что makeExternalCall принудительно чтобы повторить операцию до того, как сущность A будет помечена как "обработанная" в БД.
  3. myMongoRepository.executeLargeQuery() перезапускается и запрос выполняется снова.
  4. Объект A снова считывается из базы данных. Но проблема в том, что уже есть другой экземпляр объекта A в полете, так как он еще не был помечен как «обработанный» предыдущим вызовом myMongoRepository.executeLargeQuery().
  5. Это означает, что будет вызван makeExternalCall дважды для объекта A, что не является оптимальным!

Я мог бы сделать дополнительный запрос к БД и проверить состояние processed для каждого объекта в методе makeExternalCall, но это приведет к дополнительная нагрузка (поскольку для каждого объекта необходим дополнительный запрос) в БД, что не является оптимальным.

Итак, мой вопрос:

Есть ли способ как-то «перезапустить» весь поток и, таким образом, очистить промежуточные буферы (т.е. удалить объект A, находящийся в полете, из текущего потока) когда запрос MongoDB, вызванный myMongoRepository.executeLargeQuery(), перезапускается / перезапускается? Или есть лучший способ справиться с этим?

Я использую Spring Boot 2.2.4.RELEASE, проектный реактор 3.3.2.RELEASE и spring-boot-starter-data-mongodb-reactive 2.2.4.RELEASE.

1 Ответ

1 голос
/ 15 марта 2020

Не уверен, полностью ли я понял проблему. Но пытаясь ответить так, как это звучит интересно.

Поскольку вам нужно знать о запросах, которые уже обрабатываются makeExternalCall, можете ли вы поддерживать локальный / локальный кеш, который содержит сущности, которые находятся в процессе обработан?

Set<Entity> inProgress = new HashSet<>(); 

Flux<Entity> entitiesFromMongoDb = myMongoRepository.executeLargeQuery();

entitiesFromMongoDb.filter(entity -> !entity.isProcessed())
                   .buffer(10)
                   .map(bufferedEntityList -> {  // remove the inprogress requests to avoid redundant processing
                        bufferedEntityList.removeIf(inProgress::contains);
                        return bufferedEntityList;
                   })
                   .concatMap(bufferedEntityList ->  
                                    inProgress.addAll(bufferedEntityList);
                                    Flux.fromIterable(bufferedEntityList)
                                        .flatMap(makeExternalCall) //assuming once processed, it emits the entity object
                                        .map(entity -> {   //clear growing set
                                            inProgress.remove(entity);
                                            return entity;
                                        })
                                        .then()));

Этот подход не является хорошим решением, когда вам необходимо масштабировать приложение по горизонтали. В этом случае вместо поддержки локального кэша вы можете go для внешнего сервера кэша, например redis.

...