Повышение производительности при чтении файла построчно и обработке - PullRequest
0 голосов
/ 01 июня 2018

У меня есть фрагмент кода Java, который выполняет следующее -

  1. Открывает файл с данными в формате {A, B, C}, и каждый файл имеет ок.5000000 строк.
  2. Для каждой строки в файле вызовите службу, которая задает столбец D и добавьте его к {A, B, C} как {A, B, C, D}.
  3. Запишите эту запись в блок chunkedwriter, который в конечном итоге сгруппирует 10000 строк для обратной записи фрагмента в удаленное местоположение.

Сейчас выполнение кода занимает 32 часа.Этот процесс снова будет повторяться для другого файла, который гипотетически занимает еще 32 часа, но нам нужно, чтобы эти процессы выполнялись ежедневно.

Шаг 2 еще более усложняется тем, что иногда служба не имеет D, но предназначена длявозьмите D из своего хранилища суперданных, чтобы оно выдало временное исключение с просьбой подождать.У нас есть попытки обработать это, чтобы технически повторить запись 5 раз с максимальной задержкой 60000 миллисекунд.Таким образом, в худшем случае мы могли бы посмотреть на 5000000 * 5.

Комбинация {A, B, C} уникальна и, следовательно, результат D не может быть кэширован и использован повторно, и должен быть сделан новый запросчтобы каждый раз получать D.

Я пытался добавить потоки, подобные этому:

temporaryFile = File.createTempFile(key, ".tmp");
Files.copy(stream, temporaryFile.toPath(), 
       StandardCopyOption.REPLACE_EXISTING);
reader = new BufferedReader(new InputStreamReader(new 
       FileInputStream(temporaryFile), StandardCharsets.UTF_8));
String entry;
while ((entry = reader.readLine()) != null) {
   final String finalEntry = entry;
   service.execute(() -> {
         try {
             processEntry(finalEntry);
         } catch (Exception e) {
             log.error("something");
   });
   count++;
 }

Здесь метод processEntry абстрагирует детали реализации, описанные выше, и потоки определены как

ExecutorService service = Executors.newFixedThreadPool(10);

Проблема, с которой я столкнулся, заключается в первом наборе потоков, но процесс не ждет, пока все потоки не завершат свою работу и все 5000000 строк не будут завершены.Таким образом, задача, которая раньше ожидала завершения в течение 32 часов, теперь заканчивается <1 минутой, что портит состояние нашей системы.Есть ли альтернативные способы сделать это?Как я могу заставить процесс ждать завершения всех потоков? </p>

Ответы [ 3 ]

0 голосов
/ 01 июня 2018
  • Подумайте об использовании ExecutorCompletionService, если вы хотите выполнять задачи по мере их выполнения, вам нужен ExecutorCompletionService.Это действует как BlockingQueue, который позволит вам опрашивать задачи как и когда они завершатся.
  • Другое решение - дождаться завершения исполнителя, а затем закрыть его, используя: ExecutorService service = Executors.newFixedThreadPool(10); service .shutdown(); while (!service .isTerminated()) {}
0 голосов
/ 01 июня 2018

Проблема, с которой я столкнулся, заключается в том, что первый набор потоков вращается, но процесс не ждет, пока все потоки не завершат свою работу и все 5000000 строк не будут завершены.

Когдавы выполняете задания, используя ExecutorService, они добавляются в службу и запускаются в фоновом режиме.Чтобы дождаться их завершения, нужно дождаться завершения службы:

ExecutorService service = Executors.newFixedThreadPool(10);
// submit jobs to the service here
// after the last job has been submitted, we immediately shutdown the service
service.shutdown();
// then we can wait for it to terminate as the jobs run in the background
service.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);

Кроме того, если в этих файлах есть дерьмо, я бы порекомендовал использовать ограниченная очередь для заданий, так что вы не выбрасываете память, эффективно кэшируя все строки в файле.Это работает, только если файлы остаются и не исчезают.

// this is the same as a newFixedThreadPool(10) but with a queue of 100
ExecutorService service = new ThreadPoolExecutor(10, 10,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>(100));
// set a rejected execution handler so we block the caller once the queue is full
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
});

Запишите эту запись в блок chunkedwriter, который в итоге группирует 10000 строк для обратной записи фрагмента в удаленное местоположение

По завершении каждого задания A, B, C, если его нужно обработать на втором шаге, я бы также рекомендовал изучить ExecutorCompletionService, который позволяет объединять различные пулы потоков вместе, чтобы получить линиипо окончании они сразу же начнут работать на 2-й фазе обработки.

Если вместо этого chunkedWriter является просто одним потоком, то я бы рекомендовал разделить BlockingQueue<Result> и поместить потоки исполнителя в очередькак только строки сделаны, и chunkedWriter извлекает из очереди и выполняет группирование и запись результатов.В этой ситуации указание потоку записи, что это сделано, должно быть обработано осторожно - возможно, с помощью какой-то постоянной END_RESULT, помещенной в очередь главным потоком, ожидающим завершения службы.

0 голосов
/ 01 июня 2018

Одной из альтернатив является использование защелки, чтобы дождаться завершения всех задач, прежде чем завершить работу исполнителя в главном потоке.

Инициализировать CountdownLatch с 1.
После выхода из цикла, который отправляетзадач, вы вызываете latch.await ();

В задаче, которую вы запускаете, вы должны иметь обратный вызов начального класса, чтобы сообщить ему, когда задача завершена.

Обратите внимание, чтов начальном классе функция обратного вызова должна быть синхронизирована.

В начальном классе этот обратный вызов используется для подсчета количества выполненных задач.

Также внутри обратного вызова, когда все задачи завершеныВы вызываете latch.countdown () для продолжения основного потока, скажем, выключения исполнителя и выхода.

Это показывает основную концепцию, ее можно реализовать с большей детализацией и большим контролем над завершеннымЗадачи при необходимости.

Это будет примерно так:

public class StartingClass {


    CountDownLatch latch = new CountDownLatch(1);

    ExecutorService service = Executors.newFixedThreadPool(10);
    BufferedReader reader;
    Path stream;
    int count = 0;
    int completed = 0;
    public void runTheProcess() {
        File temporaryFile = File.createTempFile(key, ".tmp");
        Files.copy(stream, temporaryFile.toPath(), 
               StandardCopyOption.REPLACE_EXISTING);
        reader = new BufferedReader(new InputStreamReader(new 
               FileInputStream(temporaryFile), StandardCharsets.UTF_8));
        String entry;
        while ((entry = reader.readLine()) != null) {
           final String finalEntry = entry;
           service.execute(new Task(this,finalEntry));
           count++;
        }
        latch.await();
        service.shutdown();
    }

    public synchronized void processEntry(String entry) {

    }

    public synchronized void taskCompleted() {
        completed++;
        if(completed == count) {
            latch.countDown();;
        }
    }

    //This can be put in a different file.
    public static class Task implements Runnable {
        StartingClass startingClass;
        String finalEntry;

        public Task(StartingClass startingClass, String finalEntry) {
            this.startingClass = startingClass;
            this.finalEntry = finalEntry;
        }

        @Override
        public void run() {
            try {
                startingClass.processEntry(finalEntry);
                startingClass.taskCompleted();
             } catch (Exception e) {
                 //log.error("something");
             }; 
        }

    }

}

Обратите внимание, что вам нужно закрыть файл.Также может быть написано, что отследить исполнителя нужно подождать несколько секунд, прежде чем принудительно завершить работу.

...