Отправка распределенных фрагментов двумерного массива в корневой процесс в MPI - PullRequest
3 голосов
/ 03 апреля 2011

У меня есть 2D-массив, который распределен по сетке процессов MPI (3 x 2 процесса в этом примере). Значения массива генерируются в процессе, в который распространяется этот фрагмент массива, и я хочу собрать все эти фрагменты вместе в корневом процессе, чтобы отобразить их.

Пока у меня есть код ниже. При этом генерируется декартовый коммуникатор, выясняются координаты процесса MPI и определяется, сколько массива он должен получить на основе этого (поскольку массив не должен быть кратным размеру декартовой сетки). Затем я создаю новый производный тип данных MPI, который будет отправлять весь этот подмассив процессов как один элемент (то есть, шаг, длина блока и число различны для каждого процесса, поскольку каждый процесс имеет массивы разного размера). Однако, когда я собираю данные вместе с MPI_Gather, я получаю ошибку сегментации.

Я думаю, это потому, что я не должен использовать один и тот же тип данных для отправки и получения при вызове MPI_Gather. Тип данных подходит для отправки данных, так как он имеет правильное количество, шаг и длину блока, но когда он доберется до другого конца, ему потребуется совсем другой производный тип данных. Я не уверен, как рассчитать параметры для этого типа данных - у кого-нибудь есть идеи?

Кроме того, если я подхожу к этому с совершенно неправильного угла, пожалуйста, дайте мне знать!

#include<stdio.h>
#include<array_alloc.h>
#include<math.h>
#include<mpi.h>

int main(int argc, char ** argv)
{
    int size, rank;
    int dim_size[2];
    int periods[2];
    int A = 2;
    int B = 3;
    MPI_Comm cart_comm;
    MPI_Datatype block_type;
    int coords[2];

    float **array;
    float **whole_array;

    int n = 10;
    int rows_per_core;
    int cols_per_core;
    int i, j;

    int x_start, x_finish;
    int y_start, y_finish;

    /* Initialise MPI */
    MPI_Init(&argc, &argv);

    /* Get the rank for this process, and the number of processes */
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0)
    {
        /* If we're the master process */
        whole_array = alloc_2d_float(n, n);

        /* Initialise whole array to silly values */
        for (i = 0; i < n; i++)
        {
            for (j = 0; j < n; j++)
            {
                whole_array[i][j] = 9999.99;
            }
        }

        for (j = 0; j < n; j ++)
        {
            for (i = 0; i < n; i++)
            {
                printf("%f ", whole_array[j][i]);
            }
            printf("\n");
        }
    }

    /* Create the cartesian communicator */
    dim_size[0] = B;
    dim_size[1] = A;
    periods[0] = 1;
    periods[1] = 1;

    MPI_Cart_create(MPI_COMM_WORLD, 2, dim_size, periods, 1, &cart_comm);

    /* Get our co-ordinates within that communicator */
    MPI_Cart_coords(cart_comm, rank, 2, coords);

    rows_per_core = ceil(n / (float) A);
    cols_per_core = ceil(n / (float) B);

    if (coords[0] == (B - 1))
    {
        /* We're at the far end of a row */
        cols_per_core = n - (cols_per_core * (B - 1));
    }
    if (coords[1] == (A - 1))
    {
        /* We're at the bottom of a col */
        rows_per_core = n - (rows_per_core * (A - 1));
    }

    printf("X: %d, Y: %d, RpC: %d, CpC: %d\n", coords[0], coords[1], rows_per_core, cols_per_core);

    MPI_Type_vector(rows_per_core, cols_per_core, cols_per_core + 1, MPI_FLOAT, &block_type);
    MPI_Type_commit(&block_type);

    array = alloc_2d_float(rows_per_core, cols_per_core);

    if (array == NULL)
    {
        printf("Problem with array allocation.\nExiting\n");
        return 1;
    }

    for (j = 0; j < rows_per_core; j++)
    {
        for (i = 0; i < cols_per_core; i++)
        {
            array[j][i] = (float) (i + 1);
        }
    }

    MPI_Barrier(MPI_COMM_WORLD);

    MPI_Gather(array, 1, block_type, whole_array, 1, block_type, 0, MPI_COMM_WORLD);

    /*
    if (rank == 0)
    {
        for (j = 0; j < n; j ++)
        {
            for (i = 0; i < n; i++)
            {
                printf("%f ", whole_array[j][i]);
            }
            printf("\n");
        }
    }
    */
    /* Close down the MPI environment */
    MPI_Finalize();
}

