Подождать, пока подписка на поток реактивных изменений станет активной с Spring Data MongoDB? - PullRequest
1 голос
/ 10 июля 2020

При подписке на потоки изменений с использованием блокировки Spring Data Mon go реализации можно вызвать await , чтобы дождаться, пока подписка станет активной:

Subscription subscription = startBlockingMongoChangeStream();
subscription.await(Duration.of(2, SECONDS));
Document someDocument = ..
writeDocumentToMongoDb(someDocument);

startBlockingMongoChangeStream реализуется в следующих строках:

public Subscription startBlockingMongoChangeStream() {

    MessageListenerContainer container = new DefaultMessageListenerContainer(template);
    container.start();                                                                                        

    MessageListener<ChangeStreamDocument<Document>, Document> listener = System.out::println;                     
    ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("user", ChangeStreamOptions.empty()); 

    return container.register(new ChangeStreamRequest<>(listener, options), Document.class);
}

Если await не используется в приведенном выше примере, существует вероятность (практически 100% вероятность, если JVM hot), что someDocument написано перед подписка активна, и поэтому someDocument пропущено. Таким образом, добавление await смягчает эту проблему.

Я ищу способ добиться того же при использовании реактивной реализации. Код теперь выглядит примерно так:

Disposable disposable = startReactiveMongoChangeStream().subscribe(); // (1)
Document someDocument = ..
writeDocumentToMongoDb(someDocument).subscribe(); // (2)

Проблема здесь, опять же, в том, что someDocument записывается до начала подписки, возвращаемой startReactiveMongoChangeStream, и, таким образом, документ пропущен.

Также обратите внимание, что это несколько надуманный пример, поскольку в моем собственном приложении writeDocumentToMongoDb (2) не знает о подписке startReactiveMongoChangeStream (1), поэтому я не могу просто flatMap (1) и вызвать (2) . Метод startReactiveMongoChangeStream реализуется следующим образом:

public Flux<ChangeStreamEvent<String>> startReactiveMongoChangeStream() {
    return reactiveTemplate.changeStream(String.class) 
                           .watchCollection("user")
                           .listen();    
}

Как я могу «смоделировать» функциональность await, доступную в реализации блокировки в реактивной реализации?

1 Ответ

1 голос
/ 10 июля 2020

TL; DR

В реактивном API нет средств для синхронизации

Объяснение

Во-первых, давайте рассмотрим обе реализации, чтобы понять, почему это так.

Реализация блокировки использует API курсора MongoDB для получения курсора. Получение курсора включает разговор с сервером. После того, как MessageListenerContainer получил курсоры, он переключает задачу подписки на активную, что означает, что вы ожидали этапа, на котором был выбран первый курсор.

Реактивная реализация работает с ChangeStreamPublisher. Из протокола реактивных потоков можно получить уведомление, когда элемент испускается, когда поток завершается или терпит неудачу. Уведомление о начале или завершении активности на стороне сервера недоступно. Следовательно, нельзя ждать, пока реактивный API получит первый курсор. Поскольку курсоры могут быть пустыми, первый курсор может вообще не выдавать никакого значения.

Я думаю, что драйвер MongoDB может предоставить API-интерфейс в стиле обратного вызова, чтобы получать уведомления о том, что поток активен. Однако об этом следует сообщить в системе отслеживания проблем MongoDB .

...