Ошибка сегментации при увеличении размера вектора при параллельной сортировке слиянием - PullRequest
1 голос
/ 05 марта 2020

Когда я пытаюсь запустить программу, используя 2 потока и размер более 4,5 миллионов, это создает ошибку сегментации. Все, что ниже этого числа, проходит гладко. Короче говоря, очень большие числа порождают ошибку, и я не знаю почему. Я хотел бы знать, связана ли ошибка с созданием потока или распределением работы по потокам. Некоторая помощь будет оценена. Ниже приведен код.

#include <iostream>
#include <thread>
#include <vector>
#include <chrono>
#include <ctime>
#include <algorithm>

/* define variables for the problem */
#define UPPER_LIM 10
#define LOWER_LIM  1

using namespace std;

/* function definitions */

int generate_random_number(unsigned int lower_limit, unsigned int upper_limit);
void merge_sort(vector<int>& array, int left, int right);
void merge(vector<int>& array, int left, int middle, int right);
void thread_merge_sort(vector<int>& array, int thread_id, int n, int p, int q);
bool isTest_array_is_in_order(vector<int>& array, int LENGTH);

int main(int argc, const char *argv[]) {
    int NUM_THREADS = atoi(argv[1]);
    int LENGTH = atoi(argv[2]);
    int NUMBERS_PER_THREAD = LENGTH / NUM_THREADS;
    int OFFSET = LENGTH % NUM_THREADS;

    srand(time(0));

    std::vector<int> array;
    array.reserve(LENGTH);

    /* initialize array with random numbers */
    for (int i = 0; i < LENGTH; i++) {
        array.push_back(generate_random_number(LOWER_LIM, UPPER_LIM));
    }

    /* begin timing */
    auto start = std::chrono::high_resolution_clock::now();

    const size_t nthreads = NUM_THREADS; //std::thread::hardware_concurrency();
    {
        // Pre loop
        std::cout << "parallel(" << nthreads << " threads):" << std::endl;
        std::vector<std::thread> workers;

        for (std::size_t t = 0; t < nthreads; t++) {
            workers.push_back(thread(thread_merge_sort, ref(array), t, nthreads, NUMBERS_PER_THREAD, OFFSET));
        }

        for (thread& t: workers) { // await thread termination
            t.join();
        }
    }

    auto elapsed = std::chrono::high_resolution_clock::now() - start;
    auto usec    = std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count();

    // and print time
    std::cout << "Spent " << usec << " executing  " << nthreads << " in parallel " << " array size " << LENGTH << std::endl;

    /* end timing */

    /* test to ensure that the array is in sorted order */
    if (!isTest_array_is_in_order(array, LENGTH)) {
        fprintf(stderr, "Error: array is not sorted!!\n");
        return 0;
    }
}

/* generate random numbers within the specified limit */
int generate_random_number(unsigned int lower_limit, unsigned int upper_limit) {
    //srand(time(NULL));
    return lower_limit + (upper_limit - lower_limit) * ((double)rand() / RAND_MAX);
}

/** assigns work to each thread to perform merge sort */
void thread_merge_sort(vector<int> &arr, int thread_id, int NUM_THREADS, int NUMBERS_PER_THREAD, int OFFSET) {
    int left = thread_id * (NUMBERS_PER_THREAD);
    int right = (thread_id + 1) * (NUMBERS_PER_THREAD) - 1;
    if (thread_id == NUM_THREADS - 1) {
        right += OFFSET;
    }
    int middle = left + (right - left) / 2;
    if (left < right) {
        merge_sort(arr, left, right);
        merge_sort(arr, left + 1, right);
        merge(arr, left, middle, right);
    }
}

/* test to ensure that the array is in sorted order */
bool isTest_array_is_in_order(vector<int>& arr, int LENGTH) {
    for (int i = 1; i < LENGTH; i++) {
        if (arr[i] >= arr[i - 1]) {
            return true;
        } else {
            return false;
        }
    }
}

/* perform merge sort */
void merge_sort(vector<int>& arr, int left, int right) {
    if (left < right) {
        int middle = left + (right - left) / 2;
        merge_sort(arr, left, middle);
        merge_sort(arr, middle + 1, right);
        merge(arr, left, middle, right);
    }
}

