Пакетная обработка с реактивным драйвером Couchbase java - PullRequest
2 голосов
/ 26 февраля 2020

Предположим, у меня есть корзина, из которой мне нужно получить документы с датой старше, чем сейчас. Этот документ выглядит следующим образом:

{
id: "1",
date: "Some date",
otherObjectKEY: "key1"
}

Для каждого документа мне нужно получить другой документ, используя его otherObjectKEY , отправить последний в топику kafka c, а затем удалить оригинал document.

Используя реагирующий java драйвер 3.0 , я смог сделать это примерно так:

public void batch(){
    streamOriginalObjects()
         .flatMap(originalObject -> fetchOtherObjectUsingItsKEY(originalObject)
                       .flatMap(otherObject -> sendToKafkaAndDeleteOriginalObject(originalObject))
         )
         .subscribe();
}

streamOriginalObjects ():

public Flux<OriginalObject> streamOriginalObjects(){
        return client.query("select ... and date <= '"+ LocalDateTime.now().toString() +"'")
                .flux()
                .flatMap(result -> result.rowsAs(OriginalObject.class));
    }

Это работает, как и ожидалось, но мне интересно, есть ли лучший подход (особенно с точки зрения производительности), чем потоковая передача и обработка элемент за элементом.

1 Ответ

2 голосов
/ 27 февраля 2020

Выполнение запроса N1QL, а затем разветвление операций ключ-значение из этого - полезный и распространенный шаблон. Это должно заставить разветвление происходить параллельно:

    streamOriginalObjects()
        // Split into numberOfThreads 'rails'
        .parallel(numberOfThreads)

        // Run on an unlimited thread pool
        .runOn(Schedulers.elastic())

        .concatMap(originalObject -> fetchOtherObjectUsingItsKEY(originalObject)
            .concatMap(otherObject -> sendToKafkaAndDeleteOriginalObject(originalObject))
        )

        // Back out of parallel mode
        .sequential()
        .subscribe();
...