простая модель босс-работник с использованием pthreads - PullRequest
1 голос
/ 23 августа 2010

Я программист-любитель, который экспериментирует с использованием pthreads, чтобы понять, в какой степени многопоточная программа может привести к повышению эффективности в довольно длительных вычислениях, над которыми я работаю.Вычисление выполняется через объект std :: list , удаляет первый элемент списка и обрабатывает его в потоке, который что-то вычисляет с ним.Программа отслеживает активные потоки и гарантирует, что всегда работает определенное количество активных потоков.Как только список пуст, программа сортирует полученные данные, выводит файл данных и завершает работу.

Многопоточная версия программы в настоящее время не работает.Он получает 20, 40 или 200 или около того элементов в списке (в зависимости от того, какой список я ему предоставляю) и segfaults.Кажется, что segfault происходит в определенных элементах списка, то есть они не выглядят случайными в любом случае.

НО странно то, что, если я скомпилирую с символами отладки и запусту программу через gdb, программа не перестанет работать.Работает отлично.Медленно, конечно, но он работает и делает все так, как я ожидаю.

Поработав некоторое время с предложениями каждого, используя (среди прочего) инструменты Valgrind, чтобы попытаться разобраться в том, что происходит.Я заметил, что приведенный ниже упрощенный код (без каких-либо вызовов вне библиотеки std или библиотеки pthread) создает проблемы для helgrind, и это, вероятно, источник моих проблем.Так что вот только упрощенный код и жалобы Хелгринда.

#include <cstdlib>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <list>
#include <iostream>
#include <signal.h>
#include <sys/select.h>

struct thread_detail {
 pthread_t *threadID; 
 unsigned long num;
};

pthread_mutex_t coutLock;

void *ThreadToSpawn(void *threadarg)
{
   struct thread_detail *my_data;
   my_data = (struct thread_detail *) threadarg;
   int taskid = my_data->num;

   struct timeval timeout;
   for (unsigned long i=0; i < 10; i++)
    { 
     timeout.tv_sec = 0;  timeout.tv_usec = 500000; // half-second 
     select( 0, NULL, NULL, NULL, & timeout );
     pthread_mutex_lock(&coutLock);
     std::cout << taskid << " "; std::cout.flush();
     pthread_mutex_unlock(&coutLock);
    }
   pthread_exit(NULL);
}


int main (int argc, char *argv[])
{
  unsigned long comp_DONE=0; 
  unsigned long comp_START=0;
  unsigned long ms_LAG=10000; // microsecond lag between polling of threads

  // set-up the mutexes
  pthread_mutex_init( &coutLock, NULL );

  if (argc != 3) { std::cout << "Program requires two arguments: (1) number of threads to use,"
                               " and (2) tasks to accomplish. \n"; exit(1); }
  unsigned long NUM_THREADS(atoi( argv[1] ));
  unsigned long comp_TODO(atoi(argv[2]));
  std::cout << "Program will have " << NUM_THREADS << " threads. \n";
  std::list < thread_detail > thread_table;

   while (comp_DONE != comp_TODO) // main loop to set-up and track threads
    {
     // poll stack of computations to see if any have finished, 
     // extract data and remove completed ones from stack
     std::list < thread_detail >::iterator i(thread_table.begin());
     while (i!=thread_table.end())
      {
       if (pthread_kill(*i->threadID,0)!=0) // thread is dead
        { // if there was relevant info in *i we'd extract it here
         if (pthread_join(*i->threadID, NULL)!=0) { std::cout << "Thread join error!\n"; exit(1); }
         pthread_mutex_lock(&coutLock);
         std::cout << i->num << " done. "; std::cout.flush();
         pthread_mutex_unlock(&coutLock);
         delete i->threadID;
         thread_table.erase(i++);  
         comp_DONE++;
        }
       else (i++);
      }
     // if list not full, toss another on the pile
     while ( (thread_table.size() < NUM_THREADS) && (comp_TODO > comp_START) )
      {
        pthread_t *tId( new pthread_t );
        thread_detail Y; Y.threadID=tId; Y.num=comp_START;
        thread_table.push_back(Y);
        int rc( pthread_create( tId, NULL, ThreadToSpawn, (void *)(&(thread_table.back() )) ) );
        if (rc) { printf("ERROR; return code from pthread_create() is %d\n", rc); exit(-1); }
        pthread_mutex_lock(&coutLock);
       std::cout << comp_START << " start. "; std::cout.flush();
        pthread_mutex_unlock(&coutLock);
        comp_START++;
      }

     // wait a specified amount of time
     struct timeval timeout;
     timeout.tv_sec = 0;  timeout.tv_usec = ms_LAG; 
     select( 0, NULL, NULL, NULL, & timeout );
    } // the big while loop

   pthread_exit(NULL);
}