/* merge function */
void merge(vector<int>& arr, int left, int middle, int right) {
    int i = 0;
    int j = 0;
    int k = 0;
    int left_length = middle - left + 1;
    int right_length = right - middle;
    int left_array[left_length];
    int right_array[right_length];

    /* copy values to left array */
    for (int i = 0; i < left_length; i++) {
        left_array[i] = arr[left + i];
    }

    /* copy values to right array */
    for (int j = 0; j < right_length; j++) {
        right_array[j] = arr[middle + 1 + j];
    }

    i = 0;
    j = 0;
    /** chose from right and left arrays and copy */
    while (i < left_length && j < right_length) {
        if (left_array[i] <= right_array[j]) {
            arr[left + k] = left_array[i];
            i++;
        } else {
            arr[left + k] = right_array[j];
            j++;
        }
        k++;
    }
    /* copy the remaining values to the array */
    while (i < left_length) {
        arr[left + k] = left_array[i];
        k++;
        i++;
    }
    while (j < right_length) {
        arr[left + k] = right_array[j];
        k++;
        j++;
    }
}

Ответы [ 2 ]

0 голосов
/ 08 марта 2020

В вашем коде есть несколько проблем:

  • в merge, вы выделяете подмассивы с автоматическим c хранилищем с объединенным размером, равным размеру сортируемого фрагмента, вызывая переполнение стека для больших массивов.
  • функция isTest_array_is_in_order сравнивает только первые 2 элемента массива.
  • разделение массива на N слайсов для индивидуальной сортировки будет только сделать это: сортировать части массива. Вам нужна заключительная фаза, где вы объединяете фрагменты в полностью отсортированный массив.
  • функция потока thread_merge_sort должна просто вызвать merge_sort() на своем слайсе:

    /** assigns work to each thread to perform merge sort */
    void thread_merge_sort(vector<int> &arr, int thread_id, int NUM_THREADS, int NUMBERS_PER_THREAD, int OFFSET) {
        int left = thread_id * NUMBERS_PER_THREAD;
        int right = left + NUMBERS_PER_THREAD - 1;
        if (thread_id == NUM_THREADS - 1) {
            right += OFFSET;
        }
        merge_sort(arr, left, right);
    }
    
  • указание срезов с включенными left и right подвержено ошибкам: вы должны осторожно корректировать длины, тогда как если right было исключено, код будет гораздо более регулярным, так как длина может быть вычислена напрямую как right - left.

  • границы среза и индексные переменные должны быть напечатаны как size_t, а не int.

Здесь является модифицированной версией, которая использует векторы для временных копий:

#include <iostream>
#include <thread>
#include <vector>
#include <chrono>
#include <ctime>
#include <algorithm>

/* generate random numbers in the range 1..9 */
#define LOWER_LIM  1
#define UPPER_LIM 10

using namespace std;

/* function declarations */
int generate_random_number(unsigned int lower_limit, unsigned int upper_limit);
void merge_sort(vector<int>& array, size_t start, size_t end);
void merge(vector<int>& array, size_t start, size_t middle, size_t end);
void thread_merge_sort(vector<int>& array, size_t start, size_t end);
bool is_vector_sorted(vector<int>& array, size_t length);

