Найдите эффективный способ выполнить МАКС, байт на байт, на 2 огромных буфера - PullRequest
1 голос
/ 06 февраля 2020

Мне нужно очень быстро сравнить 9 миллионов байтов, чтобы сохранить максимум каждого байта. Вот что я делаю:

int bufSize = 9000000;
byte_t *buf = /* ... */;
byte_t *maxBuf = /* ... */;

for (int i = 0; i < bufSize; ++i) {
  if (buf[i] > maxBuf[i]) {
    maxBuf[i] = buf[i];
  }
}

Это работает, но мне нужно сократить время обработки на 3.

В частности, есть ли способ работы с 64-битным процессором ?

Знаете ли вы, могут ли помочь numpy массивы?

РЕДАКТИРОВАТЬ: Процессор Четырехъядерный ARM Cortex-A57 и ОС Linux для Тегры . Извините, я должен был написать это раньше.

Ответы [ 4 ]

3 голосов
/ 06 февраля 2020

Указывая на мгновение очевидное. Ваш код выборочно изменяет данные в maxBuf, что приводит к сбою векторизатора. Просто измените код, чтобы использовать вместо него std :: max ....

  for (int i = 0; i < bufSize; ++i) {
    maxBuf[i] = std::max(maxBuf[i], buf[i]);
  }

... и код теперь будет векторизован.

Подтверждение: https://godbolt.org/z/rviiKF

Внутренний l oop развернут и теперь использует AVX2:

.LBB0_12:                               # =>This Inner Loop Header: Depth=1
        vmovdqu ymm0, ymmword ptr [rsi + rax]
        vmovdqu ymm1, ymmword ptr [rsi + rax + 32]
        vmovdqu ymm2, ymmword ptr [rsi + rax + 64]
        vmovdqu ymm3, ymmword ptr [rsi + rax + 96]
        vpmaxub ymm0, ymm0, ymmword ptr [rdi + rax]
        vpmaxub ymm1, ymm1, ymmword ptr [rdi + rax + 32]
        vmovdqu ymmword ptr [rsi + rax], ymm0
        vmovdqu ymmword ptr [rsi + rax + 32], ymm1
        vpmaxub ymm0, ymm2, ymmword ptr [rdi + rax + 64]
        vpmaxub ymm1, ymm3, ymmword ptr [rdi + rax + 96]
        vmovdqu ymmword ptr [rsi + rax + 64], ymm0
        vmovdqu ymmword ptr [rsi + rax + 96], ymm1
        vmovdqu ymm0, ymmword ptr [rsi + rax + 128]
        vmovdqu ymm1, ymmword ptr [rsi + rax + 160]
        vpmaxub ymm0, ymm0, ymmword ptr [rdi + rax + 128]
        vpmaxub ymm1, ymm1, ymmword ptr [rdi + rax + 160]
        vmovdqu ymmword ptr [rsi + rax + 128], ymm0
        vmovdqu ymmword ptr [rsi + rax + 160], ymm1
        vmovdqu ymm0, ymmword ptr [rsi + rax + 192]
        vmovdqu ymm1, ymmword ptr [rsi + rax + 224]
        vpmaxub ymm0, ymm0, ymmword ptr [rdi + rax + 192]
        vpmaxub ymm1, ymm1, ymmword ptr [rdi + rax + 224]
        vmovdqu ymmword ptr [rsi + rax + 192], ymm0
        vmovdqu ymmword ptr [rsi + rax + 224], ymm1
        add     rax, 256
        add     rdx, 4
        jne     .LBB0_12
2 голосов
/ 06 февраля 2020

Вы можете получить высокоэффективное решение (в моей системе [Intel i5-8250U] ~ 45 мс против ~ 1 мс), если у вас есть процессор с поддержкой AVX2 и обрабатывает 32 байта одновременно, используя встроенные функции Intel SIMD ( Intel Руководство по внутренним характеристикам - макс. )

Поскольку 9000000 делится на 32, вам даже не требуется дополнительный l oop для увеличения sh.

// #include <immintrin.h>, also for g++ add `-mavx2`-flag

