отправка блоков двумерного массива в C с использованием MPI - PullRequest
47 голосов
/ 14 февраля 2012

Как вы отправляете блоки 2-D массива на разные процессоры? Предположим, размер 2D-массива равен 400x400, и я хочу отправить блоки размером 100X100 на разные процессоры. Идея состоит в том, что каждый процессор будет выполнять вычисления на своем отдельном блоке и отправлять свой результат обратно первому процессору для окончательного результата.
Я использую MPI в программах на Си.

Ответы [ 2 ]

132 голосов
/ 14 февраля 2012

Позвольте мне начать с того, что вы обычно не хотите этого делать - разбрасывайте и собирайте огромные куски данных из какого-то "главного" процесса. Как правило, вы хотите, чтобы каждая задача выполнялась в виде отдельной части головоломки, и вы должны стремиться к тому, чтобы ни один процессор не нуждался в «глобальном представлении» всех данных; как только вам это потребуется, вы ограничиваете масштабируемость и размер проблемы. Если вы делаете это для ввода-вывода - один процесс читает данные, затем разбрасывает их, а затем собирает их обратно для записи, в конечном итоге вы захотите изучить MPI-IO.

Однако, возвращаясь к вашему вопросу, у MPI есть очень хорошие способы извлечения произвольных данных из памяти и их разброса / сбора на набор процессоров и обратно. К сожалению, для этого требуется значительное количество концепций MPI - типы MPI, экстенты и коллективные операции. В ответе на этот вопрос обсуждается множество основных идей - MPI_Type_create_subarray и MPI_Gather .

Обновление - В холодном свете дня это много кода и мало объяснений. Итак, позвольте мне немного расширить.

Рассмотрим глобальный массив 1d-целых чисел, который есть в задаче 0, который вы хотите распределить по ряду задач MPI, чтобы каждый из них получил часть в своем локальном массиве. Допустим, у вас есть 4 задачи, а глобальный массив - [01234567]. У вас может быть задача 0 отправить четыре сообщения (включая одно себе), чтобы распространить это, и когда придет время пересобрать, получите четыре сообщения, чтобы связать их вместе; но это очевидно становится очень трудоемким при большом количестве процессов. Для таких операций существуют оптимизированные процедуры - операции разброса / сбора. Так что в этом 1d случае вы бы сделали что-то вроде этого:

int global[8];   /* only task 0 has this */
int local[2];    /* everyone has this */
const int root = 0;   /* the processor with the initial global data */

if (rank == root) {
   for (int i=0; i<7; i++) global[i] = i;
}

MPI_Scatter(global, 2, MPI_INT,      /* send everyone 2 ints from global */
            local,  2, MPI_INT,      /* each proc receives 2 ints into local */
            root, MPI_COMM_WORLD);   /* sending process is root, all procs in */
                                     /* MPI_COMM_WORLD participate */

После этого данные процессоров будут выглядеть как

task 0:  local:[01]  global: [01234567]
task 1:  local:[23]  global: [garbage-]
task 2:  local:[45]  global: [garbage-]
task 3:  local:[67]  global: [garbage-]

То есть операция разброса принимает глобальный массив и отправляет непрерывные 2-целые чанки всем процессорам.

Чтобы собрать массив, мы используем операцию MPI_Gather(), которая работает точно так же, но в обратном порядке:

for (int i=0; i<2; i++) 
   local[i] = local[i] + rank;

MPI_Gather(local,  2, MPI_INT,      /* everyone sends 2 ints from local */
           global, 2, MPI_INT,      /* root receives 2 ints each proc into global */
           root, MPI_COMM_WORLD);   /* recv'ing process is root, all procs in */
                                    /* MPI_COMM_WORLD participate */

и теперь данные выглядят как

task 0:  local:[01]  global: [0134679a]
task 1:  local:[34]  global: [garbage-]
task 2:  local:[67]  global: [garbage-]
task 3:  local:[9a]  global: [garbage-]

