Мое первое многопоточное приложение - Странное поведение Boost: Threads? - PullRequest
0 голосов
/ 20 марта 2011

Я написал многопоточное приложение для разбора лог-файлов. Он основан на примере буфера мьютекса из http://drdobbs.com/cpp/184401518?pgno=5.

Идея состоит в том, чтобы иметь класс буфера, который имеет функции для помещения элементов в буфер и извлечения элементов из него. Синхронизация потоков чтения и записи обрабатывается с использованием условий. Пока буфер не заполнен, новые элементы записываются в буфер, а пока он не пуст, элементы считываются из него. В противном случае потоки будут ждать.

В этом примере для обработки используется фиксированное количество элементов, поэтому я изменил поток чтения для запуска, пока есть входные данные из файла, а потоки обработки запускаются, когда есть входные данные или когда в буфере остались элементы.

Моя проблема в том, что если я использую 1 поток чтения и 1 поток обработки, все в порядке и стабильно. Когда я добавляю еще один поток обработки, это значительно повышает производительность и остается стабильным даже после 10.000 тестов.

Теперь, когда я добавляю другой процессорный поток (1 чтение, 3 обработки), программа, кажется, периодически зависает (тупик?) (Но не каждый раз) и либо ожидает заполнения буфера, либо становится пустым.

Почему два потока, выполняющие одно и то же, синхронизируют стабильно, а три из них перестают работать?

Я новичок в C ++, поэтому, возможно, кто-нибудь из вас, более опытных программистов, знает, что может вызвать такое поведение?

Вот мой код:

Класс буфера:

#include "StringBuffer.h"

void StringBuffer::put(string str)
  {
    scoped_lock lock(mutex);
    if (full == BUF_SIZE)
    {
      {
        //boost::mutex::scoped_lock lock(io_mutex);
        //std::cout << "Buffer is full. Waiting..." << std::endl;
      }
      while (full == BUF_SIZE)
        cond.wait(lock);
    }
    str_buffer[p] = str;
    p = (p+1) % BUF_SIZE;
    ++full;
    cond.notify_one();
  }

string StringBuffer::get()
  {
    scoped_lock lk(mutex);
    if (full == 0)
    {
      {
        //boost::mutex::scoped_lock lock(io_mutex);
        //std::cout << "Buffer is empty. Waiting..." << std::endl;
      }
      while (full == 0)
        cond.wait(lk);
    }
    string test = str_buffer[c];
    c = (c+1) % BUF_SIZE;
    --full;
    cond.notify_one();
    return test;
  }

и вот главное:

Parser p;
StringBuffer buf;
Report report;
string transfer;
ifstream input;
vector <boost::regex> regs;

int proc_count = 0;
int push_count = 0;
bool pusher_done = false;

//  Show filter configuration and init report by dimensioning counter vectors

void setup_report() {
    for (int k = 0; k < p.filters(); k++) {
        std::cout << "SID(NUM):" << k << "  Name(TXT):\"" << p.name_at(k) << "\"" << "  Filter(REG):\"" << p.filter_at(k) << "\"" << endl;
        regs.push_back(boost::regex(p.filter_at(k)));
        report.hits_filters.push_back(0);
        report.names.push_back(p.name_at(k));
        report.filters.push_back(p.filter_at(k));
    }
}

// Read strings from sourcefiles and put them into buffer

void pusher() {

    // as long as another string could be red, ...

    while (input) {

        // put it into buffer

        buf.put(transfer);

        // and get another string from source file

        getline(input, transfer);
        push_count++;
    }
    pusher_done = true;
}

// Get strings from buffer and check RegEx filters. Pass matches to report

void processor()
{
    while (!pusher_done || buf.get_rest()) {
        string n = buf.get();
        for (unsigned sid = 0; sid < regs.size(); sid++) {
            if (boost::regex_search(n, regs[sid])) report.report_hit(sid);
        }
        boost::mutex::scoped_lock lk(buf.count_mutex);
        {
            proc_count++;
        }
    }
}

