Как поточно-ориентированная функция, которая добавляет элементы в массив? - PullRequest
1 голос
/ 13 июля 2020

Я написал следующую функцию, которая принимает объект (element), освобождает место для него в существующем векторе (elements) и обновляет вектор, добавляя новый объект:

void addElement(const ElementType& element) {
    if (numElements == elements.size()) {
        elements.resize(boost::extents[numElements+1]);
    }

    elements[numElements] = element;

    numElements++;
}

Как я могу сделать его потокобезопасным для MPI? Насколько я понимаю, каждый поток знает размер elements, поэтому я не понимаю, почему эта функция не будет потокобезопасной. numElements инициализируется нулем вне этой функции и является размером вектора elements.

Изменить: я использую функцию, как написано выше, и блокировку и разблокировку mtx следующим образом, но все же последний elements вектор содержит данные только первого ранга.

    #pragma omp parallel for collapse(3) schedule(static)
    for (long n0 = mgr->startN0; n0 < mgr->startN0 + mgr->localN0; n0++) {
      for (int n1 = 0; n1 < Ν; n1++) {
        for (int n2 = 0; n2 < Ν; n2++) {
          ElementType element;
          std::mutex mtx;
            for(int i=0;i<g_field[n0][n1][n2];i++){
              ... do stuff with element ...
              mtx.lock();
              #pragma omp critical
              addElement(element);
              mtx.unlock();}
       }
     }
   }

Изменить: по соображениям скорости мне пришлось немного изменить функцию и ее использование:

void addElements_MPI(std::vector<ElementType> new_batch,std::mutex& mtx) {

  std::lock_guard<std::mutex> lock(mtx); 
  elements.resize(boost::extents[elements.num_elements()+new_batch.size()]); 

  std::copy(new_batch.begin(), new_batch.end(), elements.begin()+numElements); 

  numElements += new_batch.size();
}
std::mutex mtx;
std::vector<ElementType> all_elements;
#pragma omp parallel for collapse(3) schedule(static)
    for (long n0 = mgr->startN0; n0 < mgr->startN0 + mgr->localN0; n0++) {
      for (int n1 = 0; n1 < Ν; n1++) {
        for (int n2 = 0; n2 < Ν; n2++) {

          ElementType element;

            for(int i=0;i<g_field[n0][n1][n2];i++){
              ... do stuff with element ...
              mtx.lock();
              #pragma omp critical
              all_elements.push_back(element);
              mtx.unlock();}
       }
     }
   }
 mtx.lock();
 #pragma omp critical
 addElement_MPI(all_elements,mtx);
 mtx.unlock();

1 Ответ

5 голосов
/ 13 июля 2020
• 1000 потоки не используют никаких эксклюзивных блокировок для управления своим доступом к этой памяти.

В вашем случае один поток может добавить элемент, а другой поток читает elements. Кроме того, несколько потоков могут одновременно добавлять новые элементы. Чтобы контролировать их доступ, вы должны заблокировать общий ресурс, т.е. elements.

Как заблокировать общий ресурс в C ++?

В C ++ вы можете использовать std::mutex для защиты общих данных от одновременного доступа нескольких потоков. Когда std::mutex заблокирован одним потоком, другие потоки не могут получить доступ к общему ресурсу. Как только std::mutex разблокирован, другие потоки могут получить доступ к ресурсу. Вы можете использовать std::mutex::lock и std::mutex::unlock например:

std::mutex mtx;
ElementsType elements;

...
mtx.lock();

// only one thread accesses this part at a time
// work with elements

mtx.unlock();

Разработчики часто что-то забыли ...

Так как разработчики часто что-то забывали ... например, разблокировка заблокированный std::mutex ... C ++ предоставляет std::lock_guard (в случае C ++ 11) и std::scoped_lock (в случае C ++ 17). Вы можете думать о std::lock_guard и std::scoped_lock как о каких-то оболочках, которые принимают экземпляр std::mutex в качестве параметра конструктора и блокируют этот экземпляр std::mutex в конструкторе. При уничтожении экземпляра std::lock_guard или std::scoped_lock экземпляр std::mutex автоматически разблокируется.

void addElement(const ElementType& element) {
    std::lock_guard<std::mutex> lock(mtx); // similar to mtx.lock()

    if (numElements == elements.size()) {
        elements.resize(boost::extents[numElements+1]);
    }

    elements[numElements] = element;

    numElements++;

    // no need for mtx.unlock() since lock instance is now destructed and mutex is automatically unlocked
}

EDIT

Поскольку std::mutex - это своего рода канала связи между всеми потоками, все потоки должны использовать один и тот же экземпляр std::mutex, то есть один и тот же экземпляр std::mutex должен быть виден всем потокам.

В случае ниже (который я взял из обновленный вопрос):

for (int n2 = 0; n2 < Ν; n2++) {
    ElementType element;
    std::mutex mtx;
    for(int i=0;i<g_field[n0][n1][n2];i++){
         // ... do stuff with element ...
         mtx.lock();
         #pragma omp critical
         addElement(element);
         mtx.unlock();}
    }
}

std::mutex создается N раз, и каждый поток создает свой собственный std::mutex экземпляр, который не имеет никакого смысла, потому что они не могут общаться друг с другом. Вместо этого один экземпляр std::mutex должен быть виден всем потокам, так же как elements видны всем потокам.

...