Непонятное поведение в простой программе отправки / получения MPI - PullRequest
1 голос
/ 22 октября 2011

У меня уже давно есть ошибка в моем коде, и я пока не могу понять, как ее устранить.

То, чего я пытаюсь достичь, достаточно просто: каждый рабочий узел (т.е.узел с рангом! = 0) получает строку (представленную одномерным arry) в квадратной структуре, которая включает в себя некоторые вычисления.Как только вычисления завершены, эта строка отправляется обратно мастеру.

Для целей тестирования вычисления не используются.Все, что происходит:

  • мастер отправляет номер строки работнику, работник использует номер строки для вычисления соответствующих значений
  • работник отправляет массив со значениями результата обратно

Теперь моя проблема заключается в следующем:

  • все работает, как ожидается, до определенного размера для количества элементов в строке (размер = 1006) и количества рабочих> 1
  • если количество элементов в строке превышает 1006, рабочие не могут завершить работу и программа не завершает работу
  • , это происходит только в том случае, если я пытаюсь отправить массив обратно мастеру.Если я просто отправлю обратно INT, то все в порядке (см. Закомментированную строку в doMasterTasks () и doWorkerTasks ())

Исходя из последней точки маркера, я предполагаю, что должна быть какая-то гонка-условие, которое появляется только тогда, когда массив, подлежащий отправке обратно мастеру, достигает определенного размера.

Есть ли у вас какие-либо идеи, в чем может быть проблема?

Скомпилируйте следующий код с помощью: mpicc-O2 -std = c99 -o простой

Запустить исполняемый файл следующим образом: mpirun -np 3 simple (например, 1006 или 1007)

Вот код:

#include "mpi.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#define MASTER_RANK 0
#define TAG_RESULT 1
#define TAG_ROW 2
#define TAG_FINISHOFF 3

int mpi_call_result, my_rank, dimension, np;

// forward declarations
void doInitWork(int argc, char **argv);
void doMasterTasks(int argc, char **argv);
void doWorkerTasks(void);
void finalize();
void quit(const char *msg, int mpi_call_result);

void shutdownWorkers() {
    printf("All work has been done, shutting down clients now.\n");
    for (int i = 0; i < np; i++) {
        MPI_Send(0, 0, MPI_INT, i, TAG_FINISHOFF, MPI_COMM_WORLD);
    }
}

void doMasterTasks(int argc, char **argv) {
    printf("Starting to distribute work...\n");
    int size = dimension;
    int * dataBuffer = (int *) malloc(sizeof(int) * size);

    int currentRow = 0;
    int receivedRow = -1;
    int rowsLeft = dimension;
    MPI_Status status;

    for (int i = 1; i < np; i++) {
        MPI_Send(&currentRow, 1, MPI_INT, i, TAG_ROW, MPI_COMM_WORLD);
        rowsLeft--;
        currentRow++;

    }

    for (;;) {
//        MPI_Recv(dataBuffer, size, MPI_INT, MPI_ANY_SOURCE, TAG_RESULT, MPI_COMM_WORLD, &status);
        MPI_Recv(&receivedRow, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);

        if (rowsLeft == 0)
            break;

        if (currentRow > 1004)
            printf("Sending row %d to worker %d\n", currentRow, status.MPI_SOURCE);
        MPI_Send(&currentRow, 1, MPI_INT, status.MPI_SOURCE, TAG_ROW, MPI_COMM_WORLD);
        rowsLeft--;
        currentRow++;
    }
    shutdownWorkers();
    free(dataBuffer);
}

