Как мне обновить результаты запроса MongoDB, используя внутренний запрос? - PullRequest
0 голосов
/ 30 января 2020

ПРЕДПОСЫЛКИ

У меня есть коллекция json документов, которые представляют химические соединения. Соединение будет иметь идентификатор и имя. Внешний процесс генерирует новые составные документы с интервалами, и идентификаторы могут меняться в разных итерациях. Составные документы, чьи составные идентификаторы изменились, необходимо обновить, чтобы они указывали на самые последние идентификаторы итераций, и поэтому будут добавлены поле «lastUpdated» и поле relatedCompoundIds. Для демонстрации рассмотрим следующие соединения на трех этапах:

Шаг 1 : исходный документ соединения для «ацетона» создается с id = «001».

{
    "id": "001",
    "name": "acetone",
    "lastUpdated": "2000-01-01",
}

Шаг 2 : другая итерация генерирует ацетон, но с другим идентификатором.

{
    "id": "001",
    "name": "acetone",
    "lastUpdated": "2000-01-01"
}
{
    "id": "002",
    "name": "acetone",
    "lastUpdated": "2000-01-02"
}

Шаг 3 : соединение с идентификатором "001" будет добавьте массив «relatedCompoundIds», указывающий на любые другие соединения с тем же именем.

{
    "id": "001",
    "name": "acetone",
    "lastUpdated": "2000-01-02",
    "relatedCompoundIds": ["002"]
}
{
    "id": "002",
    "name": "acetone",
    "lastUpdated": "2000-01-02"
}

Я использую MongoDB для размещения этих записей и для разрешения «указателей» relatedCompoundId. Я получаю доступ к Mon go, используя Spring ReactiveMongoTemplate. Мой процесс выглядит следующим образом:

  1. Вставка вновь сгенерированных соединений в MongoDB.
  2. Для каждой записи, в которой "lastUpdated" было раньше:
  3. Получить все связанные соединения ( поиск по имени) и установите «relatedCompoundIds».

КОД

public class App {

    public static void main(String[] args) {
        public static ReactiveMongoTemplate mongoOps = new ReactiveMongoTemplate(MongoClients.create(),
                "CompoundStore");

        Date updatedDate = new Date();
        upsertAll(updatedDate, readPath);
        setRelatedCompounds(updatedDate);
    }

    private static void upsertAll(Date updatedDate, String readPath) {
        // [upsertion code here] <- this is working fine
    }

    private static void setRelatedCompounds(Date updatedDate) {
        mongoOps.find(//
                Query.query(Criteria.where("lastUpdated").lt(updatedDate)), Compound.class, "compound")//
                .doOnNext(compound -> {
                    findRelatedCompounds(updatedDate, compound)//
                            .doOnSuccess(rc -> {
                                if (rc.size() > 0) {
                                    compound.setRelatedCompoundIDs(rc);
                                    mongoOps.save(Mono.just(compound)).subscribe();
                                }
                            })//
                            .subscribe();
                }).blockLast();
    }

    private static Mono<List<String>> findRelatedCompounds(Date updatedDate, Compound compound) {
        Query query = new Query().addCriteria(new Criteria().andOperator(//
                Criteria.where("lastUpdated").gte(updatedDate), //
                Criteria.where("name").is(compound.getName)));
        query.fields().include("id");
        return mongoOps.find(query, Compound.class)//
                .map(c -> c.getId())//
                .filter(cid -> !StringUtils.isEmpty(cid))//
                .distinct().collectSortedList();
    }

}

ОШИБКА

По при выполнении я получаю следующую ошибку:

17:08:35.957 [Thread-12] ERROR org.mongodb.driver.client - Callback onResult call produced an error
com.mongodb.MongoException: org.springframework.data.mongodb.UncategorizedMongoDbException: Too many operations are already waiting for a connection. Max number of operations (maxWaitQueueSize) of 500 has been exceeded.; nested exception is com.mongodb.MongoWaitQueueFullException: Too many operations are already waiting for a connection. Max number of operations (maxWaitQueueSize) of 500 has been exceeded.
    at com.mongodb.MongoException.fromThrowableNonNull(MongoException.java:79)

Есть ли лучший способ выполнить sh, что я пытаюсь сделать? Как мне отрегулировать противодавление, чтобы не перегружать пн go? Другой совет?

EDIT

Указанную ошибку можно устранить, добавив модификатор limitRate после метода поиска внутри setRelatedCompounds.

private static void setRelatedCompounds(Date updatedDate) {
    mongoOps.find(//
            Query.query(Criteria.where("lastUpdated").lt(updatedDate)), Compound.class, "compound")//
            .limitRate(500)//
            .doOnNext(compound -> {
                // do work here
                    }).subscribe();
        }).blockLast();
}

Still открыт для предложений по альтернативным решениям.

...