ExecutorService Runnable, обработка Пакеты ArrayList, не завершающие обработку - PullRequest
0 голосов
/ 20 апреля 2020

РЕДАКТИРОВАТЬ: Спасибо, Марк, и для тех, у кого похожая проблема, моя проблема заключалась в том, что я сначала делал экземпляр Thread для запускаемого класса, а затем отправлял поток в службу executorservice.
Это помогло Я понимаю, что на самом деле, когда я использую ExecutorService, если есть необработанное исключение; он не сообщит вам, он отменит процесс, без уведомления. Вот почему я получаю незавершенную обработку.

У меня есть ArrayList объектов, которые я хочу обрабатывать партиями в многопоточном режиме, но ограничиваю количество потоков, запущенных в данный момент времени. Я обнаружил, что ExecutorService может справиться с этим. Но после проверки, обрабатывает ли она каждую запись, кажется, что она обрабатывает только очень небольшую часть объектов, которые я ей передаю.

РЕДАКТИРОВАТЬ: я удалил многопоточную часть, и обрабатывал объекты как обычно без использования службы executor, в небольшой партии (только 710) он работает нормально; Есть ли вероятность того, что потоки завершаются слишком быстро и обрабатываются неправильно? Это обычно позволяет обрабатывать около 300 000-800 записей одновременно; вот почему я хотел бы многопоточность.

public void processContainerRecords(ArrayList<? extends ContainerRecord> records) {
    int cores = Runtime.getRuntime().availableProcessors();
    ExecutorService executor = Executors.newFixedThreadPool(cores);
    int batchSize = Settings.LOGIC_BATCH_SIZE;//100
    int batches = (int) Math.ceil((double) records.size() / (double) batchSize);

    ArrayList<Future<?>> threads = new ArrayList<Future<?>>();
    LogicProcessor newHandler = null;
    for (int startIndex = 0; startIndex < records.size(); startIndex += batchSize + 1) {
        if (records.size() < batchSize) {
            newHandler = new LogicProcessor(mainGUI, records.subList(startIndex, records.size()));
        } else {
            int bound = (startIndex + batchSize);
            if (bound > records.size()) {
                bound = records.size();
            }
            newHandler = new LogicProcessor(mainGUI, records.subList(startIndex, bound));
        }
        Thread newThread = new Thread(newHandler);
        Future<?> f = executor.submit(newThread);
        threads.add(f);
    }
    executor.shutdown();
    int completedThreads = 0;
    while (!executor.isTerminated()) {//monitors threads and waits until completion
        completedThreads = 0;
        for (Future<?> f : threads) {
            if (f.isDone()) {
                completedThreads++;
            }
        }
        //currentProgress = completedThreads;
    }

    for (ContainerRecord record : records) {//checks if each record has been processed
        System.out.println(record.getContainer() + ":" + record.isTouched());
    }
}

Это класс LogicProcessor, который запускает экземпляры потока

    private List<? extends ContainerRecord> archive;
private GUI mainGUI;

public LogicProcessor(GUI mainGUI, List<? extends ContainerRecord> records) {
    this.mainGUI = mainGUI;
    this.archive = records;
}

@Override
public void run() {
    handleLogic();
}

private void handleLogic() {
    Iterator iterator = archive.iterator();
    while (iterator.hasNext()) {
        ContainerRecord record = (ContainerRecord) iterator.next();
        record.touch();//sets a boolean in the object to validate if it has been processed yet.
    }
}

Вывод: из 710 записей (объектов) обработано, 691 никогда не обрабатывалось / не трогалось, и только 19 имеют.

Что с этим не так? Я много чего пытался сделать, даже создав массив класса LogicProcessor и сохранив экземпляры в массиве, чтобы избежать какого-либо G C удаления экземпляра. Я не уверен, почему он не обрабатывает эти записи.

1 Ответ

0 голосов
/ 20 апреля 2020

У меня сейчас нет компьютера для запуска тестов, но я смотрю на ваш код, поэтому мой ответ основан на личном опыте и может даже не выглядеть как обзор кода, поскольку отсутствие ясности кода является источником ошибок:)

  1. Не отправляйте new Thread в службу исполнителя. Весь смысл исполнителей в том, чтобы скрыть слово с темами от пользователя. Вместо этого ваш LogicProcessor должен реализовывать интерфейс Runnable / Callable в зависимости от того, хотите ли вы вернуть значение или нет.

  2. Проверьте еще раз логи c разбиения по партиям. Если вы используете guava, у него уже есть реализация логики секционирования c. См. этот урок . Я признаю, что это скорее личное предпочтение, и ваш код может быть просто в порядке. Я не проверил подробно.

  3. Способ отключения и обработка фьючерсов могут быть упрощены.

Вызов метода shutdown приводит к тому, что служба-исполнитель прекращает принимать новые задачи для выполнения, но не закрывает службу c сразу, вместо этого она будет ждать, пока все задачи, которые у нее уже есть, будут выполнены. Обычно такой пул потоков создается в начале жизненного цикла приложения и сохраняется в течение всего времени работы приложения. Создание пула довольно дорого, поскольку оно распределяет потоки.

Если вы хотите, чтобы пул оставался открытым, но убедитесь, что все задачи выполнены, вы можете использовать l oop, чтобы перебирать фьючерсы, как и вы.

Так что я не вижу смысла использовать оба. Если вы выделяете пул только для того, чтобы отправить кучу задач - достаточно вызвать shutdown. В противном случае вы можете использовать циклы и рассматривать пул как глобальный объект и вызывать shutdown в другом месте, как я объяснил выше.

...