Проблема с OOM при копировании документов базы данных из одного сегмента в другой с использованием реактивного программирования - PullRequest
0 голосов
/ 05 марта 2019

Мы пытаемся скопировать данные из одного сегмента в другой с помощью реактивного программирования (около 1 миллиона документов).Мы получаем OOM в этом куске кода.Я не эксперт по rxjava и нуждаюсь в помощи, чтобы предотвратить ООМ.Я думаю, что чтение происходит быстрее, чем запись, и это вызывает OOM из-за переполнения буфера.Код выглядит следующим образом:

CountDownLatch countDownLatch5 = new CountDownLatch(1);
Observable
        .from(n1qlKeysForDocsGPC)
        .flatMap(new Func1<String, Observable<JsonDocument>>() {
            @Override
            public Observable<JsonDocument> call(String key) {
                return readPrimaryMainAsyncBucket
                        .get(key, 10, TimeUnit.SECONDS)
                        .onErrorResumeNext(readPrimaryMainAsyncBucket.get(key, 10, TimeUnit.SECONDS))
                        .retry(50)
                        .switchIfEmpty(Observable.empty())
                        .onErrorResumeNext(Observable.empty());
            }
        })
        .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
            @Override
            public Observable<JsonDocument> call(JsonDocument jsonDocument) {
                return readPrimaryBackupAsyncBucket.upsert(jsonDocument, 10, TimeUnit.SECONDS).retry(50);
            }
        })
        .last()
        .doOnTerminate(new Action0() {
            @Override
            public void call() {
                countDownLatch5.countDown();
            }
        })
        .subscribe();
try {
    countDownLatch5.await();
    logger.info("DataRecoverySchedulers | Completed countDownLatch5");
} catch (InterruptedException e) {
    e.printStackTrace();
}

1 Ответ

0 голосов
/ 06 марта 2019

Версии Java SDK Couchbase до 3.x (которые еще не вышли на момент написания этой статьи) используют RxJava версии 1.

Вызовы flatmap, как они у вас сейчас, будут публиковать операцииво внутренний буфер для асинхронного выполнения, возвращая Observable для отслеживания каждого из них.Это означает, что первый flatmap будет использовать вывод вашего from вызова неограниченным образом.Другими словами, он будет читать весь список намного быстрее, чем операции.Я ожидаю, что ошибка OOM, которую вы видите, происходит из-за переполнения внутреннего буфера Couchbase.

Чтобы исправить это, вы можете использовать вариант flatmap, который ограничивает количество ожидающих подписок.Вы просто добавляете второй целочисленный параметр к вашему вызову flatmap.Таким образом, у вас будет .flatmap(new Func1<~>..., 10), чтобы ограничить себя 10 невыполненными операциями за раз.

Стандартный буфер в Couchbase составляет около 16000 невыполненных операций, но это гораздо больше, чем нужно для насыщения большинства систем.

Для справки см. Связанную статью Пост переполнения стека об ограничении пропускной способности для загрузки файлов.

...