MPI асинхронная / односторонняя связь - PullRequest
0 голосов
/ 07 июля 2011

У меня есть ситуация, похожая на код ниже: рабочие процессы работают с подмножеством данных и должны отправить неизвестный объем данных обратно в мастер.Можно ли заставить мастера ждать и получать неизвестное количество посылок от рабочих процессов?Есть ли способ сделать это с помощью одностороннего общения?Заранее спасибо!

#include <errno.h>
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

/*
    sample run/output:
    $mpirun -np 5 practice.exe
    @[1]: i=30
    @[2]: i=0
    @[2]: i=75
    @[4]: i=40
    @[4]: i=55
    @[3]: i=85
    @[3]: i=65
*/
int main(int argc, char *argv[])
{
    int i, rank, size, np, nw, num;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &np);
    nw = np -1;

    srand(time(NULL)*rank);

    if (rank > 0)
    {
        for (i=(rank-1); i<(nw*10); i+=nw)
        {
            num = rand() % 100;
            if (num % 5 == 0)
            {
                printf("@[%d]: i=%d\n", rank, num);
                // SEND num TO MASTER
            }
        }
    }
    else
    {
        // RECEIVE num FROM WORKER
    }

    MPI_Finalize();

    return EXIT_SUCCESS;
}

1 Ответ

1 голос
/ 07 июля 2011

Конечно, есть много способов сделать это, но это не имеет ничего общего с асинхронной связью.Вы можете сделать это с помощью односторонней связи, но даже у этого есть свои проблемы с этим (вам все еще нужно угадать, сколько общего объема памяти потребуется для данных).

Один из способов сделать этоэто просто, чтобы выяснить, сколько у вас есть данных, отправить их вперед мастеру, чтобы он знал, сколько сообщений получить, а затем отправлять ваши данные по одному:

#include <errno.h>
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

#define MAXPERWORKER 10
#define TAG_NUM_INCOMING 1
#define TAG_DATA 2
int main(int argc, char *argv[])
{
    int i, rank, size, np, nw, num;
    int mynums[MAXPERWORKER], numcount, total;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &np);
    nw = np -1;

    srand(time(NULL)*rank);

    if (rank > 0)
    {
        numcount = 0;
        total    = 0;
        for (i=(rank-1); i<(nw*10); i+=nw)
        {
            num = rand() % 100;
            if (num % 3 == 0)
            {
                printf("@[%d]: i=%d\n", rank, num);
                mynums[numcount] = num;
                numcount++;
                total += num;
            }

        }
        /* of course, in this case we could just
         * do this in one message, but..
         */
        MPI_Send(&numcount, 1, MPI_INT, 0, TAG_NUM_INCOMING, MPI_COMM_WORLD);
        for (i=0; i<numcount; i++)
            MPI_Send(&(mynums[i]), 1, MPI_INT, 0, TAG_DATA, MPI_COMM_WORLD);

        printf("@[%d]: Total of all nums is %d\n", rank, total);
    }
    else
    {
        int *totals = malloc(sizeof(int)*nw);
        int *counts = malloc(sizeof(int)*nw);
        int *sofar  = malloc(sizeof(int)*nw);
        int **data = malloc(sizeof(int *)*nw);
        int rcv;
        int totalcounts;
        int j;
        int workernum;
        MPI_Status status;

        for (i=0; i<nw; i++) {
            sofar[i] = 0;
            totals[i]= 0;
        }

        /* get number of incoming messages */
        for (i=0; i<nw; i++) {
            MPI_Recv(&rcv, 1, MPI_INT, MPI_ANY_SOURCE, TAG_NUM_INCOMING, MPI_COMM_WORLD, &status);

            workernum = status.MPI_SOURCE-1;
            counts[workernum] = rcv;
            totalcounts += rcv;
            data[workernum] = malloc(sizeof(int)*rcv);
        }

        /* get real data */
        for (i=0; i<totalcounts; i++) {
            MPI_Recv(&rcv, 1, MPI_INT, MPI_ANY_SOURCE, TAG_DATA, MPI_COMM_WORLD, &status);
            workernum = status.MPI_SOURCE-1;
            data[ workernum ][ sofar[workernum]++ ] = rcv;
            totals[ workernum ] += rcv;
        }

        /* print results */
        for (i=0; i<nw; i++) {
            printf("From [%2d]:", i+1);
            for (j=0; j<counts[i]; j++)
                printf("%3d ", data[i][j]);
            printf("| %3d\n", totals[i]);
        }

        for (i=0; i<nw; i++)
            free(data[i]);
        free(data);
        free(totals);
        free(counts);
        free(sofar);
    }

    MPI_Finalize();

    return EXIT_SUCCESS;
}

