Создание счетчика, который остается синхронизированным между процессами MPI - PullRequest
6 голосов
/ 09 февраля 2011

У меня довольно большой опыт использования базовых методов связи и группового MPI2, и я выполняю довольно много смущающей параллельной симуляции с использованием MPI.До сих пор я структурировал свой код так, чтобы он имел узел диспетчеризации и группу рабочих узлов.Диспетчерский узел имеет список файлов параметров, которые будут запускаться с симулятором.Он заполняет каждый рабочий узел файлом параметров.Рабочие узлы запускают моделирование, а затем запрашивают другой файл параметров, который предоставляет узел диспетчеризации.После запуска всех файлов параметров узел диспетчеризации завершает работу каждого рабочего узла перед тем, как отключиться.

Файлы параметров обычно называются "Par_N.txt", где N - целое число идентификации (например, -N =1-1000).Поэтому я подумал, что если бы я мог создать счетчик и мог бы синхронизировать этот счетчик по всем моим узлам, я бы избавился от необходимости иметь диспетчерский узел и сделал систему немного проще.Как бы просто это ни звучало в теории, на практике я подозреваю, что это немного сложнее, поскольку мне нужно было бы убедиться, что счетчик заблокирован во время изменения, и т. Д. И я подумал, что MPI может иметь встроенный способсправиться с этим.Какие-нибудь мысли?Я задумываюсь над этим?

Ответы [ 4 ]

10 голосов
/ 10 февраля 2011

Реализация общего счетчика не тривиальна, но как только вы сделаете это и поместите его в библиотеку, вы можете сделать с ним лот .

