Случайно бесконечный поток и различные ошибки, в то время как параллельное построчное чтение / запись - PullRequest
0 голосов
/ 11 июля 2010

Я хочу реализовать параллельное чтение-обработка-запись построчно на основе boost :: thread, но текущая версия имеет неопределенное поведение: следующий тест читает файл CSV, заполняя очередь чтения (одновременную), которая просто переносится в очередь записи для записи в выходной файл (пока не обрабатывается).

Возникли проблемы:

  • Как в Windows, так и в Unix, программа случайным образом никогда не заканчивается (~ 3/5 раз) и генерирует SIGSEGV (~ 1/100)
  • В Unix существует много ошибок: SIGABRT при создании потока, «память, забитая перед выделенным блоком» (также -> SIGABRT) после создания, произвольно между 1 и ~ 15 строками.

Я НЕНАВИЖУ давать проблемы и коды и позволять другим отвечать (я иногда на вашей стороне темы), но, поверьте мне, я не могу придумать что-либо еще, чтобы исправить это (работа с потоками, отладка кошмар), поэтому заранее извиняюсь. Вот оно:

Main.cpp:

#include "io.hpp"

#include <iostream>

int main(int argc, char *argv[]) {
    CSV::Reader reader;
    CSV::Writer writer;

    if(reader.open("test_grandeur_nature.csv") && writer.open("output.txt")) {
        CSV::Row row;

        reader.run(); //Reads the CSV file and fills the read queue
        writer.run(); //Reads the to-be-written queue and writes it to a txt file

        //The loop is supposed to end only if the reader is finished and empty
        while(!(reader.is_finished() && reader.empty())) {
            //Transfers line by line from the read to the to-be-written queues
            reader.wait_and_pop(row);
            writer.push(row);
        }
        //The reader will likely finish before the writer, so he has to finish his queue before continuing.
        writer.finish(); 
    }
    else {
        std::cout << "File error";
    }

    return EXIT_SUCCESS;
}

Io.hpp:

#ifndef IO_H_INCLUDED
#define IO_H_INCLUDED

#include "threads.hpp"

#include <fstream>

namespace CSV {
    class Row {
        std::vector<std::string> m_data;

        friend class Iterator;
        friend void write_row(Row const &row, std::ostream &stream);

        void read_next(std::istream& csv);

        public:
            inline std::string const& operator[](std::size_t index) const {
                return m_data[index];
            }
            inline std::size_t size() const {
                return m_data.size();
            }
    };

    /** Reading *************************************************************************/

    class Iterator {
        public:
            Iterator(std::istream& csv) : m_csv(csv.good() ? &csv : NULL) {
                ++(*this);
            }
            Iterator() : m_csv(NULL) {}

            //Pre-Increment
            Iterator& operator++() {
                if (m_csv != NULL) {
                    m_row.read_next(*m_csv);
                    m_csv = m_csv->good() ? m_csv : NULL;
                }

                return *this;
            }
            inline Row const& operator*() const {
                return m_row;
            }

            inline bool operator==(Iterator const& rhs) {
                return ((this == &rhs) || ((this->m_csv == NULL) && (rhs.m_csv == NULL)));
            }
            inline bool operator!=(Iterator const& rhs) {
                return !((*this) == rhs);
            }
        private:
            std::istream* m_csv;
            Row m_row;
    };

    class Reader : public Concurrent_queue<Row>, public Thread {
        std::ifstream m_csv;

        Thread_safe_value<bool> m_finished;

        void work() {
            if(!!m_csv) {
                for(Iterator it(m_csv) ; it != Iterator() ; ++it) {
                    push(*it);
                }
                m_finished.set(true);
            }
        }

    public:
        Reader() {
            m_finished.set(false);
        }

        inline bool open(std::string path) {
            m_csv.open(path.c_str());

            return !!m_csv;
        }

        inline bool is_finished() {
            return m_finished.get();
        }
    };

    /** Writing ***************************************************************************/

    void write_row(Row const &row, std::ostream &stream);

    //Is m_finishing really thread-safe ? By the way, is it mandatory ?
    class Writer : public Concurrent_queue<Row>, public Thread {
        std::ofstream m_csv;

        Thread_safe_value<bool> m_finishing;

        void work() {
            if(!!m_csv) {
                CSV::Row row;

                while(!(m_finishing.get() && empty())) {
                    wait_and_pop(row);
                    write_row(row, m_csv);
                }
            }
        }

    public:
        Writer() {
            m_finishing.set(false);
        }

        inline void finish() {
            m_finishing.set(true);
            catch_up();
        }

        inline bool open(std::string path) {
            m_csv.open(path.c_str());

            return !!m_csv;
        }
    };
}

#endif

Io.cpp:

#include "io.hpp"

#include <boost/bind.hpp>
#include <boost/tokenizer.hpp>