Gather возвращает все данные, и здесь a равно 10, потому что я не продумал мое форматирование достаточно тщательно при запуске этого примера.

Что произойдет, если количество точек данных не делит поровну количество процессов, и нам нужно отправлять разное количество элементов для каждого процесса? Затем вам нужна обобщенная версия разброса, MPI_Scatterv(), которая позволяет вам указать количество для каждого процессор и смещения - где в глобальном массиве начинается этот фрагмент данных. Допустим, у вас был массив символов [abcdefghi] с 9 символами, и вы собирались назначить каждому процессу два символа, кроме последнего, который получил три. Тогда вам понадобится

char global[9];   /* only task 0 has this */
char local[3]={'-','-','-'};    /* everyone has this */
int  mynum;                     /* how many items */
const int root = 0;   /* the processor with the initial global data */

if (rank == 0) {
   for (int i=0; i<8; i++) global[i] = 'a'+i;
}

int counts[4] = {2,2,2,3};   /* how many pieces of data everyone has */
mynum = counts[rank];
int displs[4] = {0,2,4,6};   /* the starting point of everyone's data */
                             /* in the global array */

MPI_Scatterv(global, counts, displs, /* proc i gets counts[i] pts from displs[i] */
            MPI_INT,      
            local, mynum, MPI_INT;   /* I'm receiving mynum MPI_INTs into local */
            root, MPI_COMM_WORLD);

Теперь данные выглядят как

task 0:  local:[ab-]  global: [abcdefghi]
task 1:  local:[cd-]  global: [garbage--]
task 2:  local:[ef-]  global: [garbage--]
task 3:  local:[ghi]  global: [garbage--]

Вы теперь использовали scatterv для распределения нерегулярных объемов данных. Смещение в каждом случае составляет два * ранга (измеряется в символах; смещение выражается в единицах типов, отправляемых для разброса или получаемых для сбора; обычно это не в байтах или чем-то еще) с начала массива, и количество {2,2,2,3}. Если бы это был первый процессор, который мы хотели бы иметь 3 символа, мы бы установили счетчики = {3,2,2,2} и смещения были бы {0,3,5,7}. Гатерв снова работает точно так же, но наоборот; количество и количество массивов останутся прежними.

Теперь для 2D это немного сложнее. Если мы хотим отправить 2d субблоки 2d массива, данные, которые мы отправляем сейчас, больше не являются смежными. Если мы отправляем (скажем) 3x3 субблока массива 6x6 на 4 процессора, в данных, которые мы отправляем, есть дыры:

2D Array

   ---------
   |000|111|
   |000|111|
   |000|111|
   |---+---|
   |222|333|
   |222|333|
   |222|333|
   ---------

Actual layout in memory

   [000111000111000111222333222333222333]

(Обратите внимание, что все высокопроизводительные вычисления сводятся к пониманию расположения данных в памяти.)

Если мы хотим отправить данные, помеченные как «1», в задачу 1, нам нужно пропустить три значения, отправить три значения, пропустить три значения, отправить три значения, пропустить три значения, отправить три значения.Второе осложнение заключается в том, что субрегионы останавливаются и начинаются;обратите внимание, что область "1" не начинается там, где область "0" останавливается;после последнего элемента в области «0» следующее место в памяти находится на полпути через область «1».

Сначала рассмотрим первую проблему макета - как извлечь только те данные, которые мы хотим отправить,Мы всегда можем просто скопировать все данные области «0» в другой непрерывный массив и отправить их;если бы мы планировали это достаточно тщательно, мы могли бы сделать это таким образом, чтобы мы могли назвать MPI_Scatter о результатах.Но мы бы предпочли не транспонировать всю нашу основную структуру данных таким образом.