void doWorkerTasks() {
    printf("Worker %d started\n", my_rank);

    // send the processed row back as the first element in the colours array.
    int size = dimension;
    int * data = (int *) malloc(sizeof(int) * size);
    memset(data, 0, sizeof(size));

    int processingRow = -1;
    MPI_Status status;

    for (;;) {

        MPI_Recv(&processingRow, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
        if (status.MPI_TAG == TAG_FINISHOFF) {
            printf("Finish-OFF tag received!\n");
            break;
        } else {
//            MPI_Send(data, size, MPI_INT, 0, TAG_RESULT, MPI_COMM_WORLD);
            MPI_Send(&processingRow, 1, MPI_INT, 0, TAG_RESULT, MPI_COMM_WORLD);
        }
    }

    printf("Slave %d finished work\n", my_rank);
    free(data);
}

int main(int argc, char **argv) {


    if (argc == 2) {
        sscanf(argv[1], "%d", &dimension);
    } else {
        dimension = 1000;
    }

    doInitWork(argc, argv);

    if (my_rank == MASTER_RANK) {
        doMasterTasks(argc, argv);
    } else {
        doWorkerTasks();
    }
    finalize();
}

void quit(const char *msg, int mpi_call_result) {
    printf("\n%s\n", msg);
    MPI_Abort(MPI_COMM_WORLD, mpi_call_result);
    exit(mpi_call_result);
}

void finalize() {
    mpi_call_result = MPI_Finalize();
    if (mpi_call_result != 0) {
        quit("Finalizing the MPI system failed, aborting now...", mpi_call_result);
    }
}

void doInitWork(int argc, char **argv) {
    mpi_call_result = MPI_Init(&argc, &argv);
    if (mpi_call_result != 0) {
        quit("Error while initializing the system. Aborting now...\n", mpi_call_result);
    }
    MPI_Comm_size(MPI_COMM_WORLD, &np);
    MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
}

Любая помощь очень ценится!

Best, Крис

1 Ответ

5 голосов
/ 22 октября 2011

Если вы посмотрите на свои задачи doWorkerTasks, вы увидите, что они отправляют ровно столько сообщений с данными, сколько они получают; (и они получают еще один, чтобы закрыть их).

Но ваш мастер-код:

for (int i = 1; i < np; i++) {
    MPI_Send(&currentRow, 1, MPI_INT, i, TAG_ROW, MPI_COMM_WORLD);
    rowsLeft--;
    currentRow++;

}

for (;;) {
    MPI_Recv(dataBuffer, size, MPI_INT, MPI_ANY_SOURCE, TAG_RESULT, MPI_COMM_WORLD, &status);

    if (rowsLeft == 0)
        break;

    MPI_Send(&currentRow, 1, MPI_INT, status.MPI_SOURCE, TAG_ROW, MPI_COMM_WORLD);
    rowsLeft--;
    currentRow++;
}

отправляет np-2 больше сообщений с данными, чем получает. В частности, он продолжает получать данные до тех пор, пока ему больше не нужно отправлять , хотя должно быть еще np-2 ожидающих сообщений данных. Меняем код на следующее:

int rowsLeftToSend= dimension;
int rowsLeftToReceive = dimension;

for (int i = 1; i < np; i++) {
    MPI_Send(&currentRow, 1, MPI_INT, i, TAG_ROW, MPI_COMM_WORLD);
    rowsLeftToSend--;
    currentRow++;

}

while (rowsLeftToReceive > 0) {
    MPI_Recv(dataBuffer, size, MPI_INT, MPI_ANY_SOURCE, TAG_RESULT, MPI_COMM_WORLD, &status);
    rowsLeftToReceive--;

    if (rowsLeftToSend> 0) {
        if (currentRow > 1004)
            printf("Sending row %d to worker %d\n", currentRow, status.MPI_SOURCE);
        MPI_Send(&currentRow, 1, MPI_INT, status.MPI_SOURCE, TAG_ROW, MPI_COMM_WORLD);
        rowsLeftToSend--;
        currentRow++;
    }
}

Сейчас работает.

Почему код не блокируется (обратите внимание, что это тупик, а не условие гонки; это более распространенная параллельная ошибка в распределенных вычислениях) для сообщений меньшего размера - тонкая деталь того, как работает большинство реализаций MPI. Как правило, реализации MPI просто «проталкивают» небольшие сообщения по каналу независимо от того, готов ли получатель к ним или нет, но более крупные сообщения (поскольку они занимают больше ресурсов хранения на принимающей стороне) требуют некоторого рукопожатия между отправителем и получателем. (Если вы хотите узнать больше, ищите протоколы eager vs rendezvous).

Таким образом, для случая с маленьким сообщением (в этом случае менее 1006 дюймов, и 1 int тоже работает) рабочие узлы отправляли свои сообщения независимо от того, получал ли их мастер. Если бы мастер имел , вызванный MPI_Recv (), сообщения уже были бы там, и он немедленно вернулся бы. Но это не так, поэтому на главной стороне находились ожидающие сообщения; но это не имело значения. Мастер разослал свои сообщения об уничтожении, и все вышли.

Но для больших сообщений оставшимся send () необходимо, чтобы получатель принимал участие в очистке, а поскольку получатель никогда не делает этого, остальные работники зависают.

Обратите внимание, что даже для небольшого сообщения, в котором не было тупиков, код не работал должным образом - отсутствовали вычисленные данные.

Обновление : в вашем shutdownWorkers была похожая проблема:

void shutdownWorkers() {
    printf("All work has been done, shutting down clients now.\n");
    for (int i = 0; i < np; i++) {
        MPI_Send(0, 0, MPI_INT, i, TAG_FINISHOFF, MPI_COMM_WORLD);
    }
}

Здесь вы отправляете всем процессам, включая ранг 0, тот, который выполняет отправку. В принципе, этот MPI_Send должен взаимоблокировать, поскольку это блокирующая отправка, и соответствующая запись уже не опубликована. Вы могли бы опубликовать неблокирующее получение раньше, чтобы избежать этого, но это не нужно - ранг 0 не должен сообщать о себе до конца. Так что просто измените цикл на

    for (int i = 1; i < np; i++)

tl; dr - ваш код заблокирован, потому что мастер не получил достаточно сообщений от рабочих; это случилось для сообщений небольшого размера из-за деталей реализации, общих для большинства библиотек MPI.

...