Распараллелить цикл for в Java - PullRequest
0 голосов
/ 28 марта 2019

У меня есть цикл for, который перебирает список коллекций.Внутри цикла некоторые запросы на выбор / обновление выполняются в коллекции, которые не относятся к другим коллекциям.Так как каждая коллекция имеет много данных для обработки, я бы хотел распараллелить ее.

Фрагмент кода выглядит примерно так:

//Some variables that are used within the for loop logic
 for(String collection : collections) {
    //Select queries on collection
    //Update queries on collection
}

Как мне добиться этого в Java?

Ответы [ 3 ]

2 голосов
/ 28 марта 2019

Вы можете использовать метод parallelStream () (начиная с Java 8):

collections.parallelStream().forEach((collection) -> {
    //Select queries on collection
    //Update queries on collection
});

Больше информации о потоках .


Еще один способ сделать это - использовать Executors:

    try
    {
        final ExecutorService exec = Executors.newFixedThreadPool(collections.size());
        for (final String collection : collections)
        {
            exec.submit(() -> {
                // Select queries on collection
                // Update queries on collection
            });
        }

        // We want to wait that the jobs are done.
        final boolean terminated = exec.awaitTermination(500, TimeUnit.MILLISECONDS);
        if (terminated == false)
        {
            exec.shutdownNow();
        }

    } catch (final InterruptedException e)
    {
        e.printStackTrace();
    }

Этот пример более мощный, так как вы можете легко узнать, когда работа выполнена, принудительное завершение ... и более .

0 голосов
/ 28 марта 2019

Есть ряд вопросов, которые вы должны задать себе, чтобы найти правильный ответ:

Если бы у меня было столько потоков, сколько число ядер моего процессора, этого было бы достаточно?

Использование parallelStream() даст вам столько же потоков, сколько и ядра вашего процессора.

Будет ли параллелизация цикла повышать производительность или есть узкое место в БД?

Вы можете раскрутить 100 потоков, параллельно обрабатывая их, но это не значит, что вы будете делать вещи в 100 раз быстрее, если ваша БД или сеть не смогут обработать том.Блокировка БД также может быть проблемой здесь.

Нужно ли обрабатывать мои данные в определенном порядке?

Если вам нужно обрабатывать данные в определенном порядке, это может ограничить ваш выбор.Например, forEach() не гарантирует, что элементы вашей коллекции будут обрабатываться в определенном порядке, но forEachOrdered() (с затратами на производительность).

Может ли мой источник данных извлекать данныереактивно?

Бывают случаи, когда наш источник данных может предоставлять данные в виде потока.В этом случае вы всегда можете обработать этот поток, используя такие технологии, как RxJava или WebFlux.Это позволит вам по-другому подходить к вашей проблеме.

Сказав все вышесказанное, вы можете выбрать подход, который вам нужен (исполнители, RxJava и т. Д.), Который лучше соответствует вашим целям.

0 голосов
/ 28 марта 2019
final int numberOfThreads = 32;

final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);

// List to store the 'handles' (Futures) for all tasks:
final List<Future<MyResult>> futures = new ArrayList<>();

// Schedule one (parallel) task per String from "collections":
for(final String str : collections) {
  futures.add(executor.submit(() -> { return doSomethingWith(str); }));
}


// Wait until all tasks have completed:
for ( Future<MyResult> f : futures ) {
  MyResult aResult = f.get(); // Will block until the result of the task is available.
  // Optionally do something with the result...
}

executor.shutdown(); // Release the threads held by the executor.

// At this point all tasks have ended and we can continue as if they were all executed sequentially

Отрегулируйте numberOfThreads по мере необходимости для достижения наилучшей пропускной способности. Чем больше потоков, тем лучше будет использовать локальный процессор, но это может привести к увеличению нагрузки на удаленном конце. Чтобы получить хорошее использование локального ЦП, вы хотите иметь (намного) больше потоков, чем ЦП (/ ядер), чтобы всякий раз, когда один поток ожидал, например, для ответа из БД для выполнения на ЦП может быть включен другой поток.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...