Обрабатывать большой текстовый файл одновременно - PullRequest
4 голосов
/ 10 июня 2019

Итак, у меня большой текстовый файл, в данном случае это примерно 4,5 ГБ, и мне нужно обработать весь файл как можно быстрее.Прямо сейчас у меня есть многопоточность, используя 3 потока (не включая основной поток).Поток ввода для чтения входного файла, поток обработки для обработки данных и поток вывода для вывода обработанных данных в файл.

В настоящее время узким местом является секция обработки.Поэтому я хотел бы добавить больше потоков обработки в смесь.Однако это создает ситуацию, когда несколько потоков обращаются к одному и тому же BlockingQueue, и поэтому их результаты не поддерживают порядок входного файла.

Примером нужной мне функциональности может быть что-то вроде этого: Входной файл: 1, 2, 3, 4, 5 Выходной файл: ^ то же самое.Не 2, 1, 4, 3, 5 или любая другая комбинация.

Я написал фиктивную программу, которая по функциональности идентична реальной программе, за исключением части обработки (я не могу дать вамфактическая программа из-за класса обработки, содержащего конфиденциальную информациюСледует также отметить, что все классы (Input, Processing и Output) - это все внутренние классы, содержащиеся в главном классе, который содержит метод initialise () и переменные уровня класса, упомянутые в коде основного потока, перечисленном ниже.

Основной поток:

static volatile boolean readerFinished = false; // class level variables
static volatile boolean writerFinished = false;

private void initialise() throws IOException {
    BlockingQueue<String> inputQueue = new LinkedBlockingQueue<>(1_000_000);
    BlockingQueue<String> outputQueue = new LinkedBlockingQueue<>(1_000_000); // capacity 1 million. 

    String inputFileName = "test.txt";
    String outputFileName = "outputTest.txt";

    BufferedReader reader = new BufferedReader(new FileReader(inputFileName));
    BufferedWriter writer = new BufferedWriter(new FileWriter(outputFileName));


    Thread T1 = new Thread(new Input(reader, inputQueue));
    Thread T2 = new Thread(new Processing(inputQueue, outputQueue));
    Thread T3 = new Thread(new Output(writer, outputQueue));

    T1.start();
    T2.start();
    T3.start();

    while (!writerFinished) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    reader.close();
    writer.close();

    System.out.println("Exited.");
}

Входной поток: (Пожалуйста, простите закомментированный отладочный код, он использовался для проверки правильности выполнения потока чтения).

class Input implements Runnable {
    BufferedReader reader;
    BlockingQueue<String> inputQueue;

    Input(BufferedReader reader, BlockingQueue<String> inputQueue) {
        this.reader = reader;
        this.inputQueue = inputQueue;
    }

    @Override
    public void run() {
        String poisonPill = "ChH92PU2KYkZUBR";
        String line;
        //int linesRead = 0;

        try {
            while ((line = reader.readLine()) != null) {
                inputQueue.put(line);
                //linesRead++;

                /*
                if (linesRead == 500_000) {
                    //batchesRead += 1;
                    //System.out.println("Batch read");
                    linesRead = 0;
                }
                */
            }

            inputQueue.put(poisonPill);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }

        readerFinished = true;

    }
}

Обработканить: (Обычно это будет что-то делать со строкой, но для целей макета я только что сразу же сделал это для вывода в поток вывода).При необходимости мы можем смоделировать его, выполняя некоторую работу, заставляя поток спать в течение небольшого промежутка времени для каждой строки.

class Processing implements Runnable {
    BlockingQueue<String> inputQueue;
    BlockingQueue<String> outputQueue;

    Processing(BlockingQueue<String> inputQueue, BlockingQueue<String> outputQueue) {
        this.inputQueue = inputQueue;
        this.outputQueue = outputQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                if (inputQueue.isEmpty() && readerFinished) {
                    break;
                }

                String line = inputQueue.take();
                outputQueue.put(line);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Выходной поток:

class Output implements Runnable {
    BufferedWriter writer;
    BlockingQueue<String> outputQueue;

    Output(BufferedWriter writer, BlockingQueue<String> outputQueue) {
        this.writer = writer;
        this.outputQueue = outputQueue;
    }

    @Override
    public void run() {
        String line;
        ArrayList<String> outputList = new ArrayList<>();

        while (true) {
            try {
                line = outputQueue.take();

                if (line.equals("ChH92PU2KYkZUBR")) {
                    for (String outputLine : outputList) {
                        writer.write(outputLine);
                    }
                    System.out.println("Writer finished - executing termination");

                    writerFinished = true;
                    break;
                }

                line += "\n";
                outputList.add(line);

                if (outputList.size() == 500_000) {
                    for (String outputLine : outputList) {
                        writer.write(outputLine);
                    }
                    System.out.println("Writer wrote batch");
                    outputList = new ArrayList<>();
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Так что сейчас общийпоток данных очень линейный, выглядит примерно так:

Input> Processing> Output.

Но я бы хотел получить что-то вроде этого:

Data flow diagram

Но подвох в том, когда данные попадают ввывод, либо он должен быть отсортирован в правильном порядке, либо он должен быть уже в правильном порядке.

Буду очень признателен за рекомендации или примеры того, как это сделать.

В прошлом я использовал интерфейсы Future и Callable для решения задачи, связанной с параллельными потоками данных, как это, но, к сожалению,этот код не читал из одной очереди, и поэтому здесь минимальная помощь.

Я должен также добавить, что для тех из вас, кто это заметит, batchSize и toxicPill обычно определяются в основном потоке, а затемпередаваемые через переменные, они не обычно жестко запрограммированы, поскольку находятся в коде для потока ввода, а выход проверяет поток записи.Я был немного ленив, когда писал макет для экспериментов в 1 час ночи.

Редактировать: я должен также упомянуть, это требуется для использования Java 8 максимум.Функции Java 9 и выше не могут использоваться из-за того, что эти версии не установлены в средах, в которых будет запускаться эта программа.

Ответы [ 3 ]

2 голосов
/ 10 июня 2019

Что вы можете сделать:

  • Возьмите X потоков для обработки, где X - количество ядер, доступных для обработки
  • Дайте каждому потоку свою собственную входную очередь.
  • Поток считывателя предоставляет записи для циклического перебора входной очереди каждого потока.
  • Поскольку выходные файлы слишком велики для памяти, вы пишете X выходных файлов, по одному для каждого потока, и каждыйимя файла содержит индекс потока, так что вы можете восстановить исходный порядок из имен файлов.
  • После завершения процесса вы объединяете выходные файлы X.Одна строка из файла для потока 1, одна из файлов для потока 2 и т. Д. Снова в циклическом порядке.Это восстанавливает исходный порядок.

В качестве дополнительного бонуса, поскольку у вас есть входная очередь на поток, у вас нет конфликта блокировок в очереди между читателями.(только между читателем и писателем) Вы могли бы даже оптимизировать это, помещая вещи в очереди ввода партиями, большими, чем 1.

1 голос
/ 10 июня 2019

Как было предложено Алексеем, вы можете создать OrderedTask:

class OrderedTask implements Comparable<OrderedTask> {

    private final Integer index;
    private final String line;

    public OrderedTask(Integer index, String line) {
        this.index = index;
        this.line = line;
    }


    @Override
    public int compareTo(OrderedTask o) {
        return index < o.getIndex() ? -1 : index == o.getIndex() ? 0 : 1;
    }

    public Integer getIndex() {
        return index;
    }

    public String getLine() {
        return line;
    }    
}

В качестве выходной очереди вы можете использовать свою собственную, поддерживаемую приоритетной очередью:

class OrderedTaskQueue {

    private final ReentrantLock lock;
    private final Condition waitForOrderedItem;
    private final int maxQueuesize;
    private final PriorityQueue<OrderedTask> backedQueue;

    private int expectedIndex;

    public OrderedTaskQueue(int maxQueueSize, int startIndex) {
        this.maxQueuesize = maxQueueSize;
        this.expectedIndex = startIndex;
        this.backedQueue = new PriorityQueue<>(2 * this.maxQueuesize);

        this.lock = new ReentrantLock();
        this.waitForOrderedItem = this.lock.newCondition();
    }


    public boolean put(OrderedTask item) {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (this.backedQueue.size() >= maxQueuesize && item.getIndex() != expectedIndex) {
                this.waitForOrderedItem.await();
            }

            boolean result = this.backedQueue.add(item);
            this.waitForOrderedItem.signalAll();
            return result;
        } catch (InterruptedException e) {
            throw new RuntimeException();
        } finally {
            lock.unlock();
        }
    }


    public OrderedTask take() {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (this.backedQueue.peek() == null || this.backedQueue.peek().getIndex() != expectedIndex) {
                this.waitForOrderedItem.await();
            }
            OrderedTask result = this.backedQueue.poll();
            expectedIndex++;
            this.waitForOrderedItem.signalAll();
            return result;
        } catch (InterruptedException e) {
            throw new RuntimeException();
        } finally {
            lock.unlock();
        }
    }
}

StartIndex - это индекс первой упорядоченной задачи, а maxQueueSize используется для остановки обработки других задач (не для заполнения памяти), когда мы ожидаем завершения какой-либо более ранней задачи.Он должен быть равен двойному / тройному числу потоков обработки, чтобы немедленно не останавливать обработку и не допускать масштабируемости.

Затем необходимо создать задачу:

int indexOrder =0;
            while ((line = reader.readLine()) != null) {
                inputQueue.put(new OrderedTask(indexOrder++,line);                    

            }

Строка за строкойиспользуется только из-за вашего примера.Вы должны изменить OrderedTask для поддержки пакета строк.

0 голосов
/ 10 июня 2019

Почему бы не обратить поток?

  1. Выходной вызов для X пакетов;
  2. Генерирует X обещание / задание (шаблон обещания), который будет случайным образом вызывать одно из процессорных ядер (сохранить номер партии, чтобы перейти к ядру ввода); пакетировать обработчик вызовов в упорядоченный список;
  3. Каждое ядро ​​обработки вызывает пакет во входном ядре;
  4. Наслаждайтесь?
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...