MPI "пул" метод с MPI_Reduce - PullRequest
       59

MPI "пул" метод с MPI_Reduce

0 голосов
/ 29 января 2020

Я хочу адаптировать метод "pool" для своего кода, и мне нужна поддержка. Я привязываю mpi к сокету и omp к ядру. Для mpi я использую один метод пула (для переподписки) для двух функций fx и fy (0-4 для fx и 5-9 для fy).

первый вопрос:

в возвращенном массиве, могу ли я использовать MPI_Reduce (... MPI_MIN ...) вместо MPI_Recv для "пула"? Я немного запутался, потому что MPI_Reduce использует одно число, но мне нужно проверить min для одного элемента массива, а затем сохранить весь массив. Кроме того, пул означает, что у меня есть задачи, которые я даже не запускал, и Reduce должен ждать их запуска и завершения sh. Как я сделал это сейчас с MPI_Recv, результаты не верны (для этого небольшого примера может быть правильным).

второй вопрос только для информации

Есть ли способ назначить из "объединить" заданную спецификацию MPI c с указанным c узлом? (например, задача 7 переходит к узлу 2) Поскольку некоторые из процессоров более способны, чем другие, и задачи различаются, я хочу выделить более крупные задачи для улучшения работы процессора. В противном случае, если большая задача переходит к слабому процессору, это занимает слишком много времени.

спасибо

/*
gcc ex.c -os
mpicc ex.c -op
mpirun -n 3 ./p
*/
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <assert.h>

#define MIN(x, y) ((x) < (y) ? x : y)
#define is_mpi 1 // change 0 for-non mpi, 1 for mpi and compile it accordinglly

#if is_mpi
    #include <mpi.h>
#endif

void fx(int *&rx, int l) {
    // here I have isa dispatch + omp(with reduction) + intrinsics
    rx[0] = l;
    rx[1] = (rand() / (float)RAND_MAX) * 10;
    printf("x %d %d\n", rx[0], rx[1]);
}

void fy(int *&ry, int l) {
    // isa + ...
    ry[0] = l;
    ry[1] = (rand() / (float)RAND_MAX) * 10;
    printf("y %d %d\n", ry[0], ry[1]);
}

#if is_mpi
void wrk(int lu) {
    int *rx = (int *) calloc(2, sizeof(int));
    int *ry = (int *) calloc(2, sizeof(int));
    rx[1] = 11;
    ry[1] = 11;

    MPI_Status status;
    int myrank, l; MPI_Comm_rank(MPI_COMM_WORLD, &myrank);

    if (myrank <= 2 * lu) {
        MPI_Recv(&l, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
        while (status.MPI_TAG > 0) {

            if (l < lu) { // the pool is 0-4 for fx
                fx(rx, l);
                MPI_Send(rx, 2, MPI_INT, 0, status.MPI_TAG, MPI_COMM_WORLD); }
            else {   // the pool is 5-9 for fy
                fy(ry, l);
                MPI_Send(ry, 2, MPI_INT, 0, status.MPI_TAG, MPI_COMM_WORLD); }

            MPI_Recv(&l, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status); } }
    free(rx); free(ry);
}
int *arr(int dlu) {
    int *mat = (int *)calloc(dlu, sizeof(int));
    for (int i = 0; i < dlu; i++)
        mat[i] = i;
    return mat;
}
void mas(int **&res, int lu, int ws) {
    MPI_Status status;  int nsent = 0; 
    int *r8 = (int *) calloc(2, sizeof(int));
    int *mat = arr(2 * lu);

    for (int i = 1; i <= MIN((ws - 1), 2 * lu); i++) {  //  for (i = 1; i < MIN(ws, 2 * lu); i++) { for (i = 1; i <= MIN((ws-1), 2 * lu); i++) {
        MPI_Send(&mat[i-1], 1, MPI_INT, i, i, MPI_COMM_WORLD);
        nsent++; }

    for (int i = 0; i < 2 * lu; i++) {

        //MPI_Reduce(&r8, &res, 1, MPI_INT, MPI_MIN, 0, MPI_COMM_WORLD);                
        MPI_Recv(r8, 2, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);

        int t = 0;  // next lines are for min (instead of reduction mpi_min)
        if (i >= 5)
            t = 1;
        if (res[t][1] > r8[1])
            for (int c = 0; c < 2; c++)
                res[t][c] = r8[c];

        if (nsent < 2 * lu) {
            MPI_Send(&mat[nsent], 1, MPI_INT, status.MPI_SOURCE, nsent + 1, MPI_COMM_WORLD);
            nsent++; }
        else {
            MPI_Send(MPI_BOTTOM, 0, MPI_INT, status.MPI_SOURCE, 0, MPI_COMM_WORLD); }
    }

    free(r8); free(mat);
}
#endif

int main(void) {
    int wr = 0, ws = 0;
    #if is_mpi
        MPI_Init(NULL, NULL);
        MPI_Comm_rank(MPI_COMM_WORLD, &wr);
        MPI_Comm_size(MPI_COMM_WORLD, &ws);
    #endif

    int **res, lu = 5;
    if(wr == 0) {
        res = (int **) malloc(2 * sizeof(int *));
        for(int i = 0; i < 2; i++) {
            res[i] = (int *) calloc(2, sizeof(int));
            res[i][0] = 11;
            res[i][1] = 11;
        }
    }

    #if is_mpi
        if (wr == 0) { mas(res, lu, ws); }
        else                 { wrk(lu); }
    #else
        int *rx = (int *) calloc(2, sizeof(int));
        int *ry = (int *) calloc(2, sizeof(int));
        for (int l = 0; l < lu; l++) {
            fx(rx, l);
            fy(ry, l);
            if (res[0][1] > rx[1]) {
                res[0][0] = rx[0];
                res[0][1] = rx[1];
            }
            if (res[1][1] > ry[1]) {
                res[1][0] = ry[0];
                res[1][1] = ry[1];
            }
        }       
    #endif

    if(wr == 0) {       
        printf("\nresx %d %d\n", res[0][0], res[0][1]);
        printf("resy %d %d\n\n", res[1][0], res[1][1]);
        for(int i = 0; i < 2; i++)
            free(res[i]);
        free(res);
    }

    #if is_mpi
        MPI_Barrier(MPI_COMM_WORLD);
        MPI_Finalize();
    #endif
}

пс: я нашел базовое решение c для хранения всех результатов в массив, чем искать минимум определенного столбца и чем хранить всю его строку. Мы можем считать этот пост решенным.

...