int main(int argc, const char *argv[]) {
    const size_t NUM_THREADS = argc > 1 ? strtol(argv[1], NULL, 0) : 1;
    const size_t LENGTH = argc > 2 ? strtol(argv[2], NULL, 0) : 1000;

    std::vector<int> array(LENGTH);

    /* initialize array with random numbers */
    srand(time(0));
    for (size_t i = 0; i < LENGTH; i++) {
        array[i] = generate_random_number(LOWER_LIM, UPPER_LIM);
    }

    const size_t nthreads = NUM_THREADS; //std::thread::hardware_concurrency();
    const size_t NUMBERS_PER_THREAD = LENGTH / nthreads;
    const size_t REMAINDER = LENGTH % nthreads;

    /* Output message before the loop */
    std::cout << "Sorting " << LENGTH << " elements in parallel" <<
        " using " << nthreads << " threads:" << std::endl;

    /* begin timing */
    auto start_clock = std::chrono::high_resolution_clock::now();

    std::vector<std::thread> workers;

    for (size_t t = 0; t < nthreads; t++) {
        size_t start = t * NUMBERS_PER_THREAD;
        size_t end = start + NUMBERS_PER_THREAD;
        if (t == nthreads - 1)
            end += REMAINDER;
        workers.push_back(thread(thread_merge_sort, ref(array), start, end));
    }
    /* await thread termination */
    for (thread& t: workers) {
        t.join();
    }
    /* merge thread slices */
    for (size_t size = NUMBERS_PER_THREAD; size < LENGTH; size += size) {
        for (size_t middle = size; middle < LENGTH; middle += size + size) {
            size_t end = min(middle + size, LENGTH);
            merge(array, middle - size, middle, end);
        }
    }

    /* end timing */
    auto elapsed = std::chrono::high_resolution_clock::now() - start_clock;
    auto usec    = std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count();

    /* print time */
    std::cout << "Sorting time: " << usec << " microseconds." << std::endl;

    /* test to ensure that the array is in sorted order */
    if (!is_vector_sorted(array, LENGTH)) {
        fprintf(stderr, "Error: array is not sorted!!\n");
    }
    return 0;
}

/* generate random numbers within the specified limit */
int generate_random_number(unsigned int lower_limit, unsigned int upper_limit) {
    return int(lower_limit + double(upper_limit - lower_limit) * rand() / RAND_MAX);
}

/** assign work to each thread to perform merge sort */
void thread_merge_sort(vector<int> &arr, size_t start, size_t end) {
    merge_sort(arr, start, end);
}

/* test to ensure that the array is in sorted order */
bool is_vector_sorted(vector<int>& arr, size_t length) {
    for (size_t i = 1; i < length; i++) {
        if (arr[i] < arr[i - 1]) {
            return false;
        }
    }
    return true;
}

/* perform merge sort */
void merge_sort(vector<int>& arr, size_t start, size_t end) {
    if (end - start >= 2) {
        size_t middle = start + (end - start) / 2;
        merge_sort(arr, start, middle);
        merge_sort(arr, middle, end);
        merge(arr, start, middle, end);
    }
}

/* merge function */
void merge(vector<int>& arr, size_t start, size_t middle, size_t end) {
    size_t i, j, k;
    size_t left_length = middle - start;
    size_t right_length = end - middle;
    vector<int> left_array(&arr[start], &arr[middle]);
    vector<int> right_array(&arr[middle], &arr[end]);

    i = 0;
    j = 0;
    k = start;
    /** chose from right and left arrays and copy */
    while (i < left_length && j < right_length) {
        if (left_array[i] <= right_array[j]) {
            arr[k++] = left_array[i++];
        } else {
            arr[k++] = right_array[j++];
        }
    }
    /* copy the remaining values to the array */
    while (i < left_length) {
        arr[k++] = left_array[i++];
    }
    while (j < right_length) {
        arr[k++] = right_array[j++];
    }
}

Производительность может быть улучшена примерно на 40% путем сохранения только левой части среза в merge: элементы правой часть фрагмента не перезаписывается, если только она не была скопирована. Вот модифицированная версия:

/* merge function */
void merge(vector<int>& arr, size_t start, size_t middle, size_t end) {
    size_t i, j, k;
    size_t left_length = middle - start;
    /* save the left part of the slice */
    vector<int> left_array(&arr[start], &arr[middle]);

    i = 0;
    j = middle;
    k = start;
    /** chose from right and left arrays and copy */
    while (i < left_length && j < end) {
        if (left_array[i] <= arr[j]) {
            arr[k++] = left_array[i++];
        } else {
            arr[k++] = arr[j++];
        }
    }
    /* copy the remaining values to the array */
    while (i < left_length) {
        arr[k++] = left_array[i++];
    }
}
0 голосов
/ 06 марта 2020

Итак, во-первых, я поставил в тупик ваш merge_sort и объединить, запустив с двумя задачами и 4500000 элементов: без ошибки.

Для полной отладки вы можете предоставить полный код ...