int bufSize = 9000000;
byte *buf = static_cast<byte *>(_mm_malloc(sizeof(*buf) * bufSize, 32));
byte *maxBuf = static_cast<byte *>(_mm_malloc(sizeof(*maxBuf) * bufSize, 32));

for (int i = 0; i < bufSize; ++i) 
{
    buf[i] = (byte) rand();
    maxBuf[i] = (byte) rand();
}

for (int i = 0; i < bufSize; i += 32) 
{
    __m256i *buf_simd = (__m256i *) &buf[i];
    __m256i *maxBuf_simd = (__m256i *) &maxBuf[i];

    *maxBuf_simd = _mm256_max_epu8(*maxBuf_simd, *buf_simd);
}

_mm_free(buf);
_mm_free(maxBuf);

Поскольку у меня нет ваших данных, я создаю два массива со случайными данными. Здесь очень важно, чтобы они были выровнены по 32 байтам.

После этого в каждой итерации for-l oop я загружаю 32Byte в векторные регистры и выполняю _mm256_max_epu8, который в основном делит 256 бит в 32-байтовые «пакеты» (так называемый упакованный вектор) и выбирает максимум каждого байта (более подробное объяснение можно найти по ссылке выше).

Если у вас есть только процессор с поддержкой SSE2, вы можно использовать _mm_max_epu8 со 128-битным вектором.

1 голос
/ 06 февраля 2020

Благодаря @Frederik мы нашли, как выполнять эти операции, используя NEON для ARM.

Вот код:

#include <arm_neon.h>

int bufSize = 9000000;
byte_t *buf = static_cast<byte_t *>(aligned_alloc(8, bufSize));
byte_t *maxBuf = static_cast<byte_t *>(aligned_alloc(8, bufSize));

// Optimized MAX using NEON, it works on packets of 8 bytes.
byte_t *maxPtr = maxBuf;
const byte_t *newPtr = buf;
int iterCount = bufSize / 8;
for (int i = 0; i < iterCount; ++i) {
  // load 8 bytes
  uint8x8_t v1 = vld1_u8(maxPtr);
  uint8x8_t v2 = vld1_u8(newPtr);
  // max on 8 bytes
  uint8x8_t result1 = vmax_u8(v1, v2);
  // store the result
  vst1_u8(maxPtr, result1);
  // move 8 bytes
  maxPtr += 8;
  newPtr += 8;
}

// Less optimized MAX for the remaining bytes (if 'bufSize' is not a multiple of 8).
for (int i = iterCount * 8; i < bufSize; ++i) {
  maxBuf[i] = std::max(maxBuf[i], buf[i]);
}

free(buf);
free(maxBuf);

Наши опции компилятора: -O3 -ffast-math -march=armv8-a+simd.

Время обработки отстает от 6 мс. Это было 17 мс с начальными if и 12 мс с std::max, как предложено @robthebloke. Большое спасибо, ребята!

Некоторая документация:

1 голос
/ 06 февраля 2020

С точки зрения того, что у вас есть, нет более быстрого способа сделать это. Использование numpy *1019* действительно только улучшает python, чтобы дать вам C -подобное поведение.

Я думаю, что вам лучше всего использовать OpenMP. Здесь - это простое руководство. Поскольку каждая итерация не зависит друг от друга, я думаю, что ваш код должен выглядеть следующим образом:

#pragma omp parallel for
for (int i = 0; i < bufSize; ++i) {
    #pragma omp simd
    if (buf[i] > maxBuf[i]) {
        maxBuf[i] = buf[i];
    }
}

И затем вы компилируете, используя -fopenmp. Я не уверен, что строка #pragma omp simd сильно поможет.

Вы также можете добавить оптимизацию компилятора. Здесь - список. Также см. Справочную страницу . Они не всегда улучшают скорость, и это зависит от многих факторов. Просто попробуйте их, и это может серьезно оптимизировать ваш код.

Например, у меня был алгоритм, который занимал несколько часов. После выполнения оптимизаций компилятора и OpenMP мне удалось снизить его примерно до 30 секунд. Но эта область программирования может стать очень сложной, и здесь нужно учитывать множество факторов.

...