В ИспользованиеКнига MPI-2 , которую вы должны будете отдать, если собираетесь реализовать этот материал, один из примеров (код доступен онлайн ) - это общий счетчик.«Не масштабируемый» должен хорошо обрабатывать несколько десятков процессов - счетчик представляет собой массив целых чисел 0..size-1, по одному на ранг, а затем операция `get next work item # 'состоит изблокировка окна, чтение вклада всех остальных в счетчик (в данном случае, сколько предметов они забрали), обновление своего (++), закрытие окна и вычисление итогов.Все это делается с помощью пассивных односторонних операций.(Лучше масштабируемый использует только дерево, а не 1-й массив).

Таким образом, вы можете использовать, скажем, ранг 0 для размещения счетчика, и все продолжают выполнять рабочие единицы и обновлять счетчик, чтобы получить следующий, пока не останется больше работы;затем вы ждете у барьера или чего-то еще и завершаете.

Когда у вас есть что-то вроде этого - использование общего значения для получения следующей доступной рабочей единицы - работы, то вы можете обобщить на более сложный подход.Поэтому, как предположил Сузтерпатт, все, кто брал «свою долю» рабочих единиц на старте, прекрасно работают, но что делать, если некоторые заканчивают быстрее других?Обычный ответ сейчас - воровство работы;каждый хранит свой список рабочих единиц в очереди, а затем, когда у кого-то заканчивается работа, он крадет рабочие единицы с другого конца чьей-то очереди, пока не останется больше работы.Это действительно полностью распределенная версия master-worker, где больше нет единой мастер-секции для работы с разделами.Как только у вас будет работать один общий счетчик, вы можете создавать из него мьютексы, и из этого вы можете создавать очередь.Но если простой разделяемый счетчик работает достаточно хорошо, вам, возможно, не нужно идти туда.

Обновление: Хорошо, так что вот хакерская попытка создания общего счетчика - моя версияпростой в книге MPI-2: кажется, работает, но я бы не сказал ничего более сильного, чем это (давно не играл с этим).Существует простая реализация счетчика (соответствующая немасштабирующей версии в книге MPI-2) с двумя простыми тестами, один из которых примерно соответствует вашему рабочему сценарию;каждый элемент обновляет счетчик, чтобы получить рабочий элемент, затем выполняет «работу» (спит случайное количество времени).В конце каждого теста распечатывается структура данных счетчика, которая представляет собой число приращений, выполненных каждым рангом.

#include <mpi.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>

struct mpi_counter_t {
    MPI_Win win;
    int  hostrank ;
    int  myval;
    int *data;
    int rank, size;
};

struct mpi_counter_t *create_counter(int hostrank) {
    struct mpi_counter_t *count;

    count = (struct mpi_counter_t *)malloc(sizeof(struct mpi_counter_t));
    count->hostrank = hostrank;
    MPI_Comm_rank(MPI_COMM_WORLD, &(count->rank));
    MPI_Comm_size(MPI_COMM_WORLD, &(count->size));

    if (count->rank == hostrank) {
        MPI_Alloc_mem(count->size * sizeof(int), MPI_INFO_NULL, &(count->data));
        for (int i=0; i<count->size; i++) count->data[i] = 0;
        MPI_Win_create(count->data, count->size * sizeof(int), sizeof(int),
                       MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
    } else {
        count->data = NULL;
        MPI_Win_create(count->data, 0, 1,
                       MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
    }
    count -> myval = 0;

    return count;
}

int increment_counter(struct mpi_counter_t *count, int increment) {
    int *vals = (int *)malloc( count->size * sizeof(int) );
    int val;

    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win);

    for (int i=0; i<count->size; i++) {

        if (i == count->rank) {
            MPI_Accumulate(&increment, 1, MPI_INT, 0, i, 1, MPI_INT, MPI_SUM,
                           count->win);
        } else {
            MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
        }
    }

    MPI_Win_unlock(0, count->win);
    count->myval += increment;

    vals[count->rank] = count->myval;
    val = 0;
    for (int i=0; i<count->size; i++)
        val += vals[i];

    free(vals);
    return val;
}

void delete_counter(struct mpi_counter_t **count) {
    if ((*count)->rank == (*count)->hostrank) {
        MPI_Free_mem((*count)->data);
    }
    MPI_Win_free(&((*count)->win));
    free((*count));
    *count = NULL;

    return;
}

void print_counter(struct mpi_counter_t *count) {
    if (count->rank == count->hostrank) {
        for (int i=0; i<count->size; i++) {
            printf("%2d ", count->data[i]);
        }
        puts("");
    }
}

int test1() {
    struct mpi_counter_t *c;
    int rank;
    int result;

    c = create_counter(0);

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    result = increment_counter(c, 1);
    printf("%d got counter %d\n", rank, result);

    MPI_Barrier(MPI_COMM_WORLD);
    print_counter(c);
    delete_counter(&c);
}


int test2() {
    const int WORKITEMS=50;

    struct mpi_counter_t *c;
    int rank;
    int result = 0;

    c = create_counter(0);

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    srandom(rank);

    while (result < WORKITEMS) {
        result = increment_counter(c, 1);
        if (result <= WORKITEMS) {
             printf("%d working on item %d...\n", rank, result);
             sleep(random() % 10);
         } else {
             printf("%d done\n", rank);
         }
    }

    MPI_Barrier(MPI_COMM_WORLD);
    print_counter(c);
    delete_counter(&c);
}

int main(int argc, char **argv) {

    MPI_Init(&argc, &argv);

    test1();
    test2();

    MPI_Finalize();
}
3 голосов
/ 09 февраля 2011

Я не могу придумать какой-либо встроенный механизм для решения этой проблемы, вам придется реализовать его вручную. Судя по вашим комментариям, вы хотите децентрализовать программу, и в этом случае каждый процесс (или, по крайней мере, группы процессов) должен будет сохранять свои значения счетчика и синхронизировать его. Вероятно, это можно сделать с умным использованием неблокирующих отправок / приемов, но их семантика не тривиальна.

Вместо этого я решил бы проблему насыщения, просто выпуская несколько файлов одновременно для рабочих процессов. Это уменьшит сетевой трафик и позволит вам сохранить простую настройку одного диспетчера.

0 голосов
/ 10 февраля 2011

Непонятно, нужно ли просматривать файлы в строгом порядке или нет. Если нет, то почему бы просто не каждому узлу i обрабатывать все файлы, где N % total_workers == i - то есть циклическое распределение работы?

0 голосов
/ 10 февраля 2011

Похоже, вы используете свой диспетчерский узел для динамического распределения нагрузки (назначение работы процессорам, когда они станут доступны).Общий счетчик, который не требует остановки всех процессоров, этого не сделает.Я бы порекомендовал оставаться с тем, что у вас есть сейчас, или делать то, что предлагает Сузтерпатт, отправлять пакеты файлов за раз.

...