здесь мой компилятор, слегка модифицированный код:

    #include <iostream>
    #include <thread>
    #include <vector>
    #include <chrono>
    #include <ctime>
    #include <cstdint>
    #include <algorithm>

    #include <thread>

    /* define variables for the problem */
    constexpr size_t const UPPER_LIM = 10;
    constexpr size_t const LOWER_LIM =  1;

    using namespace std;

    /* function definitions */

    void merge_sort(vector<int>& array, int left, int right){
      return;
    }

    void merge(vector<int>& array, int left, int middle, int right){
      return;
    }

    bool isTest_array_is_in_order(vector<int>& array, int LENGTH){
      return false;
    }

    /**
     * generate random numbers within the specified limit
     */
    int generate_random_number(unsigned int lower_limit, unsigned int upper_limit){
      return lower_limit + (upper_limit - lower_limit) * ((double)rand() / RAND_MAX);
    }

    /**
     * assigns work to each thread to perform merge sort
     */
    void thread_merge_sort(vector<int> &arr, int thread_id, int NUM_THREADS, int NUMBERS_PER_THREAD, int OFFSET){
      int left = thread_id * (NUMBERS_PER_THREAD);
      int right = (thread_id + 1) * (NUMBERS_PER_THREAD) - 1;
      if (thread_id == NUM_THREADS - 1) {
        right += OFFSET;
      }
      int middle = left + (right - left) / 2;
      if (left < right) {
        merge_sort(arr, left, right);
        merge_sort(arr, left + 1, right);
        merge(arr, left, middle, right);
      }
    }

    int main(int argc, const char * argv[]){

      int const NUM_THREADS        = 2; //atoi (argv[1]);
      int const LENGTH             = 4500000; //atoi(argv[2]);
      int const NUMBERS_PER_THREAD = LENGTH / NUM_THREADS;
      int const OFFSET             = LENGTH % NUM_THREADS;

      cout << sizeof(int) << "\n";

      srand(time(0)) ;

      std::vector<int> array;
      array.reserve(LENGTH);

      /* initialize array with random numbers */
      for(int ii=0; ii < LENGTH; ++ii){
        array.push_back(generate_random_number(LOWER_LIM, UPPER_LIM));
      }

      /* begin timing */
      auto start = std::chrono::high_resolution_clock::now();

      const size_t nthreads = NUM_THREADS; //std::thread::hardware_concurrency();
      {
        // Pre loop
        std::cout<<"parallel ("<<nthreads<<" threads):"<<std::endl;
        std::vector<std::thread> workers;

        for(std::size_t tt=0; tt<nthreads; ++tt){
          workers.push_back(thread(thread_merge_sort, ref(array), tt, nthreads, NUMBERS_PER_THREAD, OFFSET));
        }

        // await thread termination
        for(thread& t: workers) {
          t.join();
        }
      }

      auto elapsed = std::chrono::high_resolution_clock::now() - start;
      auto usec    = std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count();

      // and print time
      std::cout << "Spent " << usec << " executing  " << nthreads << " in parallel " << " array size " << array.size() << "\n";

      /* end timing */
      return 0;
    }

[Edit]:

Ваш isTest_array_is_in_order не может работать, так как вы возвращаетесь сразу после сравнения первых двух элементов.

    bool isTest_array_is_in_order(vector<int>& arr, int LENGTH)
    {
        for(int i=1;i<LENGTH;i++){
            if(arr[i]>=arr[i-1]){
                return true;
    }   else{

        return false;
        }

        }
    }

вот версия, которая должна работать:

    /**
     * test to ensure that the array is in sorted order
     */
    bool isTest_array_is_in_order(vector<int>& arr, int LENGTH){
      bool unorderd = false;
      for(int ii=1;ii<LENGTH;++ii){
        if(arr[ii]<arr[ii-1]){
          unorderd = true;
          break;
        }
      }
      return !unorderd;
    }

[Редактировать]:

Итак, сначала по вашему коду я смог подтвердить ваши ошибки segfaults

Я изменил код, и теперь он, кажется, работает довольно хорошо, только что проверил 16 потоков для 44999999 элементов, прекрасно работает

