При подписке на потоки изменений с использованием блокировки Spring Data Mon go реализации можно вызвать await , чтобы дождаться, пока подписка станет активной:
Subscription subscription = startBlockingMongoChangeStream();
subscription.await(Duration.of(2, SECONDS));
Document someDocument = ..
writeDocumentToMongoDb(someDocument);
startBlockingMongoChangeStream
реализуется в следующих строках:
public Subscription startBlockingMongoChangeStream() {
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start();
MessageListener<ChangeStreamDocument<Document>, Document> listener = System.out::println;
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("user", ChangeStreamOptions.empty());
return container.register(new ChangeStreamRequest<>(listener, options), Document.class);
}
Если await
не используется в приведенном выше примере, существует вероятность (практически 100% вероятность, если JVM hot), что someDocument
написано перед подписка активна, и поэтому someDocument
пропущено. Таким образом, добавление await
смягчает эту проблему.
Я ищу способ добиться того же при использовании реактивной реализации. Код теперь выглядит примерно так:
Disposable disposable = startReactiveMongoChangeStream().subscribe(); // (1)
Document someDocument = ..
writeDocumentToMongoDb(someDocument).subscribe(); // (2)
Проблема здесь, опять же, в том, что someDocument
записывается до начала подписки, возвращаемой startReactiveMongoChangeStream
, и, таким образом, документ пропущен.
Также обратите внимание, что это несколько надуманный пример, поскольку в моем собственном приложении writeDocumentToMongoDb
(2) не знает о подписке startReactiveMongoChangeStream
(1), поэтому я не могу просто flatMap
(1) и вызвать (2) . Метод startReactiveMongoChangeStream
реализуется следующим образом:
public Flux<ChangeStreamEvent<String>> startReactiveMongoChangeStream() {
return reactiveTemplate.changeStream(String.class)
.watchCollection("user")
.listen();
}
Как я могу «смоделировать» функциональность await
, доступную в реализации блокировки в реактивной реализации?