Что такое поведение ParallelStream Queue? - PullRequest
0 голосов
/ 04 сентября 2018

Я использую ParallelsStream для параллельной загрузки некоторых файлов, некоторые из них большие, некоторые маленькие. Я заметил, что не все рабочие используются.

Сначала все работает нормально, все потоки используются (я установил для параметра параллелизма значение 16). Затем в определенный момент (когда он попадает в файлы большего размера) он использует только один поток

упрощенный код:

files.parallelStream().forEach((file) -> {
    try (FileInputStream fileInputStream = new FileInputStream(file)) {
                IDocumentStorageAdaptor uploader = null;

                try {
                    logger.debug("Adaptors before taking: " + uploaderPool.size());
                    uploader = uploaderPool.take();
                    logger.debug("Took an adaptor!");
                    logger.debug("Adaptors after taking: " + uploaderPool.size());
                    uploader.addNewFile(file);
                } finally {
                    if (uploader != null) {
                        logger.debug("Adding one back!");
                        uploaderPool.put(uploader);
                        logger.debug("Adaptors after putting: " + uploaderPool.size());
                    }
                }
            } catch (InterruptedException | IOException e) {
                throw new UploadException(e);
            }
});

uploaderPool - это ArrayBlockingQueue. журналы:

[ForkJoinPool.commonPool-worker-8] - Adaptors before taking: 0
[ForkJoinPool.commonPool-worker-15] - Adding one back!
[ForkJoinPool.commonPool-worker-8] - Took an adaptor!
[ForkJoinPool.commonPool-worker-15] - Adaptors after putting: 0
...
...
...
[ForkJoinPool.commonPool-worker-10] - Adding one back!
[ForkJoinPool.commonPool-worker-10] - Adaptors after putting: 16
[ForkJoinPool.commonPool-worker-10] - Adaptors before taking: 16
[ForkJoinPool.commonPool-worker-10] - Took an adaptor!
[ForkJoinPool.commonPool-worker-10] - Adaptors after taking: 15
[ForkJoinPool.commonPool-worker-10] - Adding one back!
[ForkJoinPool.commonPool-worker-10] - Adaptors after putting: 16
[ForkJoinPool.commonPool-worker-10] - Adaptors before taking: 16
[ForkJoinPool.commonPool-worker-10] - Took an adaptor!
[ForkJoinPool.commonPool-worker-10] - Adaptors after taking: 15

Похоже, что вся работа (элементы в списке) распределяется между 16 потоками, и вещи, делегированные одному потоку, будут просто ждать, пока поток будет свободен, а не использовать доступный поток. Есть ли способ изменить способ параллельной работы своей очереди? Я читаю документы в forkjoinpool, и там упоминается о краже работы, но только для порожденных подзадач.

Мой другой план состоял в том, чтобы, возможно, рандомизировать сортировку списка, в котором я использую parallelStream, и, возможно, это сбалансировало бы вещи.

Спасибо!

1 Ответ

0 голосов
/ 04 сентября 2018

Эвристика split-vs-compute для параллельных потоков настроена для параллельных операций с данными, а не для параллельных операций ввода-вывода. (Другими словами, они настроены так, чтобы поддерживать занятость процессоров, но не генерировать больше задач, чем у вас.) В настоящее время нет вариантов переопределения этих вариантов.

...