Подпрограмма выделения двумерного массива, которую я использовал выше, реализована как:

float **alloc_2d_float( int ndim1, int ndim2 ) {

  float **array2 = malloc( ndim1 * sizeof( float * ) );

  int i;

  if( array2 != NULL ){

    array2[0] = malloc( ndim1 * ndim2 * sizeof( float ) );

    if( array2[ 0 ] != NULL ) {

      for( i = 1; i < ndim1; i++ )
    array2[i] = array2[0] + i * ndim2;

    }

    else {
      free( array2 );
      array2 = NULL;
    }

  }

  return array2;

}

Ответы [ 3 ]

4 голосов
/ 05 апреля 2011

Это сложный вопрос. Вы на правильном пути, и да, вам понадобятся разные типы для отправки и получения.

Отправка части проста - если вы отправляете весь подмассив array, то вам даже не нужен векторный тип; Вы можете отправить все (rows_per_core)*(cols_per_core) смежных чисел, начиная с &(array[0][0]) (или array[0], если хотите).

Это сложная часть, как вы и собрали. Давайте начнем с самого простого случая - предположим, что все делится равномерно, поэтому все блоки имеют одинаковый размер. Тогда вы можете использовать самый helfpul MPI_Type_create_subarray (вы всегда можете сделать это вместе с векторными типами, но для массивов более высокой размерности это становится утомительным, так как вам нужно создать 1 промежуточный тип для каждого измерения массива, кроме последний ...

Кроме того, вместо жесткого кодирования декомпозиции, вы можете использовать также полезный MPI_Dims_create, чтобы создать декомпозицию ваших рангов как можно более квадратной. Заметка что это не обязательно связано с MPI_Cart_create, хотя вы можете использовать его для запрошенных измерений. Я собираюсь пропустить материал cart_create здесь не потому, что это бесполезно, а потому, что я хочу сосредоточиться на материале сбора.

Так что, если у всех одинаковый размер array, root получает одинаковый тип данных от всех, и для получения данных можно использовать очень простой тип подмассива:

MPI_Type_create_subarray(2, whole_array_size, sub_array_size, starts,
                         MPI_ORDER_C, MPI_FLOAT, &block_type);
MPI_Type_commit(&block_type);

где sub_array_size[] = {rows_per_core, cols_per_core}, whole_array_size[] = {n,n}, а здесь starts[]={0,0} - например , мы просто предположим, что все начинается с начала. Причина этого в том, что мы можем затем использовать Gatherv для явной установки смещений в массив:

for (int i=0; i<size; i++) {
    counts[i] = 1;   /* one block_type per rank */

    int row = (i % A);
    int col = (i / A);
    /* displacement into the whole_array */
    disps[i] = (col*cols_per_core + row*(rows_per_core)*n);
}

MPI_Gatherv(array[0], rows_per_core*cols_per_core, MPI_FLOAT,
            recvptr, counts, disps, resized_type, 0, MPI_COMM_WORLD);

Итак, теперь каждый отправляет свои данные в виде одного фрагмента, и они поступают в тип в правую часть массива. Чтобы это работало, я изменил тип, чтобы его экстент был только одним плавающим, поэтому смещения можно рассчитать в этой единице:

MPI_Type_create_resized(block_type, 0, 1*sizeof(float), &resized_type);
MPI_Type_commit(&resized_type);

Весь код ниже:

#include<stdio.h>
#include<stdlib.h>
#include<math.h>
#include<mpi.h>

float **alloc_2d_float( int ndim1, int ndim2 ) {
    float **array2 = malloc( ndim1 * sizeof( float * ) );
    int i;

    if( array2 != NULL ){
        array2[0] = malloc( ndim1 * ndim2 * sizeof( float ) );
        if( array2[ 0 ] != NULL ) {
            for( i = 1; i < ndim1; i++ )
                array2[i] = array2[0] + i * ndim2;
        }

        else {
            free( array2 );
            array2 = NULL;
        }
    }
    return array2;
}

void free_2d_float( float **array ) {
    if (array != NULL) {
        free(array[0]);
        free(array);
    }
    return;
}

void init_array2d(float **array, int ndim1, int ndim2, float data) {
    for (int i=0; i<ndim1; i++) 
        for (int j=0; j<ndim2; j++)
            array[i][j] = data;
    return;
}

void print_array2d(float **array, int ndim1, int ndim2) {
    for (int i=0; i<ndim1; i++) {
        for (int j=0; j<ndim2; j++) {
            printf("%6.2f ", array[i][j]);
        }
        printf("\n");
    }
    return;
}


int main(int argc, char ** argv)
{
    int size, rank;
    int dim_size[2];
    int periods[2];
    MPI_Datatype block_type, resized_type;

    float **array;
    float **whole_array;
    float *recvptr;

    int *counts, *disps;

    int n = 10;
    int rows_per_core;
    int cols_per_core;
    int i, j;

    int whole_array_size[2];
    int sub_array_size[2];
    int starts[2];
    int A, B;

    /* Initialise MPI */
    MPI_Init(&argc, &argv);

    /* Get the rank for this process, and the number of processes */
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0)
    {
        /* If we're the master process */
        whole_array = alloc_2d_float(n, n);
        recvptr = &(whole_array[0][0]);

        /* Initialise whole array to silly values */
        for (i = 0; i < n; i++)
        {
            for (j = 0; j < n; j++)
            {
                whole_array[i][j] = 9999.99;
            }
        }

        print_array2d(whole_array, n, n);
        puts("\n\n");
    }

    /* Create the cartesian communicator */
    MPI_Dims_create(size, 2, dim_size);
    A = dim_size[1];
    B = dim_size[0];
    periods[0] = 1;
    periods[1] = 1;

    rows_per_core = ceil(n / (float) A);
    cols_per_core = ceil(n / (float) B);
    if (rows_per_core*A != n) {
        if (rank == 0) fprintf(stderr,"Aborting: rows %d don't divide by %d evenly\n", n, A);
        MPI_Abort(MPI_COMM_WORLD,1);
    }
    if (cols_per_core*B != n) {
        if (rank == 0) fprintf(stderr,"Aborting: cols %d don't divide by %d evenly\n", n, B);
        MPI_Abort(MPI_COMM_WORLD,2);
    }

    array = alloc_2d_float(rows_per_core, cols_per_core);
    printf("%d, RpC: %d, CpC: %d\n", rank, rows_per_core, cols_per_core);

    whole_array_size[0] = n;             
    sub_array_size  [0] = rows_per_core; 
    whole_array_size[1] = n;
    sub_array_size  [1] = cols_per_core;
    starts[0] = 0; starts[1] = 0;

    MPI_Type_create_subarray(2, whole_array_size, sub_array_size, starts, 
                             MPI_ORDER_C, MPI_FLOAT, &block_type);
    MPI_Type_commit(&block_type);
    MPI_Type_create_resized(block_type, 0, 1*sizeof(float), &resized_type);
    MPI_Type_commit(&resized_type);

    if (array == NULL)
    {
        printf("Problem with array allocation.\nExiting\n");
        MPI_Abort(MPI_COMM_WORLD,3);
    }

    init_array2d(array,rows_per_core,cols_per_core,(float)rank);

    counts = (int *)malloc(size * sizeof(int));
    disps  = (int *)malloc(size * sizeof(int));
    /* note -- we're just using MPI_COMM_WORLD rank here to
     * determine location, not the cart_comm for now... */
    for (int i=0; i<size; i++) {
        counts[i] = 1;   /* one block_type per rank */

        int row = (i % A);
        int col = (i / A);
        /* displacement into the whole_array */
        disps[i] = (col*cols_per_core + row*(rows_per_core)*n);
    }

    MPI_Gatherv(array[0], rows_per_core*cols_per_core, MPI_FLOAT, 
                recvptr, counts, disps, resized_type, 0, MPI_COMM_WORLD);

    free_2d_float(array);
    if (rank == 0) print_array2d(whole_array, n, n);
    if (rank == 0) free_2d_float(whole_array);
    MPI_Finalize();
}

