Воссоздание ExecutorService в l oop для пакетной обработки - PullRequest
1 голос
/ 29 февраля 2020

Пример: мне нужно обрабатывать миллионы записей, я хочу обрабатывать их параллельно, чтобы ускорить обработку. Для этой цели я хочу использовать пул потоков в Executor Service. Каждое задание занимает максимум несколько секунд. Чтобы не создавать миллион потоков для каждой записи в одном пуле потоков, что в моем случае приводило к проблемам с памятью, я решил обрабатывать записи партиями.

Я хочу использовать для каждого пакета новый пул потоков. Я заставляю Executor Service подождать, пока пакетные задачи не будут завершены, а затем выключу Executor Service и создаю новую для обработки следующей партии. Я делаю что-то вроде этого:

/*...................*/
int count = 1;
ExecutorService executor = buildExecutor(CORE_THREADS, MAX_THREADS);
            while (/* there is a record */) {
                executor.execute(new ProcessRecordThread(record));
                count++;
                if (count % BATCH_SIZE == 0) {
                    executor.shutdown();
                    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
                    executor = buildExecutor(CORE_THREADS, MAX_THREADS);
                }
            }
 /*................*/

Метод создания службы Executor

private static ExecutorService buildExecutor(int corePoolSize, int maximumPoolSize) {
            return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L,
                    TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue(),
                    Executors.defaultThreadFactory());
        }

Я знаю, что создание пула потоков добавляет некоторые накладные расходы на обработку. Создание службы исполнителя в l oop считается плохой практикой. Есть ли какие-нибудь компромиссы, о которых я должен знать?

Есть ли способ, как добиться такого поведения, просто используя один пул потоков?

1 Ответ

0 голосов
/ 29 февраля 2020

Класс сборки имеет один общий c общий источник данных, например C3P0 (с открытым исходным кодом) в одноэлементном шаблоне для управления соединением.

config c3p0.maxPoolSize = x и запуск потока x для получения соединения из источника данных, а затем выполнения sql.

пример:

public class C3P0Pool {
    private static ComboPooledDataSource CPDS = new ComboPooledDataSource("c3p0");
    public static Connection getConnection() throws SQLException {
        return CPDS.getConnection();
    }
    public static DataSource getDataSource() {
        return CPDS;  // QueryRunner could constructed with DataSource
    }
}

Вам не нужно закрывать dataSource, соединение будет возвращаться к dataSource при выполнении connection.close().

...