Какова лучшая стратегия реализации? - PullRequest
2 голосов
/ 12 июля 2011

Этот вопрос о лучшей стратегии для реализации следующего моделирования в C ++.

Я пытаюсь сделать моделирование как часть исследовательского проекта по физике, который в основном отслеживает динамику цепочкиузлы в пространстве.Каждый узел содержит позицию вместе с определенными параметрами (локальная кривизна, скорость, расстояние до соседей и т. Д.), Которые все эволюционируют через время.

Каждый шаг по времени может быть разбит на эти четыре части:

  • Рассчитать локальные параметры.Значения зависят от ближайших соседей в цепочке.
  • Рассчитать глобальные параметры.
  • Развитие.Положение каждого узла перемещается на небольшую величину, в зависимости от глобальных и локальных параметров и некоторых случайных силовых полей.
  • Заполнение.Новые узлы вставляются, если расстояние между двумя последовательными узлами достигает критического значения.

(Кроме того, узлы могут застрять, что делает их неактивными до конца симуляции. Локальные параметры неактивных узлов с неактивными соседями не изменятся и не требуют дополнительных вычислений..)

Каждый узел содержит ~ 60 байтов, у меня ~ 100 000 узлов в цепочке, и мне нужно развить цепочку приблизительно за ~ 1 000 000 временных шагов.Однако я хотел бы максимизировать эти числа, так как это повысило бы точность моего моделирования, но с ограничением, что моделирование выполняется в разумные сроки (~ часы).(~ 30% узлов будут неактивны.)

Я начал реализовывать это моделирование в виде двусвязного списка в C ++.Это кажется естественным, поскольку мне нужно вставить новые узлы между существующими и потому, что локальные параметры зависят от ближайших соседей.(Я добавил дополнительный указатель на следующий активный узел, чтобы избежать ненужных вычислений всякий раз, когда я зацикливаюсь по всей цепочке).

Я не эксперт, когда дело доходит до распараллеливания (или кодирования в этом отношении),но я играл с OpenMP, и мне действительно нравится, как я могу ускорить цикл независимых операций с двумя строками кода.Я не знаю, как заставить мой связанный список делать что-то параллельно, или он вообще работает (?).Так что у меня появилась идея работать с вектором stl.Где вместо того, чтобы иметь указатели на ближайших соседей, я мог хранить индексы соседей и получать к ним доступ с помощью стандартного поиска.Я также мог бы отсортировать вектор по позиции цепочки (каждый x-й шаг), чтобы получить лучшую локальность в памяти.Этот подход позволил бы зацикливаться на OpenMP.

Я несколько испуган этой идеей, поскольку мне не приходится иметь дело с управлением памятью.И я предполагаю, что реализация вектора stl намного лучше, чем мой простой способ «нового» и «удаления» для работы с узлами в списке.Я знаю, что мог бы сделать то же самое со списками stl, но мне не нравится способ доступа к ближайшим соседям с помощью итераторов.

Поэтому я спрашиваю вас, 1337 h4x0r и опытных программистов, что будетлучший дизайн для моей симуляции?Описан ли векторный подход выше хорошей идеи?Или есть какие-то хитрости для воспроизведения в связанном списке, чтобы заставить их работать с OpenMP?Или я должен рассмотреть совершенно другой подход?

Симуляция будет выполняться на компьютере с 8 ядрами и 48 ГБ ОЗУ, поэтому я предполагаю, что могу обменять много памяти на скорость.

Заранее спасибо

Редактировать: Мне нужно добавлять 1-2% новых узлов каждый раз, так что их сохранение в виде вектора без индексов для ближайших соседей не будет работать, если я не отсортируювектор каждый шаг по времени.

Ответы [ 3 ]

2 голосов
/ 12 июля 2011

Это классический обменный вопрос.Использование массива или std :: vector сделает вычисления быстрее, а вставки - медленнее;использование двусвязного списка или std :: list сделает вставки быстрее, а вычисления медленнее.