Незначительная вещь - вам не нужен барьер перед собранием. На самом деле, вам вряд ли когда-либо действительно нужен барьер, и это дорогостоящие операции по нескольким причинам, и они могут скрыть проблемы - мое эмпирическое правило заключается в том, чтобы никогда, никогда не использовать барьеры, если вы точно не знаете, почему это правило необходимо сломан в этом случае. В этом случае, в частности, коллективная подпрограмма gather выполняет точно такую ​​же синхронизацию, что и барьер, поэтому просто используйте ее.

Теперь перейдем к более сложным вещам. Если вещи не делятся равномерно, у вас есть несколько вариантов. Самый простой, хотя и не обязательно лучший, это просто заполнить массив так, чтобы он делил равномерно, даже если только для этой операции.

Если вы можете сделать так, чтобы количество столбцов делилось равномерно, даже если количество строк не делалось, тогда вы все равно можете использовать collectv и создать векторный тип для каждой части строки, и соответствующее количество строк от каждого процессора. Это будет работать нормально.

Если у вас определенно есть случай, когда ни один из них не может быть рассчитан на деление, и вы не можете заполнить данные для отправки, тогда я вижу три подопции:

  • Как предполагает susterpatt, делайте двухточечное. Для небольшого числа задач это хорошо, но по мере увеличения оно будет значительно менее эффективным, чем коллективные операции.
  • Создайте коммуникатор, состоящий из всех процессоров, не относящихся к внешним границам, и используйте точно код выше для сбора их кода; и затем данные точка-точка краевых задач.
  • Не собирайтесь вообще обрабатывать 0; используйте тип распределенного массива для описания макета массива и используйте MPI-IO для записи всех данных в файл; как только это будет сделано, вы можете иметь нулевой процесс, отображающий данные, если хотите.
