Я предполагаю, что вы говорите о потоках изменений MongoDB.
Да, вы можете зарегистрировать слушателя:
Императивный стиль
События потока изменений могут потребляться с использованием MessageListener, зарегистрированного в MessageListenerContainer. Контейнер заботится о запуске задачи в отдельном потоке, отправляя события в MessageListener.
@Configuration
class Config {
@Bean
MessageListenerContainer messageListenerContainer(MongoTemplate template) {
return new DefaultMessageListenerContainer(template);
}
}
Как только MessageListenerContainer будет создан, MessageListeners могут быть зарегистрированы.
MessageListener<ChangeStreamDocument<Document>, Person> messageListener = (message) -> {
System.out.println("Hello " + message.getBody().getFirstname());
};
ChangeStreamRequest<Person> request = ChangeStreamRequest.builder()
.collection("person")
.filter(newAggregation(match(where("operationType").is("insert"))))
.publishTo(messageListener)
.build();
Subscription subscription = messageListenerContainer.register(request, Person.class);
// ...
Реактивный стиль
События потока изменений непосредственно потребляются через Flux, подключенный к потоку изменений.
Flux changeStream = reactiveTemplate
.changeStream(newAggregation(match(where("operationType").is("insert"))),
Person.class, ChangeStreamOptions.empty(), "person");
changeStream.doOnNext(event -> System.out.println("Hello " + event.getBody().getFirstname()))
.subscribe();
Подробнее об этом:
https://github.com/spring-projects/spring-data-examples/tree/master/mongodb/change-streams