int main(int argc, const char* argv[], char* envp[])
{

    if (argc == 3)
    {
        //  first add sourcefile from argv[1] filepath, ...

        p.addSource(argv[1]);
        std::cout << "Source File: *** Ok\n";

        //  then read configuration from argv[2] filepath, ...

        p.readPipes(envp, argv[2]);
        std::cout << "Configuration: *** Ok\n\n";

        // and setup the Report Object.

        setup_report();

        // For all sourcefiles that have been parsed, ...

        for (int i = 0; i < p.sources(); i++) {
            input.close();
            input.clear();

            // open the sourcefile in a filestream.

            input.open(p.source_at(i).c_str());

            // check if file exist, otherwise throw error and exit

            if (!input)
            {
                std::cout << "\nError! File not found: " <<  p.source_at(i);
                exit(1);
            }

            // get start time

            std::cout << "\n- started:  ";
            ptime start(second_clock::local_time());
            cout << start << endl;

            // read a first string into transfer to get the loops going

            getline(input, transfer);

            // create threads and pass a reference to functions

            boost::thread push1(&pusher);
            boost::thread proc1(&processor);
            boost::thread proc2(&processor);


            // start all the threads and wait for them to complete.

            push1.join();
            proc1.join();
            proc2.join();


            // calculate and output runtime and lines per second

            ptime end(second_clock::local_time());
            time_duration runtime = end - start;
            std::cout << "- finished: " << ptime(second_clock::local_time()) << endl;
            cout << "- processed lines: " << push_count << endl;
            cout << "- runtime: " << to_simple_string(runtime) << endl;
            float processed = push_count;
            float lines_per_second = processed/runtime.total_seconds();
            cout << "- lines per second: " << lines_per_second << endl;

            // write report to file

            report.create_filereport();         // after all threads finished write reported data to file
            cout << "\nReport saved as: ./report.log\n\nBye!" << endl;
        }
    }
    else std::cout << "Usage: \"./Speed-Extract [source][config]\"\n\n";
    return 0;
}

Редактировать 1:

Большое спасибо за вашу помощь. Добавив несколько счетчиков и идентификаторов потоков к выводу, я выяснил, в чем проблема:

Я заметил, что несколько потоков могут оставаться в ожидании заполнения буфера.

Мои потоки обработки выполняются, когда остаются новые строки исходного кода, которые еще не были прочитаны, ИЛИ пока буфер не пуст. Это не хорошо.

Скажем, у меня 2 потока, ожидающих заполнения буфера. Как только читатель читает новую строку (возможно, последние несколько строк), существует 6 других потоков, которые пытаются получить эту строку (и) и заблокировать элемент, чтобы у 2 ожидающих потоков, возможно, даже не было возможности попытаться разблокировать его .

Как только они проверяют, что строка занята другим потоком, они продолжают ждать. Поток чтения не уведомляет их, когда достигает eof, а затем останавливается. Обе ожидающие потоки ждут вечно.

Функция My Reading дополнительно должна уведомлять все потоки, что она достигла eof, поэтому потоки должны оставаться в ожидании, только если буфер пуст и файл не EOF.

Ответы [ 2 ]

1 голос
/ 20 марта 2011

Как @Martin я не вижу очевидной проблемы в вашем коде. Единственная идея, которая у меня есть, это то, что вы можете попытаться использовать отдельные условные переменные для записи в буфер и чтения из него. Как и сейчас, каждый раз, когда поток завершает получение элемента, потенциально могут также сигнализироваться другие потоки, которые ожидали в методе get.

Учтите следующее. Буфер заполнен, поэтому устройство записи ожидает сигнала cond. Теперь читатели опустошают очередь, и писатель не получает ни единого сигнала. Это возможно, поскольку они используют одну и ту же переменную условия и становятся более вероятными, чем больше читателей. Каждый раз, когда читатель удаляет элемент из буфера, он вызывает notify_one. Это может разбудить писателя, но также может разбудить читателя. Предположим, случайно все уведомления разбудили читателей. Автор никогда не будет выпущен. В конце все потоки будут ждать сигнала, и у вас возникнет тупик.

Если это правильно, у вас есть два возможных исправления:

  1. Используйте разные сигналы, чтобы не дать читателям «украсть» уведомления, предназначенные для писателя.
  2. Используйте notify_all вместо notify_one, чтобы убедиться, что читатель получает шанс при каждом удалении предмета.
0 голосов
/ 20 марта 2011

На самом деле я не вижу проблемы.

Но следует помнить одну вещь: только то, что поток освобождается из условной переменной с сигналом, не означает, что он начинает работать.

После освобождения он должен получить мьютекс перед продолжением, но каждый раз, когда запланирован запуск потока, кто-то другой может потенциально заблокировать мьютекс (учитывая, насколько плотными являются эти циклы, что не было бы сюрпризом), таким образом, он приостанавливает ожидание следующего планированияслот.Это может быть причиной вашего конфликта.

Проблема в том, что вставка операторов печати в код не поможет, так как операторы печати влияют на время (они дороги), и, таким образом, у вас будет другое поведение.Что-то дешевое, например, подсчет того, какое действие каждого потока может быть достаточно дешевым, чтобы оно не влияло на время, но помогло вам определить проблему. Примечание : вывод результатов только после завершения.

...