Я создал приложение, которое читает поток изменений монго для обновлений и вставок, а затем мы предпринимаем действия с измененными данными. Ниже приведен фрагмент моего кода
private void listenChangeStream() {
Runnable changeListener = new Runnable() {
@Override
public void run() {
String fullDoc = null;
String updateInfo = null;
while (cursor.hasNext()) {
try {
ChangeStreamDocument<Document> next = cursor.next();
String id = next.getDocumentKey().getString("id").getValue();
LOGGER.debug("Change Stream recived:{}", next);
String operationType = next.getOperationType().getValue();
if ("insert".equals(operationType) || "replace".equals(operationType)) {
fullDoc = next.getFullDocument().toString();
if (fullDoc.contains("image_info")) {
kafkaProducer
.pushOfflineProcessingData(new DataPackets(Id, OfflineProcessType.IMAGE));
}
} else if ("update".equals(operationType)) {
updateInfo = next.getUpdateDescription().toString();
if (updateInfo.contains("image_info"))
kafkaProducer
.pushOfflineProcessingData(new DataPackets(Id, OfflineProcessType.IMAGE));
}
} catch (Exception ex) {
LOGGER.info("Exception has come in cahnge listener::", ex);
}
}
}
};
executor = Executors.newFixedThreadPool(1);
executor.execute(changeListener);
}
private MongoCursor<ChangeStreamDocument<Document>> getCursor(MongoCollection<Document> supplierCollection, List<Bson> pipeline) {
MongoCursor<ChangeStreamDocument<Document>> cursor;
cursor = supplierCollection.watch(pipeline).iterator();
return cursor;
}
Это работает нормально.
Проблема, с которой я сталкиваюсь, заключается в том, что когда я запускаю сервер, поток изменений начинает читать старые зафиксированные изменения. Который я не хочу. Я хочу после развертывания только новые обновления должны быть выбраны этим.
Кто-нибудь может подсказать, как это сделать?