Как оптимизировать переупорядочение массива? - PullRequest
0 голосов
/ 15 ноября 2018

Я хочу оптимизировать переупорядочение некоторых массивов данных, содержащих около 4 миллионов шорт без знака. Цель состоит в том, чтобы обработать поток данных, приведя значения, которые должны быть похожи друг на друга, чтобы быть близкими друг к другу. Псевдокод выглядит так:

  for( i=0; i<n; i++)
    dest[i] = src[ idx[i] ] ;

Чтобы оптимизировать код для определенного списка idx[i] Я попытался скомпилировать 4-миллионную строку c функцией с заполненными значениями idx:

void reorder( unsigned short * restrict i, unsigned short * restrict o) {
  o[0]=i[2075723];
  o[1]=i[2075724];
  o[2]=i[2075722];
  ...
  o[4194301]=i[4192257];
  o[4194302]=i[4192256];
  o[4194303]=i[4190208];
 }

Я надеялся, что GCC создаст умный поток инструкций pshufw / pblend / unpack ... вместо этого он зависает после использования большого количества памяти (7 ГБ). Я пытался сделать копию, основанную на копии, чтобы избежать сложностей, связанных с заменой.

Кто-нибудь сможет предложить хорошие способы создания оптимизированного кода для этой проблемы? Пока что попробовал:

  • упорядоченное чтение, случайная запись: 60 мс (openmp не помогло)
  • заказное письмо, случайное чтение: 20 мс (openmp -> 4 мс)

Я надеялся в конечном итоге приблизиться к пропускной способности памяти (порядка 0,4 мс). Схема, которая учитывает размер кэша и выполняет блокировку, должна помочь, но я не знаю, с чего начать для разработки такой. Мне также интересно, есть ли простой способ использовать инструкции SIMD?

Делая игрушечный пример с транспонированием, я даже не смог получить gcc для вывода SIMD-версии, см .:

https://godbolt.org/z/bzGWad

Это сложная проблема для компиляторов или я упускаю что-то простое?

Редактировать 21/11/2018 Добавлен полный, но минимальный пример проблемы

Вот полный пример проблемы, которую я пытаюсь оптимизировать. На самом деле упорядочение является более сложной функцией, но суть в том, чтобы упорядочить пиксели данных в соответствии с их расстоянием от центра изображения, как разматывание спирали.

#include <omp.h>
#include <vector>
#include <cmath>
#include <iostream>
#include <iomanip>
#include <algorithm>

#define N 2048

// Sorting on output, one core
void reorder_simple( const std::vector<size_t> &indices,
             const unsigned short input[],
             unsigned short output[]){
  for( int i=0; i<N*N; i++)
    output[i] = input[ indices[i] ];
}
// Sorting on output write, many cores
void reorder_omp( const std::vector<size_t> &indices,
          const unsigned short input[],
          unsigned short output[]){
#pragma omp parallel for
  for( int i=0; i<N*N; i++)
    output[i] = input[ indices[i] ];
}
// Benchmark for memory throughput, one core
void copy_simple(  const std::vector<size_t> &indices,
           const unsigned short input[],
           unsigned short output[]){
  for( int i=0; i<N*N; i++)
    output[i] = input[i];
}
// Benchmark for memory throughput, many cores
void copy_omp (  const std::vector<size_t> &indices,
         const unsigned short input[],
         unsigned short output[]){
#pragma omp parallel for
  for( int i=0; i<N*N; i++)
    output[i] = input[i];
}

// Macro to avoid retyping
#define bench(func)                                          \
  func( indices, input, output);                             \
  start = omp_get_wtime();                                   \
  for( size_t i=0; i<100; i++)                               \
      func( indices, input, output );                        \
  end =  omp_get_wtime();                                    \
  std:: cout << std::setw(15) << #func <<                    \
     ", Time taken: "  << (end-start)/100 << " /s\n";

int main()
{
  std::vector<float> sort_order(N*N);
  std::vector<size_t> indices(N*N);
  float radius, azimuth, ci, cj;
  double start, end;
  unsigned short *input, *output;

  ci = N*0.496;  // changes according to calibration
  cj = N*0.4985;  // reality is more complicated (tilts etc)
  for( size_t i=0; i<N; i++){
    for( size_t j=0; j<N; j++){
      radius  = sqrt( (i-ci)*(i-ci) + (j-cj)*(j-cj) );
      azimuth = atan2( i-ci, j-cj ); // from -pi to pi
      sort_order[i*N+j] = round( radius ) + azimuth/2/M_PI;
      indices[i*N+j] = i*N+j;
    }
  }
  // Find the order to sort data onto a spiral 
  std::sort( indices.begin(), indices.end(),
         [&sort_order](int i, int j){
           return sort_order[i] < sort_order[j]; });
  // Invent some test data
  input = new unsigned short [N*N];
  output = new unsigned short [N*N];
  for( size_t i=0 ; i<N*N; i++){
    input[i] = i;
    output[i]= 0;
  }
  // some timing:
  bench(reorder_simple);
  bench(reorder_omp)   ;
  bench(copy_simple)   ;
  bench(copy_omp)      ;
}


   % g++ reorder.cpp -o reorder -std=c++11 -O3 -march=native -fopenmp -Wall
   % ./reorder
     reorder_simple, Time taken: 0.0179023 /s
        reorder_omp, Time taken: 0.00349932 /s
        copy_simple, Time taken: 0.00140805 /s
           copy_omp, Time taken: 0.000250205 /s

Я бы хотел, чтобы функция reorder_omp была ближе к скорости функции copy_omp. Детекторы могут работать со скоростью 500 кадров в секунду, поэтому 3,5 мс - это плохо по сравнению с 0,25 мс.

Снова отредактируйте: 21/11/2018 код для написания функции, которая не компилируется

  //top of file
  #include <fstream>  
  ...
  //just before the end: 
  std::ofstream out;
  out.open("cfunc.c");
  out << "void cfunc( unsigned short * restrict input,\n" <<
         "            unsigned short * restrict output){ \n"; 
  for(int i=0;i<N;i++)
    for(int j=0;j<N;j++)
      out << "output[" << i*N+j << "] = input[" << indices[i*N+j] << "];\n";
  out << "}\n";
  out.close();

Тестируя это на другой машине, я получаю ошибки компилятора как от gcc (7.3.0), так и от clang (6.0.0). Он компилируется и запускается с tcc (0.9.27), но заканчивается медленнее, чем зацикливание индексов.

1 Ответ

0 голосов
/ 21 ноября 2018

(секция комментариев слишком короткая)

Я бы протестировал следующую идею:

  1. Ведение таблицы обратного индекса, чтобы наивный алгоритм стал:

     for (i = 0; i<n; i++) {
       dest[index[i]] = src[i];
     }
    
  2. Вместо использования простого алгоритма:

    2.1 Создание временного массива пар (значение, destindex)

    struct pair {
      int value;
      int destindex;
    };
    for (i = 0; i < n; i++) {
      pairs[i] = {.value=src[i], .destindex=index[i]};
    }
    

    2.2 Использование слияния или быстрой сортировки длясортировать массив пар по .destindex полю

    2.3. Копировать значения из массива пар в dest массив

В этом алгоритме нет случайных обращений и, следовательно, нетошибки страницы с произвольным доступом.Однако я не уверен, что он будет работать лучше, чем простой алгоритм из-за большого количества линейных проходов.

...