Запуск этого на 4процессы, я получаю:

$ mpirun -np 4 ./masterworker1

@[1]: i=39
@[1]: i=81
@[3]: i=9
@[3]: i=45
@[3]: i=0
@[3]: i=57
@[3]: Total of all nums is 111
@[1]: Total of all nums is 120
From [ 1]: 39  81 | 120
From [ 2]: 24   6  39 |  69
From [ 3]:  9  45   0  57 | 111
@[2]: i=24
@[2]: i=6
@[2]: i=39
@[2]: Total of all nums is 69

Однако, это может быть неосуществимо - вы можете не захотеть буферизовать все ваши данные, как это (и если бы вы могли, вы могли бы просто отправить их в одном сообщении).

Другой подход заключается в отправке данных, а затем отправке специального сообщения, когда вы закончите отправку данных, и мастер просто продолжает получать, пока не услышит одно из следующих сообщений «Готово» от каждого работника:

#include <errno.h>
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

#define MAXPERWORKER 10
#define TAG_DATA 2
#define TAG_DONE 1
int main(int argc, char *argv[])
{
    int i, rank, size, np, nw, num;
    int mynums[MAXPERWORKER], numcount, total;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &np);
    nw = np -1;

    srand(time(NULL)*rank);

    if (rank > 0)
    {
        numcount = 0;
        total    = 0;
        for (i=(rank-1); i<(nw*10); i+=nw)
        {
            num = rand() % 100;
            if (num % 3 == 0)
            {
                printf("@[%d]: i=%d\n", rank, num);
                total += num;
                MPI_Send(&num, 1, MPI_INT, 0, TAG_DATA, MPI_COMM_WORLD);
            }

        }
        MPI_Send(&num, 1, MPI_INT, 0, TAG_DONE, MPI_COMM_WORLD);

        printf("@[%d]: Total of all nums is %d\n", rank, total);
    }
    else
    {
        int *totals = malloc(sizeof(int)*nw);
        int *counts = malloc(sizeof(int)*nw);
        int **data = malloc(sizeof(int *)*nw);
        int rcv;
        int j;
        int workernum;
        int stillsending;
        MPI_Status status;

        for (i=0; i<nw; i++) {
            totals[i]= 0;
            counts[i]= 0;
            data[i] = malloc(sizeof(int)*MAXPERWORKER);
        }
        stillsending = nw;

        /* get data */
        while (stillsending > 0) {
            MPI_Recv(&rcv, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);

            workernum = status.MPI_SOURCE-1;
            if (status.MPI_TAG == TAG_DONE) {
                stillsending--;
            } else if (status.MPI_TAG == TAG_DATA) {
                data[workernum][counts[workernum]] = rcv;
                totals[workernum] += rcv;
                counts[workernum]++;
            }
        }

        /* print results */
        for (i=0; i<nw; i++) {
            printf("From [%2d]:", i+1);
            for (j=0; j<counts[i]; j++)
                printf("%3d ", data[i][j]);
            printf("| %3d\n", totals[i]);
        }

        for (i=0; i<nw; i++)
            free(data[i]);
        free(data);
        free(totals);
        free(counts);
    }

    MPI_Finalize();

    return EXIT_SUCCESS;
}

Опять же для 4 задач я получаю:

$ mpirun -np 4 ./masterworker2

@[1]: i=63
@[1]: i=99
@[1]: i=60
@[1]: i=69
@[1]: i=21
@[1]: i=48
@[1]: i=24
@[1]: Total of all nums is 384
@[2]: i=39
@[2]: i=84
@[2]: i=63
@[2]: Total of all nums is 186
@[3]: i=3
@[3]: i=51
@[3]: i=36
@[3]: Total of all nums is 90
From [ 1]: 63  99  60  69  21  48  24 | 384
From [ 2]: 39  84  63 | 186
From [ 3]:  3  51  36 |  90

Обратите внимание, что в обоих этих случаях я использовал некоторый массив размеров MAXPERWORKER для предварительного выделения объектов;хотя вам это и не нужно, вы можете при необходимости использовать массив и realloc или использовать std :: vector, если вы хотите использовать C ++.

...