Предварительная загрузка данных из файла с использованием отдельного потока - PullRequest
5 голосов
/ 20 августа 2011

У меня есть небольшое приложение, которое обрабатывает большое количество (относительно небольших) файлов. Он запускается последовательно: загружает данные из файла, выполняет над ним операции и переходит к следующему файлу. Я заметил, что во время выполнения загрузка ЦП не составляет 100%, и я предполагаю, что это связано с тем, сколько времени занимают операции ввода-вывода на жестком диске.

Таким образом, идея заключается в том, чтобы загружать следующие данные в память параллельно с обработкой текущих данных, используя отдельный поток (данные, о которых идет речь, будут просто последовательностью int, хранящейся в векторе). Это кажется очень распространенной проблемой, но мне трудно найти простой, простой пример C ++ для этого! И теперь C ++ 0x уже в пути, и будет очень полезен простой демонстрационный код с использованием нового средства потоков без внешней библиотеки.

Кроме того, хотя я знаю, что это зависит от многих вещей, возможно ли сделать обоснованное предположение о преимуществах (или недостатках) такого подхода, например, в отношении размера файла данных для загрузки? Я предполагаю, что в случае больших файлов операции ввода-вывода на диске в любом случае выполняются очень редко, поскольку данные уже буферизированы (с помощью fstream (?))

Olivier

Ответы [ 4 ]

4 голосов
/ 20 августа 2011

Игрушечная программа о том, как использовать некоторые средства потоков и синхронизации C ++ 0x.Понятия не имею, какова эффективность этого (я рекомендую ответ Мэтта), я сосредоточен на ясности и правильности ради создания примера.

Файлы читаются отдельно, как вы и просили.Однако они не преобразуются в последовательность int, так как я чувствую, что это больше связано с обработкой, чем со строгим вводом / выводом.Таким образом, файлы выгружаются в простой std::string.

#include <fstream>
#include <sstream>
#include <string>
#include <vector>
#include <deque>
#include <future>
#include <mutex>
#include <condition_variable>

int
main()
{
    // this is shared
    std::mutex mutex;
    std::condition_variable condition;
    bool more_to_process = true;
    std::deque<std::string> to_process;

    /* Reading the files is done asynchronously */
    std::vector<std::string> filenames = /* initialize */
    auto process = std::async(std::launch::async, [&](std::vector<std::string> filenames)
    {
        typedef std::lock_guard<std::mutex> lock_type;
        for(auto&& filename: filenames) {
            std::ifstream file(filename);
            if(file) {
                std::ostringstream stream;
                stream << file.rdbuf();
                if(stream) {
                    lock_type lock(mutex);
                    to_process.push_back(stream.str());
                    condition.notify_one();
                }
            }
        }
        lock_type lock(mutex);
        more_to_process = false;
        condition.notify_one();
    }, std::move(filenames));

    /* processing is synchronous */
    for(;;) {
        std::string file;
        {
            std::unique_lock<std::mutex> lock(mutex);
            condition.wait(lock, [&]
            { return !more_to_process || !to_process.empty(); });

            if(!more_to_process && to_process.empty())
                break;
            else if(to_process.empty())
                continue;

            file = std::move(to_process.front());
            to_process.pop_front();
        }

        // use file here
    }

    process.get();
}

Некоторые примечания:

  • мьютекс, переменная условия, флаг остановки и контейнер std::string логически связаны между собой,Вы также можете заменить их потокобезопасным контейнером / каналом
  • Я использую std::async вместо std::thread, потому что он имеет лучшие характеристики безопасности исключений
  • нет обработки ошибок дляговорить о;если файл не может быть прочитан по какой-либо причине, он молча пропускается.У вас есть несколько вариантов: сигнализировать о том, что больше не нужно обрабатывать, и обрабатывать как можно быстрее;или используйте boost::variant<std::string, std::exception_ptr>, чтобы передать ошибку на сторону обработки вещей (здесь ошибка передается как исключение, но вы можете использовать error_code или что-то другое).Не исчерпывающий список любыми средствами.
2 голосов
/ 20 августа 2011

Использование многопоточности для такой связанной с IO проблемы даст вам незначительный прирост производительности. Вы можете заполнить некоторые «пробелы» в своем желании насытить доступные ресурсы ввода-вывода, заранее открыв несколько файлов и перекрыв системные вызовы через потоки, как вы указали.

Я бы порекомендовал вам вместо этого взглянуть на подсказки ядру о том, как вы собираетесь делать ввод-вывод, что улучшит скорость чтения и улучшит физическую пропускную способность чтения, например, путем проверки того, что файловая система, ядро ​​и жесткий диск (или каким бы ни был ваш источник хранения) максимально быстро.

1 голос
/ 20 августа 2011

Я бы создал два потока и два буфера:

  • первый, кто читает данные из файла в буферы
  • второй, кто обрабатывает полученные данные

Если файл не помещается в буфер, просто добавьте флаг конца файла. Если второй поток не находит его в конце буфера, он должен прочитать его из второго.

Число и размер буферов и, возможно, количество потоков являются параметрами для оптимизации. Основная идея состоит в том, чтобы позволить контроллеру диска работать непрерывно.

** РЕДАКТИРОВАТЬ **

В идеальном случае у вас есть все время, затрачиваемое на чтение данных с жесткого диска. Однако это зависит от «времени выполнения на порцию» / «времени чтения жесткого диска на порцию», поскольку оно может варьироваться.

0 голосов
/ 20 августа 2011

Так как размер вашего файла относительно меньше и вам приходится иметь дело с количеством файлов, лучшим вариантом будет создание двух потоков:

1. First thread reading and processing only files placed at even number 
in the file listing (ls -l in *nix).
2. Second thread reading the oddly placed file in the listing.

Недостаток:метод, который вы упомянули о «одном потоке, читающем данные в вектор, а другом потоке, читающем из него», состоял бы в том, что вам нужно будет заботиться о расах потоков и предотвращать их, используя переменные мьютекса и условия.

Где, поскольку этот метод не требует какой-либо блокировки [Надеюсь, что нет никакой зависимости между данными между файлами]

Кроме того, более быстрый способ чтения данных из файла состоял бы в двоичном считывании файла в буфер подходящего размера.размер.

Надеюсь, ответ поможет вам.

** РЕДАКТИРОВАНИЕ: **

В соответствии с вашим комментарием, кажется, вам придется идти с одним потоком, считывающим данные в структуру данных очереди [возможно, очередь из буфера символов] и второй потокчтение данных из очереди и их обработка.

Как упоминалось ранее, проблема заключается в чтении и записи из той же очереди, в которой контейнеры STL не являются поточно-ориентированными.

Итак, что я могу порекомендовать здесь, так это управлять вашей общей структурой данных, то есть очереди здесь, используя locaks, и все остальное:

1. Boost Lock free : <a href="http://tim.klingt.org/boost_lockfree/lockfree/introduction.html" rel="nofollow"> Boost lock free </a>
2. Write your own loack free implementation : <a href="http://drdobbs.com/architecture-and-design/210604448" rel="nofollow"> Lock free impl </a>
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...