MongoDB Change Stream читает данные, которые являются старыми обновлениями - PullRequest
0 голосов
/ 26 июня 2018

Я создал приложение, которое читает поток изменений монго для обновлений и вставок, а затем мы предпринимаем действия с измененными данными. Ниже приведен фрагмент моего кода

private void listenChangeStream() {
        Runnable changeListener = new Runnable() {
            @Override
            public void run() {
                String fullDoc = null;
                String updateInfo = null;

                while (cursor.hasNext()) {
                    try {
                        ChangeStreamDocument<Document> next = cursor.next();
                        String id = next.getDocumentKey().getString("id").getValue();
                        LOGGER.debug("Change Stream recived:{}", next);
                        String operationType = next.getOperationType().getValue();
                        if ("insert".equals(operationType) || "replace".equals(operationType)) {
                               fullDoc = next.getFullDocument().toString();
                            if (fullDoc.contains("image_info")) {
                                kafkaProducer
                                        .pushOfflineProcessingData(new DataPackets(Id, OfflineProcessType.IMAGE));
                            }
                        } else if ("update".equals(operationType)) {
                               updateInfo = next.getUpdateDescription().toString();
                            if (updateInfo.contains("image_info"))
                                kafkaProducer
                                        .pushOfflineProcessingData(new DataPackets(Id, OfflineProcessType.IMAGE));
                        } 

                    } catch (Exception ex) {
                        LOGGER.info("Exception has come in cahnge listener::", ex);
                    }
                }

            }
        };
        executor = Executors.newFixedThreadPool(1);
        executor.execute(changeListener);

    }

private MongoCursor<ChangeStreamDocument<Document>> getCursor(MongoCollection<Document> supplierCollection, List<Bson> pipeline) {
        MongoCursor<ChangeStreamDocument<Document>> cursor;     
             cursor = supplierCollection.watch(pipeline).iterator();        
        return cursor;
    }

Это работает нормально. Проблема, с которой я сталкиваюсь, заключается в том, что когда я запускаю сервер, поток изменений начинает читать старые зафиксированные изменения. Который я не хочу. Я хочу после развертывания только новые обновления должны быть выбраны этим.

Кто-нибудь может подсказать, как это сделать?

1 Ответ

0 голосов
/ 12 июля 2018

Кто-нибудь может подсказать, как это сделать?

В MongoDB v4.0 с драйвером MongoDB Java v3.8 вы можете указать для параметра startAtOperationTime значение MongoClient.watch () .

Поток изменений будет содержать только изменения, которые произошли после указанной временной отметки. Любая команда, запущенная на сервере MongoDB, вернет время работы, которое можно использовать в качестве значения параметра. Значением по умолчанию является время операции, полученное от сервера до создания потока изменений.

Кроме того, вы также можете кэшировать последнее увиденное _id из уведомления о потоке изменений. Это resumeToken, который можно передать методу resumeAfter() для возобновления уведомлений, начиная с операции, указанной в resumeToken. Например:

BsonDocument resumeToken = next.getResumeToken();
cursor = inventory.watch().resumeAfter(resumeToken).iterator();

См. Также Потоки изменений MongoDB

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...