Пока что все типы данных MPI, которые мы использовали, являются простыми - MPI_INT задает, скажем, 4 байта подряд.Однако MPI позволяет вам создавать свои собственные типы данных, которые описывают произвольно сложные макеты данных в памяти.И этот случай - прямоугольные области массива - достаточно распространен, и для этого есть определенный вызов.Для двумерного случая, который мы описали выше,

    MPI_Datatype newtype;
    int sizes[2]    = {6,6};  /* size of global array */
    int subsizes[2] = {3,3};  /* size of sub-region */
    int starts[2]   = {0,0};  /* let's say we're looking at region "0",
                                 which begins at index [0,0] */

    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &newtype);
    MPI_Type_commit(&newtype);

Это создает тип, который выбирает только область "0" из глобального массива;теперь мы можем отправить только этот фрагмент данных другому процессору

    MPI_Send(&(global[0][0]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "0" */

, и процесс-получатель может получить его в локальный массив.Обратите внимание, что процесс получения, если он только получает его в массив 3x3, может не описывать то, что он получает, как тип newtype;это больше не описывает структуру памяти.Вместо этого он просто получает блок из 3 * 3 = 9 целых чисел:

    MPI_Recv(&(local[0][0]), 3*3, MPI_INT, 0, tag, MPI_COMM_WORLD);

Обратите внимание, что мы можем сделать это и для других субрегионов, либо создав другой тип (с другим start).массив) для других блоков или просто отправив в начальную точку конкретного блока:

    MPI_Send(&(global[0][3]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "1" */
    MPI_Send(&(global[3][0]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "2" */
    MPI_Send(&(global[3][3]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "3" */

Наконец, обратите внимание, что мы требуем, чтобы глобальные и локальные были непрерывными порциями памяти здесь;то есть &(global[0][0]) и &(local[0][0]) (или, что эквивалентно, *global и *local указывают на смежные блоки памяти 6 * 6 и 3 * 3; это не гарантируется обычным способом распределения динамическогомассивы d. Ниже показано, как это сделать.

Теперь, когда мы понимаем, как указывать субрегионы, есть еще одна вещь, которую нужно обсудить перед использованием операций разброса / сбора, и это «размер» этих типов.Мы не могли просто использовать MPI_Scatter() (или даже scatterv) с этими типами, потому что у этих типов есть экстент 16 целых чисел, то есть, где они заканчиваются 16 целыми числами после того, как они начинаются - и где они заканчиваются, нев соответствии с тем, где начинается следующий блок, поэтому мы не можем просто использовать scatter - он выберет неправильное место для начала отправки данных на следующий процессор.

Конечно, мы могли бы использовать MPI_Scatterv() исами определяем смещения, и это то, что мы будем делать - за исключением смещений в единицах размера типа send, и это нам тоже не помогает, блоки начинаются со смещений(0,3,18,21) целые числа от начала глобального массива, и тот факт, что блок заканчивается 16 целыми числами от того места, где он начинается, не позволяет нам вообще выражать эти смещения в целочисленных кратных.

Чтобы справиться с этим, MPI позволяет вам установить экстент типа для целей этих вычислений.Это не усекает тип;он просто используется для определения, где начинается следующий элемент с учетом последнего элемента.Для типов, подобных этим, с отверстиями в них, часто удобно установить экстент как нечто меньшее, чем расстояние в памяти до фактического конца типа.

Мы можем установить экстент так, как нам удобно.Мы можем просто сделать экстент 1 целым, а затем установить смещения в единицах целых чисел.В этом случае, однако, мне нравится устанавливать экстент равным 3 целым числам - размер подстроки - таким образом, блок "1" начинается сразу после блока "0", а блок "3" начинается сразу после блока ".2" .К сожалению, при переходе от блока «2» к блоку «3» это работает не так хорошо, но с этим ничего не поделаешь.

Так что для разброса субблоков в этом случае мы бы сделалиследующее:

    MPI_Datatype type, resizedtype;
    int sizes[2]    = {6,6};  /* size of global array */
    int subsizes[2] = {3,3};  /* size of sub-region */
    int starts[2]   = {0,0};  /* let's say we're looking at region "0",
                                 which begins at index [0,0] */

    /* as before */
    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &type);  
    /* change the extent of the type */
    MPI_Type_create_resized(type, 0, 3*sizeof(int), &resizedtype);
    MPI_Type_commit(&resizedtype);

Здесь мы создали тот же тип блока, что и раньше, но мы изменили его размер;мы не изменили, где тип «начинается» (0), но мы изменили, где он «заканчивается» (3 дюйма).Мы не упоминали об этом раньше, но MPI_Type_commit требуется, чтобы иметь возможность использовать тип;но вам нужно только зафиксировать последний тип, который вы на самом деле используете, а не промежуточные шаги.Вы используете MPI_Type_free, чтобы освободить тип, когда вы закончите.

Так что теперь, наконец, мы можем разбить блоки: манипуляции с данными выше немного сложны, но как только это будет сделано, scatterv выглядит простокак раньше:

int counts[4] = {1,1,1,1};   /* how many pieces of data everyone has, in units of blocks */
int displs[4] = {0,1,6,7};   /* the starting point of everyone's data */
                             /* in the global array, in block extents */

MPI_Scatterv(global, counts, displs, /* proc i gets counts[i] types from displs[i] */
            resizedtype,      
            local, 3*3, MPI_INT;   /* I'm receiving 3*3 MPI_INTs into local */
            root, MPI_COMM_WORLD);

И теперь мы закончили, после небольшого обзора типов разброса, сбора и MPI.

Пример кода, который показывает как сборку, так и разброс.операция с массивами символов, следующим образом.Запускаем программу:

$ mpirun -n 4 ./gathervarray
Global array is:
0123456789
3456789012
6789012345
9012345678
2345678901
5678901234
8901234567
1234567890
4567890123
7890123456
Local process on rank 0 is:
|01234|
|34567|
|67890|
|90123|
|23456|
Local process on rank 1 is:
|56789|
|89012|
|12345|
|45678|
|78901|
Local process on rank 2 is:
|56789|
|89012|
|12345|
|45678|
|78901|
Local process on rank 3 is:
|01234|
|34567|
|67890|
|90123|
|23456|
Processed grid:
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD

и код следующий.

#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include "mpi.h"

int malloc2dchar(char ***array, int n, int m) {

    /* allocate the n*m contiguous items */
    char *p = (char *)malloc(n*m*sizeof(char));
    if (!p) return -1;

    /* allocate the row pointers into the memory */
    (*array) = (char **)malloc(n*sizeof(char*));
    if (!(*array)) {
       free(p);
       return -1;
    }

    /* set up the pointers into the contiguous memory */
    for (int i=0; i<n; i++)
       (*array)[i] = &(p[i*m]);

    return 0;
}

int free2dchar(char ***array) {
    /* free the memory - the first element of the array is at the start */
    free(&((*array)[0][0]));

    /* free the pointers into the memory */
    free(*array);

    return 0;
}

int main(int argc, char **argv) {
    char **global, **local;
    const int gridsize=10; // size of grid
    const int procgridsize=2;  // size of process grid
    int rank, size;        // rank of current process and no. of processes

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);


    if (size != procgridsize*procgridsize) {
        fprintf(stderr,"%s: Only works with np=%d for now\n", argv[0], procgridsize);
        MPI_Abort(MPI_COMM_WORLD,1);
    }


    if (rank == 0) {
        /* fill in the array, and print it */
        malloc2dchar(&global, gridsize, gridsize);
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++)
                global[i][j] = '0'+(3*i+j)%10;
        }


        printf("Global array is:\n");
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++)
                putchar(global[i][j]);

            printf("\n");
        }
    }

    /* create the local array which we'll process */
    malloc2dchar(&local, gridsize/procgridsize, gridsize/procgridsize);

    /* create a datatype to describe the subarrays of the global array */

    int sizes[2]    = {gridsize, gridsize};         /* global size */
    int subsizes[2] = {gridsize/procgridsize, gridsize/procgridsize};     /* local size */
    int starts[2]   = {0,0};                        /* where this one starts */
    MPI_Datatype type, subarrtype;
    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_CHAR, &type);
    MPI_Type_create_resized(type, 0, gridsize/procgridsize*sizeof(char), &subarrtype);
    MPI_Type_commit(&subarrtype);

    char *globalptr=NULL;
    if (rank == 0) globalptr = &(global[0][0]);

    /* scatter the array to all processors */
    int sendcounts[procgridsize*procgridsize];
    int displs[procgridsize*procgridsize];

    if (rank == 0) {
        for (int i=0; i<procgridsize*procgridsize; i++) sendcounts[i] = 1;
        int disp = 0;
        for (int i=0; i<procgridsize; i++) {
            for (int j=0; j<procgridsize; j++) {
                displs[i*procgridsize+j] = disp;
                disp += 1;
            }
            disp += ((gridsize/procgridsize)-1)*procgridsize;
        }
    }


    MPI_Scatterv(globalptr, sendcounts, displs, subarrtype, &(local[0][0]),
                 gridsize*gridsize/(procgridsize*procgridsize), MPI_CHAR,
                 0, MPI_COMM_WORLD);

    /* now all processors print their local data: */

    for (int p=0; p<size; p++) {
        if (rank == p) {
            printf("Local process on rank %d is:\n", rank);
            for (int i=0; i<gridsize/procgridsize; i++) {
                putchar('|');
                for (int j=0; j<gridsize/procgridsize; j++) {
                    putchar(local[i][j]);
                }
                printf("|\n");
            }
        }
        MPI_Barrier(MPI_COMM_WORLD);
    }

    /* now each processor has its local array, and can process it */
    for (int i=0; i<gridsize/procgridsize; i++) {
        for (int j=0; j<gridsize/procgridsize; j++) {
            local[i][j] = 'A' + rank;
        }
    }

    /* it all goes back to process 0 */
    MPI_Gatherv(&(local[0][0]), gridsize*gridsize/(procgridsize*procgridsize),  MPI_CHAR,
                 globalptr, sendcounts, displs, subarrtype,
                 0, MPI_COMM_WORLD);

    /* don't need the local data anymore */
    free2dchar(&local);

    /* or the MPI data type */
    MPI_Type_free(&subarrtype);

    if (rank == 0) {
        printf("Processed grid:\n");
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++) {
                putchar(global[i][j]);
            }
            printf("\n");
        }

        free2dchar(&global);
    }


    MPI_Finalize();

    return 0;
}
1 голос
/ 30 декабря 2015

