Как обрабатывать строки файла параллельно? - PullRequest
0 голосов
/ 16 мая 2018

Я хочу прочитать большой файл, обработать каждую строку и вставить результаты в базу данных. Моя цель - распараллелить обработку строк, так как каждый процесс - длительная задача. Поэтому я хочу, чтобы один поток продолжал читать, несколько потоков продолжал обрабатывать, а один поток продолжал вставлять куски в дб.

Я разбил это следующим образом:

1) последовательное (простое) чтение файла строка за строкой

2) отправлять каждую строку в пул потоков (3 потока), так как обработка является длительной задачей. заблокировать дальнейшее чтение строки, пока пул потоков занят.

3) записать каждую обработанную строку из каждого theadpool в StringBuffer

4) контролировать размер буфера и записывать результаты в виде фрагментов в базу данных (например, каждые 1000 записей)

ExecutorService executor = Executors.newFixedThreadPool(3);

StringBuffer sb = new StringBuffer();

String line;
AtomicInteger count = new AtomicInteger(0);
while ((line = reader.read()) != null) {
    count.getAndIncrement();
    Future<String> future = executor.submit(() -> {
        return processor.process(line);
    });

    //PROBLEM: this blocks until the future returns
    sb.append(future.get());

    if (count.get() == 100) {
        bufferChunk = sb;
        count = new AtomicInteger(0);
        sb = new StringBuffer();

        databaseService.batchInsert(bufferChunk.toString());
    }
}

Проблемы:

  • future.get() всегда будет блокировать читателя до тех пор, пока одно будущее не вернет результат

  • буфер "мониторинг", вероятно, не правильно

Возможно, я делаю это неправильно. Но как мне этого добиться?

Sidenote: размер файла составляет около 10 ГБ, поэтому я не могу сначала прочитать весь файл в память для подготовки параллельных задач.

Ответы [ 3 ]

0 голосов
/ 16 мая 2018

Я считаю следующее решение элегантным.Это только один из многих возможных, но концептуально он простой и

  • регулирует чтение,
  • накапливает только минимальное количество состояний, готовых для отчета в конце
  • не требует явной обработки потоков

Я размещаю здесь только настоящий метод тестирования с полной настройкой теста и вспомогательными структурами данных, доступными в выделенном репозитории GitHub :

private final AtomicInteger count = new AtomicInteger();

private final Consumer<String> processor = (value) -> {
    count.incrementAndGet();
};

@Test
public void onlyReadWhenExecutorAvailable() throws Exception {

    Executor executor = Executors.newCachedThreadPool();

    CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
    for (Semaphore semaphore = new Semaphore(CONCURRENCY_LEVEL); ; ) {
        String value = reader.read();
        if (value == null) {
            break;
        }

        semaphore.acquire();

        CompletableFuture<Void> future = CompletableFuture.completedFuture(value)
            .thenAcceptAsync(v -> {
                processor.accept(v);
                semaphore.release();
            }, executor);

        done = done.thenCompose($ -> future);
    }
    done.get();

    assertEquals(ENTRIES, count.get());
}
0 голосов
/ 17 мая 2018

После более глубокого исследования я обнаружил, что BlockingExecutor, представленный в этом ответе, наиболее близок к тому, чего я пытаюсь достичь:

https://stackoverflow.com/a/43109689/1194415

Это в основном extends ThreadPoolExecutor в сочетании сSemaphore замок.

0 голосов
/ 16 мая 2018
  1. Считать размер файла. (Метод File.length ()) и разделите его на количество потоков.
  2. Используйте RandomAccessFile для поиска любых символов новой строки, предшествующих указателям, найденным в @ 1. https://docs.oracle.com/javase/7/docs/api/java/io/RandomAccessFile.html
  3. Отправка каждому потоку новых индексов / смещений + RandomAccessFile с доступом для чтения к каждому.
  4. Подкласс InputStream для создания нового InputStream поверх RandomAccessFile и начала чтения.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...