Используя реактивный шаблон mon go, я пытаюсь прослушать потоки изменений mon go для 6 коллекций только для операций вставки. Мой код для начала прослушивания потоков изменений приведен ниже:
Инициирование потоков изменений:
//Loop for 6 collections
reactiveMongoTemplate.changeStream(collectionName, changeStreamOptions, MongoChangeStreamEvent.class)
.doOnNext(changeStreamEvent -> {
System.out.println("In doOnNext: Received a new change stream event ");
})
.map(changeStreamEvent -> {
//save resume token
})
.onErrorResume(throwable -> {
System.out.println("In onErrorResume of change stream event :: " + throwable.getMessage());
return null;
})
.subscribe();
При каждой новой записи я сохраняю токен возобновления в одной из моих коллекций:
//Document
public class MongoChangeStreamEvent implements Serializable {
@Id
private String id;
private String resumeToken;
//other fields and getters and setters
}
Используется изменение параметров потока для инициации. Если в коллекции существует токен возобновления, используйте его. Иначе, возобновите прямо сейчас.
ChangeStreamOptions changeStreamOptions;
final BsonDocument resumeToken = //getLatestResumeToken, first entry after sorting
//MongoChangeStreamEvent collection in descending order based on
//resumeToken
if (!resumeToken.isEmpty()) {
changeStreamOptions = ChangeStreamOptions.builder()
.filter(newAggregation(match(where(OPERATION_TYPE).is(INSERT_OPERATION))))
.resumeAfter(resumeToken)
.build();
} else {
changeStreamOptions = ChangeStreamOptions.builder()
.filter(newAggregation(match(where(OPERATION_TYPE).is(INSERT_OPERATION))))
.resumeAt(Instant.now())
.build();
}
Вышла ошибка:
Suppressed: com.mongodb.MongoQueryException: Query failed with error code 280 and error message
'cannot resume stream; the resume token was not found. {_data:
"825EAFCA12000000162B022C0100296E5A100440284C0343E64ADEB43522FC0552CC1446645F696400645EAFCA12B51E93000716B9300004"}'
До сих пор у меня был другой опыт работы с этой функциональностью. Приложение, запускаемое без существующего токена возобновления, всегда работало должным образом. Другие результаты, когда я перезапустил приложение с существующим токеном возобновления, приведены ниже:
- Иногда оно отлично работало для всех 6.
- Сразу после перезапуска оно инициировало несколько оставшиеся не удалось.
- Сразу после перезапуска инициация не выдавала никакой ошибки. Но при вставке документа в наблюдаемую коллекцию немногие / все ошиблись.
Я понимаю, что поток изменений зависит от истории операций, как указано в документации , Что было более удивительным для меня, так это то, что токен резюме, который был выведен с ошибкой, не соответствовал ни одному из моих существующих токенов резюме и не присутствовал в оплогах.
Я убедился, что токен резюме, отправленный в шаблон реагирующего пн go, всегда был правильным.
- Пожалуйста, дайте мне знать, если я что-то упустил.
- Кроме того, мне бы хотелось узнать, как справиться с ошибкой одного / нескольких из множества запущенных потоков изменений.