Добавление двух векторов с использованием pthreads без глобальной переменной суммы - PullRequest
0 голосов
/ 27 апреля 2018

Я пытаюсь вычислить сумму двух векторов a и b, используя pthreads в C. Мне дана функция, которая вычисляет сумму в последовательной форме, а другая - в параллельной форме. Моя программа работает правильно, но вычисляет разные суммы при наличии нескольких потоков. Я использовал правильную синхронизацию потоков в критической области, но все еще не вижу, где я иду не так. Я получаю правильный ответ в первом потоке, поскольку только один поток выполняет работу, а затем я получаю неправильные ответы в нескольких потоках. Вот мой код:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>


// type for value of vector element
typedef short value_t;
// type for vector dimension / indices
typedef long index_t;
// function type to combine two values
typedef value_t (*function_t)(const value_t x, const value_t y);
// struct to store the respective values of the vectors a,b and c
typedef struct{
    index_t start;
    index_t end;
    value_t *arr;
    value_t *brr;
    value_t *crr;
    value_t *part_sum;
    pthread_mutex_t *mutex;
}arg_struct;

// function to combine two values
value_t add(const value_t x, const value_t y) {
  return ((x+y)*(x-y)) % ((int)x+1) +27;
}


// function to initialize the vectors a,b and c
void vectorInit(index_t n, value_t a[n], value_t b[n], value_t c[n]) {

  for(index_t i=0; i<n; i++) {
    a[i] = (value_t)(2*i);
    b[i] = (value_t)(n-i);
    c[i] = 0;
  }
}


// function to count the sum of two variables sequentially
value_t vectorOperation(index_t n, value_t a[n], value_t b[n], value_t c[n], function_t f) {

  value_t sum = 0;

  for(index_t i=0; i<n; i++) {
    sum += (c[i] = f(a[i], b[i]));
  }

  return sum;
}
/* Thread function */
void* vector_sum(void* arg)
{   
    arg_struct *param = (arg_struct*)arg;
   /*
    for(index_t i= param->start; i<param->end; i++)
    {
        pthread_mutex_lock(&param->mutex);
        *param->part_sum += vectorOperation(i,param->arr,param->brr,param->crr,add);
        pthread_mutex_unlock(&param->mutex);
    }
    */
    index_t n = param->end - param->start;
    pthread_mutex_lock(&(*param->mutex));
    // Each thread uses the vectorOperation function to calculate the sum sequentially(Also the critical area)
    *param->part_sum = *param->part_sum + vectorOperation(n,param->arr,param->brr,param->crr,add);
    //*param->part_sum += vectorOperation(param->end-param->start,param->arr,param->brr,param->crr,add);
    pthread_mutex_unlock(&(*param->mutex));

    pthread_exit(NULL);
}



// Sum of two vectors in parallel. 
value_t vectorOperationParallel(index_t n, value_t a[n], value_t b[n], value_t c[n], function_t f, int p) {

  value_t sum = 0;

    pthread_t threads[p];
    arg_struct thread_args[p];
    pthread_mutex_t mutex;
    pthread_mutex_init(&mutex,NULL);
    index_t div = (n+p-1)/p;

      for(int i=0; i<p; i++)
    {
        thread_args[i].start = i*div;
        thread_args[i].end = (i+1)*div;
        thread_args[i].arr = a;
        thread_args[i].brr = b;
        thread_args[i].crr = c;
          for(int j =0; j<div; j++)
          {
          thread_args[i].arr[j] = a[thread_args[i].start+j];
          thread_args[i].brr[j] = b[thread_args[i].start+j];
          thread_args[i].crr[j] = c[thread_args[i].start+j];

          }
        thread_args[i].part_sum = &sum;
        thread_args[i].mutex = &mutex;
        pthread_create(&threads[i],NULL,vector_sum, (void*)&thread_args[i]);
    }


    for(int i=0; i<p; i++)
    {
        pthread_join(threads[i],NULL);
    }
  return sum;
}


int main(int argc, char **argv)
{
  // check for correct argument count
  if (argc != 3)
    {
      printf ("usage: %s vector_size n_threads\n", argv[0]);
      exit (EXIT_FAILURE);
    }

  // get arguments
  // vector size
  index_t n = (index_t)atol (argv[1]);
  // number of threads
  int p = atoi (argv[2]);
  // check for plausible values
  if((p < 1) || (p > 1000)) {
      printf("illegal number of threads\n");
      exit (EXIT_FAILURE);
  }

  // allocate memory
  value_t *a = malloc(n * sizeof(*a));
  value_t *b = malloc(n * sizeof(*b));
  value_t *c = malloc(n * sizeof(*c));
  if((a == NULL) || (b == NULL) || (c == NULL)) {
    printf("no more memory\n");
    exit(EXIT_FAILURE);
  }

  // initialize vectors a,b,c
  vectorInit(n, a, b, c);

  // work on vectors sequentially
  value_t c1sum = vectorOperation(n, a, b, c, add);

  // work on vectors parallel for all thread counts from 1 to p
  for(int thr=1; thr<= p; thr++) {
    // do operation
    value_t c2sum = vectorOperationParallel(n, a, b, c, add, thr);

    // check result
    if(c1sum != c2sum) {
      printf("!!! error: vector results are not identical !!!\nsum1=%ld, sum2=%ld\n", (long)c1sum, (long)c2sum);
      return EXIT_FAILURE;
    } 
    else
        printf("The results are equal: sum1=%ld, sum2=%ld\n",(long)c1sum, (long)c2sum);
  }

  return EXIT_SUCCESS;

}

1 Ответ

0 голосов
/ 27 апреля 2018

Хорошо, я не уверен, но похоже, что это не так.

Сначала имена переменных ужасны.

тогда n.m. прокомментировал:

pthread_mutex_init в цикле, вероятно, плохая идея

Вы рассчитываете index_t div = (elements_in_vector + num_of_threads - 1) / num_of_threads; А позже вы используете div * num_of_threads, чтобы расстроить элементы. Таким образом, вы можете попытаться получить доступ к большему количеству элементов, чем доступно.

пример:

index_t div = (elements_in_vector + num_of_threads - 1) / num_of_threads;
//(13 * 5 - 1) / 5 = 3
thread_args[i].end = (i + 1) * div; // for the last i ( = 2)
//(2 + 1) * 5 = 15

Как только вы получаете доступ к i >= 13, вы получаете значения мусора (неопределенное поведение)

Затем вы создаете копию частей вашего исходного массива (я бы предположил, что это медленнее, чем просто передача ссылки на оригинал).

Кажется, вы не используете массив результатов *thread_args[i].crr.

Вам нужен только мьютекс для суммы всех значений, поскольку у вас есть выделенная память для каждого массива, который вы передаете в потоке. Вы могли бы даже передать указатели оригинальных массивов в потоки без мьютекса, если бы не использовали переменную суммы во всех них. Поскольку каждое добавление является автономным и не имеет доступа к памяти другого добавления, мьютекс не требуется.

Чтобы вычислить сумму всех значений, вы можете просто использовать возвращаемое значение потока вместо ссылки на значение, которое вы передаете каждому. Так было бы намного быстрее.

Я не уверен, что все нашел, но это может помочь вам немного улучшить это.

...