void CSV::Row::read_next(std::istream& csv) {
    std::string row;
    std::getline(csv, row);

    boost::tokenizer<boost::escaped_list_separator<char> > tokenizer(row, boost::escaped_list_separator<char>('\\', ';', '\"'));
    m_data.assign(tokenizer.begin(), tokenizer.end());
}

void CSV::write_row(Row const &row, std::ostream &stream) {
    std::copy(row.m_data.begin(), row.m_data.end(), std::ostream_iterator<std::string>(stream, ";"));
    stream << std::endl;
}

Threads.hpp:

#ifndef THREADS_HPP_INCLUDED
#define THREADS_HPP_INCLUDED

#include <boost/bind.hpp>
#include <boost/thread.hpp>

class Thread {
protected:
    boost::thread *m_thread;

    virtual void work() = 0;

    void do_work() {
        work();
    }

public:
    Thread() : m_thread(NULL) {}
    virtual ~Thread() {
        catch_up();
        if(m_thread != NULL) {
            delete m_thread;
        }
    }

    inline void catch_up() {
        if(m_thread != NULL) {
            m_thread->join();
        }
    }

    void run() {
        m_thread = new boost::thread(boost::bind(&Thread::do_work, boost::ref(*this)));
    }
};

/** Thread-safe datas **********************************************************/

#include <queue>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>

template <class T>
class Thread_safe_value : public boost::noncopyable {
    T m_value;
    boost::mutex m_mutex;

    public:
        T const &get() {
            boost::mutex::scoped_lock lock(m_mutex);
            return m_value;
        }
        void set(T const &value) {
            boost::mutex::scoped_lock lock(m_mutex);
            m_value = value;
        }
};

template<typename Data>
class Concurrent_queue {
    std::queue<Data> m_queue;
    mutable boost::mutex m_mutex;
    boost::condition_variable m_cond;

public:
    void push(Data const& data) {
        boost::mutex::scoped_lock lock(m_mutex);
        m_queue.push(data);
        lock.unlock();
        m_cond.notify_one();
    }

    bool empty() const {
        boost::mutex::scoped_lock lock(m_mutex);
        return m_queue.empty();
    }

    void wait_and_pop(Data& popped) {
        boost::mutex::scoped_lock lock(m_mutex);
        while(m_queue.empty()) {
            m_cond.wait(lock);
        }

        popped = m_queue.front();
        m_queue.pop();
    }
};

#endif // THREAD_HPP_INCLUDED

Этот проект важен, и я был бы очень признателен, если бы вы могли мне помочь =)

Заранее спасибо.

С уважением,

Мистер Мистер.

Ответы [ 2 ]

1 голос
/ 12 июля 2010

В вашей логике завершения есть ошибка.

Цикл main() читает последнюю запись из очереди и блокирует ожидание следующей записи до того, как установлен флаг m_finished.

Если вы вставите изрядное ожидание перед вызовом m_finished.set(true) (например, sleep(5) в Linux или Sleep(5000) в Windows в течение 5 секунд ожидания), тогда ваш код будет зависать каждый раз.

(Это не относится к ошибкам сегмента или ошибкам выделения памяти, что, вероятно, является чем-то другим)

Проблемное выполнение выглядит так:

  1. поток чтения читает последний элемент из файла и помещает в очередь.
  2. основной поток извлекает последний элемент из очереди.
  3. основной поток помещает последний элемент в очередь для потока записи.
  4. круглая основная нить; m_finished не установлен, поэтому он вызывает wait_and_pop.
  5. поток чтения понимает, что он находится в конце файла и устанавливает m_finished.
  6. основной поток теперь заблокирован в ожидании другого элемента в очереди считывателя, но считыватель не предоставит его.

Спящий вызов вызывает этот порядок событий, помещая большую задержку между шагами 1 и 5 в потоке считывателя, поэтому основной поток имеет много возможностей для выполнения шагов 2-4. Это полезная техника отладки для условий гонки.

0 голосов
/ 11 июля 2010

После быстрого прочтения единственная очевидная проблема, которую я обнаружил, которая, вероятно, объясняет некоторые (но, возможно, не все) ваши проблемы, заключается в том, что вы неправильно сигнализируете о своем состоянии в Concurrent_queue::push.

Каждый раз, когда вы обнаруживаете, что звоните unlock() на мьютекс с ограничением, это должно указывать на то, что что-то не так. Одним из основных моментов использования мьютексов с заданной областью является то, что блокировки и разблокировки неявны, когда объект входит / выходит из области видимости. Если вам понадобится разблокировка, возможно, вам придется реструктурировать код.

Однако в этом случае вам фактически не нужно реструктурировать код. В этом случае разблокировка просто неверна. Когда условие сигнализируется, мьютекс должен быть заблокирован. Разблокируется после появления сигнала. Таким образом, вы можете заменить разблокировку с этим кодом:

void push(Data const& data) {
    boost::mutex::scoped_lock lock(m_mutex);
    m_queue.push(data);
    m_cond.notify_one();
}

Который разблокирует мьютекс при возврате функции после того, как условие было сигнализировано.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...