Итак, у меня большой текстовый файл, в данном случае это примерно 4,5 ГБ, и мне нужно обработать весь файл как можно быстрее.Прямо сейчас у меня есть многопоточность, используя 3 потока (не включая основной поток).Поток ввода для чтения входного файла, поток обработки для обработки данных и поток вывода для вывода обработанных данных в файл.
В настоящее время узким местом является секция обработки.Поэтому я хотел бы добавить больше потоков обработки в смесь.Однако это создает ситуацию, когда несколько потоков обращаются к одному и тому же BlockingQueue, и поэтому их результаты не поддерживают порядок входного файла.
Примером нужной мне функциональности может быть что-то вроде этого: Входной файл: 1, 2, 3, 4, 5 Выходной файл: ^ то же самое.Не 2, 1, 4, 3, 5 или любая другая комбинация.
Я написал фиктивную программу, которая по функциональности идентична реальной программе, за исключением части обработки (я не могу дать вамфактическая программа из-за класса обработки, содержащего конфиденциальную информациюСледует также отметить, что все классы (Input, Processing и Output) - это все внутренние классы, содержащиеся в главном классе, который содержит метод initialise () и переменные уровня класса, упомянутые в коде основного потока, перечисленном ниже.
Основной поток:
static volatile boolean readerFinished = false; // class level variables
static volatile boolean writerFinished = false;
private void initialise() throws IOException {
BlockingQueue<String> inputQueue = new LinkedBlockingQueue<>(1_000_000);
BlockingQueue<String> outputQueue = new LinkedBlockingQueue<>(1_000_000); // capacity 1 million.
String inputFileName = "test.txt";
String outputFileName = "outputTest.txt";
BufferedReader reader = new BufferedReader(new FileReader(inputFileName));
BufferedWriter writer = new BufferedWriter(new FileWriter(outputFileName));
Thread T1 = new Thread(new Input(reader, inputQueue));
Thread T2 = new Thread(new Processing(inputQueue, outputQueue));
Thread T3 = new Thread(new Output(writer, outputQueue));
T1.start();
T2.start();
T3.start();
while (!writerFinished) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
reader.close();
writer.close();
System.out.println("Exited.");
}
Входной поток: (Пожалуйста, простите закомментированный отладочный код, он использовался для проверки правильности выполнения потока чтения).
class Input implements Runnable {
BufferedReader reader;
BlockingQueue<String> inputQueue;
Input(BufferedReader reader, BlockingQueue<String> inputQueue) {
this.reader = reader;
this.inputQueue = inputQueue;
}
@Override
public void run() {
String poisonPill = "ChH92PU2KYkZUBR";
String line;
//int linesRead = 0;
try {
while ((line = reader.readLine()) != null) {
inputQueue.put(line);
//linesRead++;
/*
if (linesRead == 500_000) {
//batchesRead += 1;
//System.out.println("Batch read");
linesRead = 0;
}
*/
}
inputQueue.put(poisonPill);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
readerFinished = true;
}
}
Обработканить: (Обычно это будет что-то делать со строкой, но для целей макета я только что сразу же сделал это для вывода в поток вывода).При необходимости мы можем смоделировать его, выполняя некоторую работу, заставляя поток спать в течение небольшого промежутка времени для каждой строки.
class Processing implements Runnable {
BlockingQueue<String> inputQueue;
BlockingQueue<String> outputQueue;
Processing(BlockingQueue<String> inputQueue, BlockingQueue<String> outputQueue) {
this.inputQueue = inputQueue;
this.outputQueue = outputQueue;
}
@Override
public void run() {
while (true) {
try {
if (inputQueue.isEmpty() && readerFinished) {
break;
}
String line = inputQueue.take();
outputQueue.put(line);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Выходной поток:
class Output implements Runnable {
BufferedWriter writer;
BlockingQueue<String> outputQueue;
Output(BufferedWriter writer, BlockingQueue<String> outputQueue) {
this.writer = writer;
this.outputQueue = outputQueue;
}
@Override
public void run() {
String line;
ArrayList<String> outputList = new ArrayList<>();
while (true) {
try {
line = outputQueue.take();
if (line.equals("ChH92PU2KYkZUBR")) {
for (String outputLine : outputList) {
writer.write(outputLine);
}
System.out.println("Writer finished - executing termination");
writerFinished = true;
break;
}
line += "\n";
outputList.add(line);
if (outputList.size() == 500_000) {
for (String outputLine : outputList) {
writer.write(outputLine);
}
System.out.println("Writer wrote batch");
outputList = new ArrayList<>();
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
}
Так что сейчас общийпоток данных очень линейный, выглядит примерно так:
Input> Processing> Output.
Но я бы хотел получить что-то вроде этого:
Но подвох в том, когда данные попадают ввывод, либо он должен быть отсортирован в правильном порядке, либо он должен быть уже в правильном порядке.
Буду очень признателен за рекомендации или примеры того, как это сделать.
В прошлом я использовал интерфейсы Future и Callable для решения задачи, связанной с параллельными потоками данных, как это, но, к сожалению,этот код не читал из одной очереди, и поэтому здесь минимальная помощь.
Я должен также добавить, что для тех из вас, кто это заметит, batchSize и toxicPill обычно определяются в основном потоке, а затемпередаваемые через переменные, они не обычно жестко запрограммированы, поскольку находятся в коде для потока ввода, а выход проверяет поток записи.Я был немного ленив, когда писал макет для экспериментов в 1 час ночи.
Редактировать: я должен также упомянуть, это требуется для использования Java 8 максимум.Функции Java 9 и выше не могут использоваться из-за того, что эти версии не установлены в средах, в которых будет запускаться эта программа.