Reactive mon go change streams возобновляется с недействительным маркером возобновления - PullRequest
0 голосов
/ 04 мая 2020

Используя реактивный шаблон mon go, я пытаюсь прослушать потоки изменений mon go для 6 коллекций только для операций вставки. Мой код для начала прослушивания потоков изменений приведен ниже:

Инициирование потоков изменений:

//Loop for 6 collections
reactiveMongoTemplate.changeStream(collectionName, changeStreamOptions, MongoChangeStreamEvent.class)
          .doOnNext(changeStreamEvent -> {          
            System.out.println("In doOnNext: Received a new change stream event ");         
          })
          .map(changeStreamEvent -> {    
            //save resume token
          })
          .onErrorResume(throwable -> {
            System.out.println("In onErrorResume of change stream event :: " + throwable.getMessage());
            return null;
          })
          .subscribe();

При каждой новой записи я сохраняю токен возобновления в одной из моих коллекций:

//Document
public class MongoChangeStreamEvent implements Serializable {
  @Id
  private String id;
  private String resumeToken;
  //other fields and getters and setters
}

Используется изменение параметров потока для инициации. Если в коллекции существует токен возобновления, используйте его. Иначе, возобновите прямо сейчас.


ChangeStreamOptions changeStreamOptions;
    final BsonDocument resumeToken = //getLatestResumeToken, first entry after sorting 
                                     //MongoChangeStreamEvent collection in descending order based on 
                                     //resumeToken

    if (!resumeToken.isEmpty()) {
      changeStreamOptions = ChangeStreamOptions.builder()
          .filter(newAggregation(match(where(OPERATION_TYPE).is(INSERT_OPERATION))))
          .resumeAfter(resumeToken)
          .build();
    } else {
      changeStreamOptions = ChangeStreamOptions.builder()
          .filter(newAggregation(match(where(OPERATION_TYPE).is(INSERT_OPERATION))))        
          .resumeAt(Instant.now())
          .build();
    }

Вышла ошибка:

Suppressed: com.mongodb.MongoQueryException: Query failed with error code 280 and error message 
'cannot resume stream; the resume token was not found. {_data: 
"825EAFCA12000000162B022C0100296E5A100440284C0343E64ADEB43522FC0552CC1446645F696400645EAFCA12B51E93000716B9300004"}'

До сих пор у меня был другой опыт работы с этой функциональностью. Приложение, запускаемое без существующего токена возобновления, всегда работало должным образом. Другие результаты, когда я перезапустил приложение с существующим токеном возобновления, приведены ниже:

  1. Иногда оно отлично работало для всех 6.
  2. Сразу после перезапуска оно инициировало несколько оставшиеся не удалось.
  3. Сразу после перезапуска инициация не выдавала никакой ошибки. Но при вставке документа в наблюдаемую коллекцию немногие / все ошиблись.

Я понимаю, что поток изменений зависит от истории операций, как указано в документации , Что было более удивительным для меня, так это то, что токен резюме, который был выведен с ошибкой, не соответствовал ни одному из моих существующих токенов резюме и не присутствовал в оплогах.

Я убедился, что токен резюме, отправленный в шаблон реагирующего пн go, всегда был правильным.

  1. Пожалуйста, дайте мне знать, если я что-то упустил.
  2. Кроме того, мне бы хотелось узнать, как справиться с ошибкой одного / нескольких из множества запущенных потоков изменений.

1 Ответ

0 голосов
/ 04 мая 2020
final BsonDocument resumeToken = //getLatestResumeToken, first entry after sorting 
                                 //MongoChangeStreamEvent collection in descending order based on 
                                 //resumeToken

На какой документации основан этот лог c?

В текущих драйверах драйвер должен иметь возможность для передачи токена резюме для потока изменений в приложение. Существует несколько источников, из которых может исходить этот токен возобновления, и он также может иметь различные форматы. В любом случае это непрозрачный идентификатор, приложение не должно сортировать на маркере возобновления или что-то в этом роде.

Вы можете, например, увидеть , когда вы извлекал каждый токен возобновления и сортировал по этой отметке времени, но единственная операция, подходящая для токена возобновления, - это возвращение его драйверу для запуска нового потока изменений.

Пример для Ruby driver: https://docs.mongodb.com/ruby-driver/master/tutorials/ruby-driver-change-streams/#resuming -a-изменение потока

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...