Мне просто было проще это проверить.

#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include "mpi.h"

/*
 This is a version with integers, rather than char arrays, presented in this
 very good answer: http://stackoverflow.com/a/9271753/2411320
 It will initialize the 2D array, scatter it, increase every value by 1 and then gather it back.
*/

int malloc2D(int ***array, int n, int m) {
    int i;
    /* allocate the n*m contiguous items */
    int *p = malloc(n*m*sizeof(int));
    if (!p) return -1;

    /* allocate the row pointers into the memory */
    (*array) = malloc(n*sizeof(int*));
    if (!(*array)) {
       free(p);
       return -1;
    }

    /* set up the pointers into the contiguous memory */
    for (i=0; i<n; i++)
       (*array)[i] = &(p[i*m]);

    return 0;
}

int free2D(int ***array) {
    /* free the memory - the first element of the array is at the start */
    free(&((*array)[0][0]));

    /* free the pointers into the memory */
    free(*array);

    return 0;
}

int main(int argc, char **argv) {
    int **global, **local;
    const int gridsize=4; // size of grid
    const int procgridsize=2;  // size of process grid
    int rank, size;        // rank of current process and no. of processes
    int i, j, p;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);


    if (size != procgridsize*procgridsize) {
        fprintf(stderr,"%s: Only works with np=%d for now\n", argv[0], procgridsize);
        MPI_Abort(MPI_COMM_WORLD,1);
    }


    if (rank == 0) {
        /* fill in the array, and print it */
        malloc2D(&global, gridsize, gridsize);
        int counter = 0;
        for (i=0; i<gridsize; i++) {
            for (j=0; j<gridsize; j++)
                global[i][j] = ++counter;
        }


        printf("Global array is:\n");
        for (i=0; i<gridsize; i++) {
            for (j=0; j<gridsize; j++) {
                printf("%2d ", global[i][j]);
            }
            printf("\n");
        }
    }
    //return;

    /* create the local array which we'll process */
    malloc2D(&local, gridsize/procgridsize, gridsize/procgridsize);

    /* create a datatype to describe the subarrays of the global array */
    int sizes[2]    = {gridsize, gridsize};         /* global size */
    int subsizes[2] = {gridsize/procgridsize, gridsize/procgridsize};     /* local size */
    int starts[2]   = {0,0};                        /* where this one starts */
    MPI_Datatype type, subarrtype;
    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &type);
    MPI_Type_create_resized(type, 0, gridsize/procgridsize*sizeof(int), &subarrtype);
    MPI_Type_commit(&subarrtype);

    int *globalptr=NULL;
    if (rank == 0)
        globalptr = &(global[0][0]);

    /* scatter the array to all processors */
    int sendcounts[procgridsize*procgridsize];
    int displs[procgridsize*procgridsize];

    if (rank == 0) {
        for (i=0; i<procgridsize*procgridsize; i++)
            sendcounts[i] = 1;
        int disp = 0;
        for (i=0; i<procgridsize; i++) {
            for (j=0; j<procgridsize; j++) {
                displs[i*procgridsize+j] = disp;
                disp += 1;
            }
            disp += ((gridsize/procgridsize)-1)*procgridsize;
        }
    }


    MPI_Scatterv(globalptr, sendcounts, displs, subarrtype, &(local[0][0]),
                 gridsize*gridsize/(procgridsize*procgridsize), MPI_INT,
                 0, MPI_COMM_WORLD);

    /* now all processors print their local data: */

    for (p=0; p<size; p++) {
        if (rank == p) {
            printf("Local process on rank %d is:\n", rank);
            for (i=0; i<gridsize/procgridsize; i++) {
                putchar('|');
                for (j=0; j<gridsize/procgridsize; j++) {
                    printf("%2d ", local[i][j]);
                }
                printf("|\n");
            }
        }
        MPI_Barrier(MPI_COMM_WORLD);
    }

    /* now each processor has its local array, and can process it */
    for (i=0; i<gridsize/procgridsize; i++) {
        for (j=0; j<gridsize/procgridsize; j++) {
            local[i][j] += 1; // increase by one the value
        }
    }

    /* it all goes back to process 0 */
    MPI_Gatherv(&(local[0][0]), gridsize*gridsize/(procgridsize*procgridsize),  MPI_INT,
                 globalptr, sendcounts, displs, subarrtype,
                 0, MPI_COMM_WORLD);

    /* don't need the local data anymore */
    free2D(&local);

    /* or the MPI data type */
    MPI_Type_free(&subarrtype);

    if (rank == 0) {
        printf("Processed grid:\n");
        for (i=0; i<gridsize; i++) {
            for (j=0; j<gridsize; j++) {
                printf("%2d ", global[i][j]);
            }
            printf("\n");
        }

        free2D(&global);
    }


    MPI_Finalize();

    return 0;
}

Вывод:

linux16:>mpicc -o main main.c
linux16:>mpiexec -n 4 main Global array is:
 1  2  3  4
 5  6  7  8
 9 10 11 12
13 14 15 16
Local process on rank 0 is:
| 1  2 |
| 5  6 |
Local process on rank 1 is:
| 3  4 |
| 7  8 |
Local process on rank 2 is:
| 9 10 |
|13 14 |
Local process on rank 3 is:
|11 12 |
|15 16 |
Processed grid:
 2  3  4  5
 6  7  8  9
10 11 12 13
14 15 16 17
...