После просмотра вашего кода он вылетел прямо здесь:

    /* merge function */
    void merge(vector<int>& arr, int left, int middle, int right) {
        int i = 0;
        int j = 0;
        int k = 0;
        int left_length = middle - left + 1;
        int right_length = right - middle;
        int left_array[left_length];
        int right_array[right_length];

Здесь вы создаете 2 локальных массива, но в стеке, а не в куче. Как правило, размер стека ограничен в зависимости от вашей ОС некоторым низким МБ, например, 10 или более.

Поэтому я заменил ваши C -массивы векторами и оптимизировал код немного дальше: более сложные типы, изменив основной, так что теперь я могу сортировать разные рандомизированные варианты за один прогон.

так вот мой код:

#include <iostream>
#include <iomanip>
#include <string>
#include <sstream>
#include <cstdint>
#include <thread>
#include <chrono>
#include <ctime>
#include <iterator>
#include <vector>
#include <array>
#include <random>
#include <algorithm>

using namespace std;

using idx_t  = size_t;
using data_t = int;

/* define variables for the problem */
constexpr data_t const UPPER_LIM =  2000000;
constexpr data_t const LOWER_LIM = 0;

constexpr idx_t const REPEAT_CNT = 10000;

/* function definitions */
std::string to_array_str(vector<data_t>& arr, idx_t max_elem){
  std::stringstream ss;

  idx_t ii=1;
  idx_t cnt = 0;

  for(auto _d:arr){
    ss << setw(8) << _d;
    if(0==(ii%10)){
      ss << ",\n";
      ii=0;
    }else{
      ss << ", ";
    }
    if(cnt>=max_elem) break;
    ++ii;
    ++cnt;
  }
  return ss.str();
}

/**
 * generate random numbers within the specified limit
 */
data_t generate_random_number(data_t const lower_limit, data_t const upper_limit){
  static std::random_device rd;
  static std::mt19937 gen(rd());
  static std::uniform_int_distribution<data_t> dis(lower_limit, upper_limit);
  return dis(gen);
  //return lower_limit + (upper_limit - lower_limit) * ((double)rand() / RAND_MAX);
}

/**
 * test to ensure that the array is in sorted order
 */
bool isTest_array_is_in_order(vector<data_t>& arr){
  bool unorderd = false;
  for(idx_t ii=1;ii<arr.size();++ii){
    if(arr[ii]<arr[ii-1]){
      unorderd = true;
      cout << "unordered Index: " << ii << "\n";
      cout << "arr[ii] " << arr[ii] << " arr[ii-1] " << arr[ii-1] << "\n";

      break;
    }
  }
  return !unorderd;
}

/**
 * merge function
 */
void merge(vector<data_t>& arr, idx_t const left, idx_t const middle, idx_t const right) {
  idx_t const  left_length = middle - left + 1;
  idx_t const right_length = right - middle;
  vector<data_t>  left_array( left_length);
  vector<data_t> right_array(right_length);

  constexpr bool const use_loopcopy = true;
  if(use_loopcopy){
  /* copy values to left array */
  for(idx_t ii=0; ii < left_length; ++ii){
      left_array[ii] = arr[left + ii];
  }
  /* copy values to right array */
  for(idx_t ii=0; ii < right_length; ++ii){
    right_array[ii] = arr[middle + 1 + ii];
  }
  }

  constexpr bool const use_libcode = false;
  if(use_libcode){
  {
    auto from =  arr.begin();
    auto to   = from;
    std::advance(from,left);
    std::advance(to,middle+1);
    std::copy(from,to,left_array.begin());
  }
  {
    auto from = arr.begin();
    auto to   = from;
    std::advance(from,middle+1);
    std::advance(to,right+1);
    std::copy(from,to,right_array.begin());
  }
  }

  idx_t ii = 0;
  idx_t jj = 0;
  idx_t kk = 0;
  /** chose from right and left arrays and copy */
  while((ii < left_length) && (jj < right_length)){
    if(left_array[ii] <= right_array[jj]){
      arr[left + kk] = left_array[ii];
      ++ii;
    } else {
      arr[left + kk] = right_array[jj];
      ++jj;
    }
    ++kk;
  }

  /* copy the remaining values to the array */
  while(ii < left_length){
    arr[left + kk] = left_array[ii];
    ++kk;
    ++ii;
  }

  while(jj < right_length){
    arr[left + kk] = right_array[jj];
    ++kk;
    ++jj;
  }
  return;
}

/**
 * perform merge sort
 */
void merge_sort(vector<data_t>& arr, idx_t const left, idx_t const right) {
  if(left < right){
    idx_t middle = left + ((right - left)>>1);
    //Divide
    merge_sort(arr, left, middle);
    merge_sort(arr, middle + 1, right);
    //Conquer
    merge(arr, left, middle, right);
  }
  return;
}

/**
 * assigns work to each thread to perform merge sort
 */
void thread_merge_sort(vector<data_t>& arr, idx_t const thread_id, idx_t const NUM_THREADS, idx_t const NUMBERS_PER_THREAD, idx_t const OFFSET){
  idx_t left  =  thread_id * (NUMBERS_PER_THREAD);
  idx_t right = (thread_id + 1) * (NUMBERS_PER_THREAD) - 1;
  if(thread_id == (NUM_THREADS - 1)) {
    right += OFFSET;
  }
  merge_sort(arr,left,right);
  return;
}

int main(int argc, const char * argv[]){

  /*
  int const NUM_THREADS        = 16; //atoi (argv[1]);
  int const LENGTH             = 1000000000; //atoi(argv[2]);
  */
  /*
  int const NUM_THREADS        = 8; //atoi (argv[1]);
  int const LENGTH             = 449999; //atoi(argv[2]);
  */
  int const NUM_THREADS        = 8; //atoi (argv[1]);
  int const LENGTH             = 100; //atoi(argv[2])

  int const NUMBERS_PER_THREAD = LENGTH / NUM_THREADS;
  int const OFFSET             = LENGTH % NUM_THREADS;

  cout << sizeof(int) << "\n";

  //srand(time(0)) ;

  std::vector<int> array(LENGTH);
  //array.reserve(LENGTH);

  constexpr size_t  nthreads = NUM_THREADS; //std::thread::hardware_concurrency();
  std::cout<<"parallel ("<<nthreads<<" threads):"<<std::endl;


  uint64_t time = 0;
  bool ordered = true;
  for(idx_t ii=0; ii<REPEAT_CNT; ++ii){

    /* initialize array with random numbers */
    for(int ii=0; ii < LENGTH; ++ii){
      //array.push_back(generate_random_number(LOWER_LIM, UPPER_LIM));
      array[ii]=(generate_random_number(LOWER_LIM, UPPER_LIM));
    }

    /* begin timing */
    auto start = std::chrono::high_resolution_clock::now();

    {
      // Pre loop
      std::vector<std::thread> workers;

      for(std::size_t tt=0; tt<nthreads; ++tt){
        workers.push_back(thread(thread_merge_sort, ref(array), tt, nthreads, NUMBERS_PER_THREAD, OFFSET));
      }

      // await thread termination
      for(thread& t: workers) {
        t.join();
      }

      ordered &= isTest_array_is_in_order(array);
      if(!ordered) break;
    }

      auto elapsed = std::chrono::high_resolution_clock::now() - start;
      auto usec    = std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count();
      time = (usec + time)>>1;
  }

  // and print time
  //cout << "Spent " << usec << "[µs] executing " << nthreads << " Threads in parallel working on " << array.size() << " elements " << "\n";
  cout << "Spent " << time << "[µs] executing " << nthreads << " Threads in parallel working on " << array.size() << " elements " << "\n";

  //cout << "Result is ordered: " << isTest_array_is_in_order(array) << "\n";
  cout << "Result is ordered: " << ordered << "\n";

  cout << to_array_str(array,100) << "\n";

  return 0;
}

Но даже если этот код больше не вызывает segfaults, поиграйте с По разным параметрам, оказывается, что он когда-либо производит отсортированный массив, за исключением однопотоковых запусков. Таким образом, анализ chqrl ie является правильным, поскольку отсортированные части отдельных потоков также должны быть отсортированы.

...