Вывод Helgrind


==2849== Helgrind, a thread error detector
==2849== Copyright (C) 2007-2009, and GNU GPL'd, by OpenWorks LLP et al.
==2849== Using Valgrind-3.6.0.SVN-Debian and LibVEX; rerun with -h for copyright info
==2849== Command: ./thread2 2 6
==2849== 
Program will have 2 threads. 
==2849== Thread #2 was created
==2849==    at 0x64276BE: clone (clone.S:77)
==2849==    by 0x555E172: pthread_create@@GLIBC_2.2.5 (createthread.c:75)
==2849==    by 0x4C2D42C: pthread_create_WRK (hg_intercepts.c:230)
==2849==    by 0x4C2D4CF: pthread_create@* (hg_intercepts.c:257)
==2849==    by 0x401374: main (in /home/rybu/prog/regina/exercise/thread2)
==2849== 
==2849== Thread #1 is the program's root thread
==2849== 
==2849== Possible data race during write of size 8 at 0x7feffffe0 by thread #2
==2849==    at 0x4C2D54C: mythread_wrapper (hg_intercepts.c:200)
==2849==  This conflicts with a previous read of size 8 by thread #1
==2849==    at 0x4C2D440: pthread_create_WRK (hg_intercepts.c:235)
==2849==    by 0x4C2D4CF: pthread_create@* (hg_intercepts.c:257)
==2849==    by 0x401374: main (in /home/rybu/prog/regina/exercise/thread2)
==2849== 
 [0 start.]  [1 start.] 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1  [0 done.]  [1 done.]  [2 start.]  [3 start.] 2 3 2 3 2 3 2 3 2 3 2 3 2 3 2 3 2 3 2 3  [2 done.]  [3 done.]  [4 start.]  [5 start.] 4 5 4 5 4 5 4 5 4 5 4 5 4 5 4 5 4 5 4 5  [4 done.]  [5 done.] ==2849== 
==2849== For counts of detected and suppressed errors, rerun with: -v
==2849== Use --history-level=approx or =none to gain increased speed, at
==2849== the cost of reduced accuracy of conflicting-access information
==2849== ERROR SUMMARY: 6 errors from 1 contexts (suppressed: 675 from 37)

Возможно, я неправильно использую pthreads, но мне не очень понятно, что я делаю неправильно.Более того, я не уверен, что нужно сделать из вывода helgrind.Ранее helgrind жаловался, потому что я не вызывал pthread_join для потоков, которые по другим причинам знали, что код мертв.Добавление pthread_join позаботилось об этих жалобах.

Чтение различных обучающих программ pthread в режиме онлайн. Я обнаружил, что, вероятно, бессмысленно продолжать создавать и уничтожать потоки, как в приведенном выше коде.Вероятно, более эффективно иметь N потоков, работающих одновременно, и использовать мьютексы и разделяемую память для передачи данных между потоком «BOSS» и потоками «WORKER», уничтожая только потоки WORKER один раз в конце программы.Так что это то, что мне в конечном итоге придется скорректировать, но есть ли что-то явно не так с приведенным выше кодом?

Редактировать: некоторые ключевые слова я замечаю все чаще и чаще.Терминология для того, что я пытаюсь создать, по-видимому, пул потоков .Кроме того, существуют различные предложения для стандартных реализаций этого, например, в библиотеке boost есть boost :: threadpool, boost :: task, boost :: thread.Некоторые из них кажутся только предложениями.Я сталкиваюсь с темами, где люди упоминают , вы можете комбинировать ASIO и boost :: thread , чтобы выполнить то, что я ищу.Точно так же есть класс очереди сообщений.

