Я написал многопоточное приложение для разбора лог-файлов. Он основан на примере буфера мьютекса из http://drdobbs.com/cpp/184401518?pgno=5.
Идея состоит в том, чтобы иметь класс буфера, который имеет функции для помещения элементов в буфер и извлечения элементов из него. Синхронизация потоков чтения и записи обрабатывается с использованием условий. Пока буфер не заполнен, новые элементы записываются в буфер, а пока он не пуст, элементы считываются из него. В противном случае потоки будут ждать.
В этом примере для обработки используется фиксированное количество элементов, поэтому я изменил поток чтения для запуска, пока есть входные данные из файла, а потоки обработки запускаются, когда есть входные данные или когда в буфере остались элементы.
Моя проблема в том, что если я использую 1 поток чтения и 1 поток обработки, все в порядке и стабильно. Когда я добавляю еще один поток обработки, это значительно повышает производительность и остается стабильным даже после 10.000 тестов.
Теперь, когда я добавляю другой процессорный поток (1 чтение, 3 обработки), программа, кажется, периодически зависает (тупик?) (Но не каждый раз) и либо ожидает заполнения буфера, либо становится пустым.
Почему два потока, выполняющие одно и то же, синхронизируют стабильно, а три из них перестают работать?
Я новичок в C ++, поэтому, возможно, кто-нибудь из вас, более опытных программистов, знает, что может вызвать такое поведение?
Вот мой код:
Класс буфера:
#include "StringBuffer.h"
void StringBuffer::put(string str)
{
scoped_lock lock(mutex);
if (full == BUF_SIZE)
{
{
//boost::mutex::scoped_lock lock(io_mutex);
//std::cout << "Buffer is full. Waiting..." << std::endl;
}
while (full == BUF_SIZE)
cond.wait(lock);
}
str_buffer[p] = str;
p = (p+1) % BUF_SIZE;
++full;
cond.notify_one();
}
string StringBuffer::get()
{
scoped_lock lk(mutex);
if (full == 0)
{
{
//boost::mutex::scoped_lock lock(io_mutex);
//std::cout << "Buffer is empty. Waiting..." << std::endl;
}
while (full == 0)
cond.wait(lk);
}
string test = str_buffer[c];
c = (c+1) % BUF_SIZE;
--full;
cond.notify_one();
return test;
}
и вот главное:
Parser p;
StringBuffer buf;
Report report;
string transfer;
ifstream input;
vector <boost::regex> regs;
int proc_count = 0;
int push_count = 0;
bool pusher_done = false;
// Show filter configuration and init report by dimensioning counter vectors
void setup_report() {
for (int k = 0; k < p.filters(); k++) {
std::cout << "SID(NUM):" << k << " Name(TXT):\"" << p.name_at(k) << "\"" << " Filter(REG):\"" << p.filter_at(k) << "\"" << endl;
regs.push_back(boost::regex(p.filter_at(k)));
report.hits_filters.push_back(0);
report.names.push_back(p.name_at(k));
report.filters.push_back(p.filter_at(k));
}
}
// Read strings from sourcefiles and put them into buffer
void pusher() {
// as long as another string could be red, ...
while (input) {
// put it into buffer
buf.put(transfer);
// and get another string from source file
getline(input, transfer);
push_count++;
}
pusher_done = true;
}
// Get strings from buffer and check RegEx filters. Pass matches to report
void processor()
{
while (!pusher_done || buf.get_rest()) {
string n = buf.get();
for (unsigned sid = 0; sid < regs.size(); sid++) {
if (boost::regex_search(n, regs[sid])) report.report_hit(sid);
}
boost::mutex::scoped_lock lk(buf.count_mutex);
{
proc_count++;
}
}
}
int main(int argc, const char* argv[], char* envp[])
{
if (argc == 3)
{
// first add sourcefile from argv[1] filepath, ...
p.addSource(argv[1]);
std::cout << "Source File: *** Ok\n";
// then read configuration from argv[2] filepath, ...
p.readPipes(envp, argv[2]);
std::cout << "Configuration: *** Ok\n\n";
// and setup the Report Object.
setup_report();
// For all sourcefiles that have been parsed, ...
for (int i = 0; i < p.sources(); i++) {
input.close();
input.clear();
// open the sourcefile in a filestream.
input.open(p.source_at(i).c_str());
// check if file exist, otherwise throw error and exit
if (!input)
{
std::cout << "\nError! File not found: " << p.source_at(i);
exit(1);
}
// get start time
std::cout << "\n- started: ";
ptime start(second_clock::local_time());
cout << start << endl;
// read a first string into transfer to get the loops going
getline(input, transfer);
// create threads and pass a reference to functions
boost::thread push1(&pusher);
boost::thread proc1(&processor);
boost::thread proc2(&processor);
// start all the threads and wait for them to complete.
push1.join();
proc1.join();
proc2.join();
// calculate and output runtime and lines per second
ptime end(second_clock::local_time());
time_duration runtime = end - start;
std::cout << "- finished: " << ptime(second_clock::local_time()) << endl;
cout << "- processed lines: " << push_count << endl;
cout << "- runtime: " << to_simple_string(runtime) << endl;
float processed = push_count;
float lines_per_second = processed/runtime.total_seconds();
cout << "- lines per second: " << lines_per_second << endl;
// write report to file
report.create_filereport(); // after all threads finished write reported data to file
cout << "\nReport saved as: ./report.log\n\nBye!" << endl;
}
}
else std::cout << "Usage: \"./Speed-Extract [source][config]\"\n\n";
return 0;
}
Редактировать 1:
Большое спасибо за вашу помощь. Добавив несколько счетчиков и идентификаторов потоков к выводу, я выяснил, в чем проблема:
Я заметил, что несколько потоков могут оставаться в ожидании заполнения буфера.
Мои потоки обработки выполняются, когда остаются новые строки исходного кода, которые еще не были прочитаны, ИЛИ пока буфер не пуст. Это не хорошо.
Скажем, у меня 2 потока, ожидающих заполнения буфера. Как только читатель читает новую строку (возможно, последние несколько строк), существует 6 других потоков, которые пытаются получить эту строку (и) и заблокировать элемент, чтобы у 2 ожидающих потоков, возможно, даже не было возможности попытаться разблокировать его .
Как только они проверяют, что строка занята другим потоком, они продолжают ждать. Поток чтения не уведомляет их, когда достигает eof, а затем останавливается. Обе ожидающие потоки ждут вечно.
Функция My Reading дополнительно должна уведомлять все потоки, что она достигла eof, поэтому потоки должны оставаться в ожидании, только если буфер пуст и файл не EOF.