1 голос
/ 04 апреля 2011

Похоже, что первый аргумент для вас MPI_Gather вызов, вероятно, должен быть array[0], а не array.

Кроме того, если вам нужно получать разные объемы данных для каждого ранга, вам лучше использовать MPI_Gatherv.

Наконец, не то, что сбор всех ваших данных в одном месте для вывода не является масштабируемым во многих обстоятельствах. По мере роста объема данных со временем он будет превышать объем памяти, доступной для ранга 0. Возможно, вам будет гораздо лучше распределять выходные данные (если вы пишете в файл, используя MPI IO или другие вызовы библиотеки) или выполняете точечные вычисления. отправка в двухточечный ранг по одному на один ранг, чтобы ограничить общее потребление памяти.

С другой стороны, я бы не рекомендовал бы координировать печать каждого из ваших рядов со стандартным выводом, один за другим, потому что некоторые основные реализации MPI не гарантируют, что стандартный вывод будет производиться в порядке. В частности, MPI Cray довольно тщательно перемешивает стандартный вывод, если печатаются несколько рангов.

0 голосов
/ 04 апреля 2011

В соответствии с этим (выделено мной):

Условия согласования типов для коллективных операций более строгие, чем соответствующие условия между отправителем и получателем в точке.В точку.А именно, для коллективных операций, объем отправляемых данных должен точно соответствовать количеству данных, указанному получателем .Карты различного типа между отправителем и получателем по-прежнему разрешены.

Похоже, у вас есть два варианта:

  1. Добавление меньших подматриц, так что все процессы отправляют одинаковое количестводанные, а затем обрезать матрицу до ее первоначального размера после сбора.Если вы чувствуете себя авантюрным, вы можете попытаться определить принимаемую карту типов, чтобы отступы автоматически перезаписывались во время операции сбора, таким образом устраняя необходимость в кадрировании впоследствии.Это может быть немного сложнее, хотя.
  2. Возврат к двухточечному общению.Гораздо проще, но, возможно, более высокие затраты на связь.

Лично я бы выбрал вариант 2.

...