Я хочу прочитать большой файл, обработать каждую строку и вставить результаты в базу данных.
Моя цель - распараллелить обработку строк, так как каждый процесс - длительная задача. Поэтому я хочу, чтобы один поток продолжал читать, несколько потоков продолжал обрабатывать, а один поток продолжал вставлять куски в дб.
Я разбил это следующим образом:
1) последовательное (простое) чтение файла строка за строкой
2) отправлять каждую строку в пул потоков (3 потока), так как обработка является длительной задачей. заблокировать дальнейшее чтение строки, пока пул потоков занят.
3) записать каждую обработанную строку из каждого theadpool в StringBuffer
4) контролировать размер буфера и записывать результаты в виде фрагментов в базу данных (например, каждые 1000 записей)
ExecutorService executor = Executors.newFixedThreadPool(3);
StringBuffer sb = new StringBuffer();
String line;
AtomicInteger count = new AtomicInteger(0);
while ((line = reader.read()) != null) {
count.getAndIncrement();
Future<String> future = executor.submit(() -> {
return processor.process(line);
});
//PROBLEM: this blocks until the future returns
sb.append(future.get());
if (count.get() == 100) {
bufferChunk = sb;
count = new AtomicInteger(0);
sb = new StringBuffer();
databaseService.batchInsert(bufferChunk.toString());
}
}
Проблемы:
future.get()
всегда будет блокировать читателя до тех пор, пока одно будущее не вернет результат
буфер "мониторинг", вероятно, не правильно
Возможно, я делаю это неправильно. Но как мне этого добиться?
Sidenote: размер файла составляет около 10 ГБ, поэтому я не могу сначала прочитать весь файл в память для подготовки параллельных задач.