Единственный способ оценить компромиссные вопросы - эмпирически;который будет работать быстрее для вашего конкретного приложения?Все, что вы можете сделать, это попробовать оба способа и посмотреть.Чем интенсивнее вычисления и чем короче трафарет (например, интенсивность вычислений - сколько флопов вы должны сделать на объем памяти, который вы должны внести), тем менее важным будет стандартный массив.Но в основном вы должны смоделировать реализацию ваших базовых вычислений в обоих направлениях и посмотреть, имеет ли это значение.Я взломал вместе очень сырой вариант с std :: vector и std :: list;это, вероятно, неправильно любым из способов, но вы можете попробовать и поиграть с некоторыми параметрами и посмотреть, какие выигрыши для вас.В моей системе, учитывая размеры и объем вычислений, список быстрее, но он может довольно легко пойти в любую сторону.

W / rt openmp, да, если вы так и собираетесь, выруки несколько связаны;вам почти наверняка придется идти с векторной структурой, но сначала вы должны убедиться, что дополнительные затраты на вставки не сгорят в пользу нескольких ядер.

#include <iostream>
#include <list>
#include <vector>
#include <cmath>
#include <sys/time.h>
using namespace std;

struct node {
    bool stuck;
    double x[2];
    double loccurve;
    double disttoprev;
};

void tick(struct timeval *t) {
    gettimeofday(t, NULL);
}

/* returns time in seconds from now to time described by t */
double tock(struct timeval *t) {
    struct timeval now;
    gettimeofday(&now, NULL);
    return (double)(now.tv_sec - t->tv_sec) +
        ((double)(now.tv_usec - t->tv_usec)/1000000.);
}