Хм, похоже, я нацарапываю на поверхность темы, о которых многие думают в наше время, но это кажется зародышевым, как ООП в 1989 году или что-то в этом роде.

Ответы [ 3 ]

2 голосов
/ 23 августа 2010

Попробуйте включить дампы ядра (ulimit -c unlimited), затем запустите вашу программу без GDB. В случае сбоя он должен оставить основной файл, который затем можно открыть с помощью gdb и начать расследование (gdb <executable-file> <core-file>).

1 голос
/ 23 августа 2010

Вы уверены, что это полный код?Я не вижу, где вы создаете потоки или откуда вызывается BuildKCData.

У вас должен быть барьер памяти после pthread_kill (), хотя я сомневаюсь, что это имеет значение в этом случае.

РЕДАКТИРОВАТЬ: Вы путаете выполнение по порядку и согласованность кэша.

Согласованность кэша: x86 (в настоящее время) гарантирует, что выровненный 4-обращения к байту являются атомарными, поэтому a[0]=123 в потоке A и a[1]=456 в потоке B будут работать - поток C в конечном итоге увидит «123,456».Существуют различные протоколы согласованности кэша, но я полагаю, что это примерно блокировка MRSW.

Выполнение вне порядка: x86 не гарантирует упорядочение операций чтения (и, возможно, записи;была дискуссия о том, нужен ли sfence в ядре Linux).Это позволяет процессору выполнять предварительную выборку данных более эффективно, но это означает, что a[0]=123,a[1] в потоке A и a[1]=456,a[0] в потоке B могут возвращать 0, поскольку выборка из [1] может происходить до загрузки [0].Есть два основных способа исправить это:

  • Доступ к общим данным только тогда, когда вы удерживаете блокировку.В частности, не читайте общие данные вне блокировки.Означает ли это, что блокировка для каждой записи или блокировка для всего массива зависит от вас, и как вы думаете, как будет выглядеть конфликт блокировок (совет: он обычно не очень большой).
  • Вставьте памятьбарьер между вещами, которые должны быть в порядке.Это трудно понять (pthread даже не имеет барьеров памяти; pthread_barrier больше похоже на точку синхронизации).

Хотя барьеры памяти - недавняя тенденция, блокировка - далеко проще понять ( Я держу замок, поэтому больше никто не может изменять данные у меня под ногами ).Барьеры памяти все время бушуют в некоторых кругах, но многое еще нужно сделать правильно ( Я надеюсь, что это чтение атомарное , Я надеюсь, что другие потоки пишут атомарно , Я надеюсьдругие потоки используют барьер и о да, мне тоже нужно использовать барьер ).

И если блокировка слишком медленная, уменьшение конкуренции будет гораздо более эффективным, чем замена блокировокс барьерами и надеясь, что вы правильно поняли.

1 голос
/ 23 августа 2010

Что касается top, сколько потоков вы используете? Я не вижу данных в моем верхнем выводе, но видел всплывающее окно виртуального столбца при использовании потоков. Мое понимание (и, возможно, я должен спросить, чтобы быть уверенным), что каждый поток имеет свое собственное пространство памяти, которое он может потенциально использовать. Эта память фактически не используется, она просто доступна при необходимости, поэтому это число может быть достаточно большим, не вызывая проблем. Само по себе воспоминание, вероятно, не катастрофическое. Вы должны увидеть, линейно ли масштабируется использование данных с количеством потоков, которые вы используете.

Относительно GDB. Как вы заметили, GDB не исправит ваш код, хотя может повредить ваши ошибки, если вы повреждаете память. Если повреждение происходит в области, в которую вы не возвращаетесь или которую вы уже освободили и никогда не пытаетесь повторно использовать симптомы ваших проблем, они исчезнут. Уходите, пока вам не понадобится продемонстрировать или использовать свой код в какой-то критической области.

Кроме того, вы захотите взглянуть на helgrind , часть valgrind. Если у вас проблема с блокировкой, то это хлеб с маслом:

Helgrind - это инструмент Valgrind для обнаружения ошибок синхронизации в программах на C, C ++ и Fortran, использующих потоковые примитивы POSIX pthreads.

Просто сделай:

valgrind --tool=helgrind {your program}
...