Я реализовал Reactive Asyn c REST API с Spring Web Flux и MongoDB Change Streams , который работает правильно. Но вместо передачи объекта Aggregation с критериями в ".filter(arg0)" method
, я хотел бы передать M ongoDB Aggregation JSON string в метод фильтра, такой как:
ChangeStreamOptions.builder()
.filter(Document.parse("[{$match: {age: {$gt: 18}}}, {'operationType': 'insert'}]"))
.returnFullDocumentOnUpdate().build();
Приведенный выше фрагмент кода не работает, потому что он не правильный.
Вот моя рабочая реализация ниже. И метод
public Flux<Person> watchAgeGreaterThan(Integer ageParam){...}
вызывается через конечную точку REST "/api/rest/age/greaterthan/{ageParam}"
для асинхронного возврата потока каждый раз, когда документ вставляется в MongoDB "person_collection"
и его поле «возраст» больше, чем "ageParam".
import org.springframework.data.mongodb.core.ChangeStreamEvent;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.query.Criteria;
@Service
public class Watch {
@Autowired
private ReactiveMongoTemplate reactiveMongoTemplate;
public Flux<Person> watchAgeGreaterThan(Integer ageParam) {
ChangeStreamOptions options = ChangeStreamOptions.builder()
.filter(Aggregation.newAggregation(Person.class,
Aggregation.match(
Criteria.where("operationType").is("insert")
.and("fullDocument.age").gt(ageParam)
))
).returnFullDocumentOnUpdate().build();
return reactiveMongoTemplate.changeStream("person_collection", options, Person.class)
.map(ChangeStreamEvent::getBody)
.doOnError(throwable -> log.error("Error on 'person' change stream event :: "
+ throwable.getMessage(), throwable));
}
}
Я пробовал также другую реализацию, которую вы можете увидеть ниже:
public class CustomAggregationOperation implements AggregationOperation {
private String aggregationJsonString;
public CustomAggregationOperation(String aggregationJsonString) {
this.aggregationJsonString = aggregationJsonString;
}
@Override
public Document toDocument(AggregationOperationContext context) {
return context.getMappedObject(Document.parse(aggregationJsonString));
}
}
@Service
public class Watch {
@Autowired
private ReactiveMongoTemplate reactiveMongoTemplate;
public Flux<Person> watchAgeGreaterThan(Integer ageParam) {
String queryJsonString = "{$match: {age: {$gt: 18}}}";
AggregationOperation matchOperationType = Aggregation.match(Criteria.where("operationType").is("insert"));
TypedAggregation<Person> aggregation = Aggregation.newAggregation(Person.class,
matchOperationType, new CustomAggregationOperation(queryJsonString));
ChangeStreamOptions options2 = ChangeStreamOptions.builder().filter(aggregation).returnFullDocumentOnUpdate().build();
return reactiveMongoTemplate.changeStream("person_collection", options2, Person.class)
.map(ChangeStreamEvent::getBody)
.doOnError(throwable -> log.error("Error on 'person' change stream event :: "
+ throwable.getMessage(), throwable));
}
}
Но эта не работает.