int main()
{
    const int nstart = 100;
    const int niters = 100;
    const int nevery = 30;
    const bool doPrint = false;
    list<struct node>   nodelist;
    vector<struct node> nodevect;

    // Note - vector is *much* faster if you know ahead of time 
    //  maximum size of vector
    nodevect.reserve(nstart*30);

    // Initialize
    for (int i = 0; i < nstart; i++) {
        struct node *mynode = new struct node;
        mynode->stuck = false;
        mynode->x[0] = i; mynode->x[1] = 2.*i;
        mynode->loccurve = -1;
        mynode->disttoprev = -1;

        nodelist.push_back( *mynode );
        nodevect.push_back( *mynode );
    }

    const double EPSILON = 1.e-6;
    struct timeval listclock;
    double listtime;

    tick(&listclock);
    for (int i=0; i<niters; i++) {
        // Calculate local curvature, distance

        list<struct node>::iterator prev, next, cur;
        double dx1, dx2, dy1, dy2;

        next = cur = prev = nodelist.begin();
        cur++;
        next++; next++;
        dx1 = prev->x[0]-cur->x[0];
        dy1 = prev->x[1]-cur->x[1];

        while (next != nodelist.end()) {
            dx2 = cur->x[0]-next->x[0];
            dy2 = cur->x[1]-next->x[1];

            double slope1 = (dy1/(dx1+EPSILON));
            double slope2 = (dy2/(dx2+EPSILON));

            cur->disttoprev = sqrt(dx1*dx1 + dx2*dx2 );

            cur->loccurve = ( slope1*slope2*(dy1+dy2) +
                              slope2*(prev->x[0]+cur->x[0]) -
                              slope1*(cur->x[0] +next->x[0]) ) /
                            (2.*(slope2-slope1) + EPSILON);

            next++;
            cur++;
            prev++;
        }

        // Insert interpolated pt every neveryth pt
        int count = 1;
        next = cur = nodelist.begin();
        next++;
        while (next != nodelist.end()) {
            if (count % nevery == 0) {
                struct node *mynode = new struct node;
                mynode->x[0] = (cur->x[0]+next->x[0])/2.;
                mynode->x[1] = (cur->x[1]+next->x[1])/2.;
                mynode->stuck = false;
                mynode->loccurve = -1;
                mynode->disttoprev = -1;
                nodelist.insert(next,*mynode);
            }
            next++;
            cur++;
            count++;
        }
    }
                                                               51,0-1        40%

    struct timeval vectclock;
    double vecttime;

    tick(&vectclock);
    for (int i=0; i<niters; i++) {
        int nelem = nodevect.size();
        double dx1, dy1, dx2, dy2;
        dx1 = nodevect[0].x[0]-nodevect[1].x[0];
        dy1 = nodevect[0].x[1]-nodevect[1].x[1];

        for (int elem=1; elem<nelem-1; elem++) {
            dx2 = nodevect[elem].x[0]-nodevect[elem+1].x[0];
            dy2 = nodevect[elem].x[1]-nodevect[elem+1].x[1];

            double slope1 = (dy1/(dx1+EPSILON));
            double slope2 = (dy2/(dx2+EPSILON));

            nodevect[elem].disttoprev = sqrt(dx1*dx1 + dx2*dx2 );

            nodevect[elem].loccurve = ( slope1*slope2*(dy1+dy2) +
                              slope2*(nodevect[elem-1].x[0] +
                                      nodevect[elem].x[0])  -
                              slope1*(nodevect[elem].x[0] +
                                      nodevect[elem+1].x[0]) ) /
                            (2.*(slope2-slope1) + EPSILON);

        }

        // Insert interpolated pt every neveryth pt
        int count = 1;
        vector<struct node>::iterator next, cur;
        next = cur = nodevect.begin();
        next++;
        while (next != nodevect.end()) {
            if (count % nevery == 0) {
                struct node *mynode = new struct node;
                mynode->x[0] = (cur->x[0]+next->x[0])/2.;
                mynode->x[1] = (cur->x[1]+next->x[1])/2.;
                mynode->stuck = false;
                mynode->loccurve = -1;
                mynode->disttoprev = -1;
                nodevect.insert(next,*mynode);
            }
            next++;
            cur++;
            count++;
        }
    }
    vecttime = tock(&vectclock);

    cout << "Time for list: " << listtime << endl;
    cout << "Time for vect: " << vecttime << endl;

    vector<struct node>::iterator v;
    list  <struct node>::iterator l;

    if (doPrint) {
        cout << "Vector: " << endl;
        for (v=nodevect.begin(); v!=nodevect.end(); ++v) {
             cout << "[ (" << v->x[0] << "," << v->x[1] << "), " << v->disttoprev << ", " << v->loccurve << "] " << endl;
        }

        cout << endl << "List: " << endl;
        for (l=nodelist.begin(); l!=nodelist.end(); ++l) {
             cout << "[ (" << l->x[0] << "," << l->x[1] << "), " << l->disttoprev << ", " << l->loccurve << "] " << endl;
        }

    }

    cout << "List size is " << nodelist.size() << endl;
}
1 голос
/ 12 июля 2011

Предполагая, что создание новых элементов происходит относительно редко, я бы выбрал подход отсортированного вектора по всем перечисленным вами причинам:

  • Нет необходимости тратить время на указатели / индексы в районе
  • Воспользуйтесь преимуществами пространственного местоположения
  • Намного проще распараллелить

Конечно, чтобы это работало, вам нужно убедиться, что вектор был всегда отсортирован, а не просто каждый k-й временной шаг.

0 голосов
/ 12 июля 2011

Это похоже на хорошее упражнение для студентов, занимающихся параллельным программированием.

Кажется, у вас есть структура данных, которая естественным образом ведет к распределению, цепочке. Вы можете проделать немалую работу над подцепями, которые (частично) статически назначены разным потокам. Возможно, вы захотите иметь дело с N-1 граничными случаями отдельно, но если длины подцепей> 3, то они изолированы друг от друга.

Конечно, между каждым шагом вам придется обновлять глобальные переменные, но такие переменные, как длина цепочки, являются простыми параллельными дополнениями. Просто рассчитайте длину каждой подцепи, а затем сложите их. Если длина ваших подцепей составляет 100000/8, однопоточное произведение - это добавление этих 8 длин подцепей между шагами.

Если рост узлов сильно неравномерен, вы можете периодически перебалансировать длины подцепей.

...