Как я могу парализовать эту функцию с помощью OpenMP без условий гонки или ложного обмена? - PullRequest
0 голосов
/ 15 ноября 2018

Мне нужно парализовать одну функцию без условий гонки или ложного обмена. Я перепробовал много способов, но пока не смог этого достичь. Функция:

__inline static
void calculateClusterCentroIDs(int numCoords, int numObjs, int numClusters, float * dataSetMatrix, int * clusterAssignmentCurrent, float *clustersCentroID) {
    int * clusterMemberCount = (int *) calloc (numClusters,sizeof(float));

    // sum all points
    // for every point
    for (int i = 0; i < numObjs; ++i) {
        // which cluster is it in?
        int activeCluster = clusterAssignmentCurrent[i];

        // update count of members in that cluster
        ++clusterMemberCount[activeCluster];

        // sum point coordinates for finding centroid
        for (int j = 0; j < numCoords; ++j)
            clustersCentroID[activeCluster*numCoords + j] += dataSetMatrix[i*numCoords + j];
    }


    // now divide each coordinate sum by number of members to find mean/centroid
    // for each cluster
    for (int i = 0; i < numClusters; ++i) {
        if (clusterMemberCount[i] != 0)
            // for each coordinate
            for (int j = 0; j < numCoords; ++j)
                clustersCentroID[i*numCoords + j] /= clusterMemberCount[i];  /// XXXX will divide by zero here for any empty clusters!
    }

Есть идеи, как мне этого достичь?

Спасибо.

Ответы [ 3 ]

0 голосов
/ 16 ноября 2018

Вы должны указать порядок ожидаемых значений numCoords, numObjs и numClusters, поскольку от этого зависит оптимальный способ распараллеливания. В частности, numCoords важно видеть, имеет ли смысл распараллеливание / векторизация внутреннего цикла по координатам; Например, вы берете трехмерные координаты или 1000 измерений?

Еще одна попытка с недостатком оператора if в первом цикле (плохо для производительности), статического расписания (возможный дисбаланс нагрузки), но каждый поток увеличивает непрерывные части clusterMemberCount и clustersCentroID, что ограничивает риск ложный обмен.

#ifdef _OPENMP
   #include <omp.h>
#else
   #define omp_get_num_threads() 1
   #define omp_get_thread_num() 0
#endif


__inline static
void calculateClusterCentroIDs(int numCoords, int numObjs, int numClusters, float * dataSetMatrix, int * clusterAssignmentCurrent, float *clustersCentroID) {
    int * clusterMemberCount = (int *) calloc (numClusters,sizeof(float));
    // sum all points
    // for every point

    #pragma omp parallel
    {
        int nbOfThreads = omp_get_num_threads();
        int thisThread = omp_get_thread_num();
        // Schedule for the first step : process only cluster with ID in the [from , to[ range
        int clustFrom = (thisThread*numClusters)/nbOfThreads;
        int clustTo   = (thisThread+1 == nbOfThreads) ? numClusters : ((thisThread+1)*numClusters)/nbOfThreads;

        // Each thread will loop through all values of numObjs but only process them depending on activeCluster
        // The loop is skipped only if the thread was assigned no cluster
        if (clustTo>clustFrom){
            for (int i = 0; i < numObjs; ++i) {
                // which cluster is it in?
                int activeCluster = clusterAssignmentCurrent[i];

                if (activeCluster>=clustFrom && activeCluster<clustTo){
                    // update count of members in that cluster
                    ++clusterMemberCount[activeCluster];

                    // sum point coordinates for finding centroid
                    for (int j = 0; j < numCoords; ++j)
                        clustersCentroID[activeCluster*numCoords + j] += dataSetMatrix[i*numCoords + j];
                }
            }
        }

        #pragma omp barrier

        // now divide each coordinate sum by number of members to find mean/centroid
        // for each cluster
        #pragma omp for // straightforward
        for (int i = 0; i < numClusters; ++i) {
            if (clusterMemberCount[i] != 0)
                // for each coordinate
                for (int j = 0; j < numCoords; ++j)
                    clustersCentroID[i*numCoords + j] /= clusterMemberCount[i];  /// XXXX will divide by zero here for any empty clusters!
        }
    }
    free(clusterMemberCount);
}
0 голосов
/ 17 ноября 2018

Добавление к моему комментарию: ++clusterMemberCount[activeCluster] формирует гистограмму и является проблематичным, когда два потока пытаются обновить один и тот же элемент (или соседний элемент, совместно использующий строку кэша).Это должно быть извлечено из цикла либо как последовательная часть, либо должно быть распараллелено, имея отдельную копию гистограммы для каждого потока, а затем объединено.

Вы можете легко отделить эту часть от первой параллелиloop.

// Make the histogram
for (int i = 0; i < numObjs; ++i) {
    int activeCluster = clusterAssignmentCurrent[i];
    ++clusterMemberCount[activeCluster];
}

Затем обработайте все, используя параллелизм

// parallel processing
#pragma omp parallel for
for (int i = 0; i < numObjs; ++i) {
    int activeCluster = clusterAssignmentCurrent[i];
    for (int j = 0; j < numCoords; ++j)
        clustersCentroID[activeCluster*numCoords + j] += dataSetMatrix[i*numCoords + j];
}

Во второй раз возможное ложное совместное использование, когда numCoords * sizeof(clustersCentroID[0]) % 64 != 0 принимает 64-байтовую строку кэша.Это может быть смягчено путем перераспределения clusterSCentroID в полные кратные 64 байта.

// Loop for numCoords, but index by numCoordsX
for (int j = 0; j < numCoords; ++j)
    clustersCentroID[activeCluster*numCoordsX + j] += dataSetMatrix[i*numCoords + j];
0 голосов
/ 16 ноября 2018

Это довольно просто

// sum all points
// for every point
for (int i = 0; i < numObjs; ++i) {
    // which cluster is it in?
    int activeCluster = clusterAssignmentCurrent[i];

    // update count of members in that cluster
    ++clusterMemberCount[activeCluster];

    // sum point coordinates for finding centroid
    #pragma omp parallel for
    for (int j = 0; j < numCoords; ++j)
        clustersCentroID[activeCluster*numCoords + j] += dataSetMatrix[i*numCoords + j];
}

Внутренний цикл идеально подходит для распараллеливания, поскольку все записи происходят с различными элементами clustersCentroID.Вы можете с уверенностью предположить, что в расписании по умолчанию не будет значительного ложного обмена, обычно оно имеет достаточно большие фрагменты.Просто не попробуйте что-то вроде schedule(static,1).

Внешний цикл не так легко распараллелить.Вы можете использовать сокращение на clusterMemberCount и clusterMemberCount или сделать что-то вроде:

#pragma omp parallel // note NO for
for (int i = 0; i < numObjs; ++i) {
    int activeCluster = clusterAssignmentCurrent[i];
    // ensure that exactly one thread works on each cluster
    if (activeCluster % omp_num_threads() != omp_get_thread_num()) continue;

Делайте это только в том случае, если простое решение не дает достаточной производительности.

Другоецикл также прост

#pragma omp parallel for
for (int i = 0; i < numClusters; ++i) {
    if (clusterMemberCount[i] != 0)
        // for each coordinate
        for (int j = 0; j < numCoords; ++j)
            clustersCentroID[i*numCoords + j] /= clusterMemberCount[i];
}

Опять же, доступ к данным совершенно изолирован как с точки зрения корректности, так и, за исключением крайних случаев, ложного совместного использования.

...