Недавно я работал над реализацией алгоритма внешней сортировки слиянием ( Внешняя сортировка ), и в моей реализации требовался многопоточный подход.
Я пытался использовать ForkJoinPool вместо использования более старых реализаций в Java, таких как Thread и ExecutorService.Первый шаг алгоритма требует чтения файла, и каждые x строк собирают и отправляют для сортировки и записи в файл.Это действие (сортировка и сохранение) можно выполнить в отдельном потоке, пока основной поток читает следующий пакет.Я написал метод для этого (см. Ниже).
Меня беспокоит то, что настоящая параллельная работа не начинается, когда я использую ForkJoinPool.commonPool().submit(()->SortAndWriteToFile(lines, fileName))
, а вместо этого, только когда я вызываю task.join()
после циклазакончил.Это будет означать, что в достаточно большом цикле я буду сопоставлять задачи, которые нужно выполнить, но не получу никакого времени на их выполнение.Когда я использовал invoke
вместо submit
, мне кажется, что я не могу контролировать, где будет join
, и не могу гарантировать, что вся работа была проделана до перехода.
Есть ли более правильный способ реализацииэто?
Мой код ниже.Метод и два вспомогательных метода перечислены.Я надеюсь, что это не слишком долго.
protected int generateSortedFiles (String originalFileName, String destinationFilePrefix) {
//Number of accumulated sorted blocks of size blockSize
int blockCount = 0;
//hold bufferSize number of lines from the file
List<String> bufferLines = new ArrayList<String>();
List<ForkJoinTask<?>> taskList = new ArrayList<ForkJoinTask<?>>();
//Open file to read
try (Stream<String> fileStream = Files.lines(Paths.get(originalFileName))) {
//Iterate over BufferSize lines to add them to list.
Iterator<String> lineItr = fileStream.iterator();
while(lineItr.hasNext()) {
//Add bufferSize lines to List
for (int i=0;i<bufferSize;i++) {
if (lineItr.hasNext()) {
bufferLines.add(lineItr.next());
}
}
//submit the task to sort and write to file in a separate thread
String fileName= destinationFilePrefix+blockCount+".csv";
List<String> lines = Collections.unmodifiableList(bufferLines);
taskList.add(ForkJoinPool.commonPool().submit(
()->SortAndWriteToFile(lines, fileName)));
blockCount++;
bufferLines = new ArrayList<String>();
}
} catch (IOException e) {
System.out.println("read from file " +originalFileName + "has failed due to "+e);
} catch (ArrayIndexOutOfBoundsException e) {
System.out.println("the index prodived was not available in the file "
+originalFileName+" and the error is "+e);
}
flushParallelTaskList(taskList);
return blockCount;
}
/**
* This method takes lines, sorts them and writes them to file
* @param lines the lines to be sorted
* @param fileName the filename to write them to
*/
private void SortAndWriteToFile(List<String> lines, String fileName) {
//Sort lines
lines = lines.stream()
.parallel()
.sorted((e1,e2) -> e1.split(",")[indexOfKey].compareTo(e2.split(",")[indexOfKey]))
.collect(Collectors.toList());
//write the sorted block of lines to the destination file.
writeBuffer(lines, fileName);
}
/**
* Wait until all the threads finish, clear the list
* @param writeList
*/
private void flushParallelTaskList (List<ForkJoinTask<?>> writeList) {
for (ForkJoinTask<?> task:writeList) {
task.join();
}
writeList.clear();
}