Как использовать агрегаты MongoDB JSON String в качестве аргумента «arg0» в Spring Data ChangeStreamOptions.builder (). Filter (arg0) - PullRequest
1 голос
/ 19 февраля 2020

Я реализовал Reactive Asyn c REST API с Spring Web Flux и MongoDB Change Streams , который работает правильно. Но вместо передачи объекта Aggregation с критериями в ".filter(arg0)" method, я хотел бы передать M ongoDB Aggregation JSON string в метод фильтра, такой как:

ChangeStreamOptions.builder()
        .filter(Document.parse("[{$match: {age: {$gt: 18}}}, {'operationType': 'insert'}]"))
        .returnFullDocumentOnUpdate().build();

Приведенный выше фрагмент кода не работает, потому что он не правильный.

Вот моя рабочая реализация ниже. И метод

public Flux<Person> watchAgeGreaterThan(Integer ageParam){...}

вызывается через конечную точку REST "/api/rest/age/greaterthan/{ageParam}" для асинхронного возврата потока каждый раз, когда документ вставляется в MongoDB "person_collection" и его поле «возраст» больше, чем "ageParam".

import org.springframework.data.mongodb.core.ChangeStreamEvent;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.query.Criteria;


@Service
public class Watch {

    @Autowired
    private ReactiveMongoTemplate reactiveMongoTemplate;


    public Flux<Person> watchAgeGreaterThan(Integer ageParam) {

        ChangeStreamOptions options = ChangeStreamOptions.builder()
            .filter(Aggregation.newAggregation(Person.class, 
                        Aggregation.match(
                            Criteria.where("operationType").is("insert")
                                .and("fullDocument.age").gt(ageParam)
                        ))
                    ).returnFullDocumentOnUpdate().build();

        return reactiveMongoTemplate.changeStream("person_collection", options, Person.class)
                    .map(ChangeStreamEvent::getBody)
                    .doOnError(throwable -> log.error("Error on 'person' change stream event :: " 
                                                    + throwable.getMessage(), throwable));
    }

}

Я пробовал также другую реализацию, которую вы можете увидеть ниже:

public class CustomAggregationOperation implements AggregationOperation {
    private String aggregationJsonString;

    public CustomAggregationOperation(String aggregationJsonString) {
        this.aggregationJsonString = aggregationJsonString;
    }

    @Override
    public Document toDocument(AggregationOperationContext context) {
        return context.getMappedObject(Document.parse(aggregationJsonString));
    }
}


@Service
public class Watch {

    @Autowired
    private ReactiveMongoTemplate reactiveMongoTemplate;


    public Flux<Person> watchAgeGreaterThan(Integer ageParam) {
        String queryJsonString = "{$match: {age: {$gt: 18}}}";

        AggregationOperation matchOperationType = Aggregation.match(Criteria.where("operationType").is("insert"));

        TypedAggregation<Person> aggregation = Aggregation.newAggregation(Person.class, 
                                                    matchOperationType, new CustomAggregationOperation(queryJsonString));

        ChangeStreamOptions options2 = ChangeStreamOptions.builder().filter(aggregation).returnFullDocumentOnUpdate().build();

        return reactiveMongoTemplate.changeStream("person_collection", options2, Person.class)
                    .map(ChangeStreamEvent::getBody)
                    .doOnError(throwable -> log.error("Error on 'person' change stream event :: " 
                                                    + throwable.getMessage(), throwable));
    }
}

Но эта не работает.

...