Согласно великому наблюдению @ CătălinFrâncu, я написал две рекурсивные реализации преобразования (см. Код ниже):
transfo_recursive
: очень простая рекурсивная реализация transfo_avx2
: все еще рекурсивно, но используйте AVX2 для последнего шага рекурсии для n = 3
Я предлагаю здесь, чтобы размеры счетчиков кодировались в 32 битах и чтобы значение n могло расти до28.
Я также написал итеративную реализацию (transfo_iterative
), основанную на моих собственных наблюдениях за поведением рекурсии.На самом деле, я думаю, что это близко к нерекурсивной реализации, предложенной @ chtz.
Вот код теста:
// compiled with: g++ -O3 intersect.cpp -march=native -mavx2 -lpthread -DNDEBUG
#include <vector>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <cmath>
#include <thread>
#include <algorithm>
#include <sys/times.h>
#include <immintrin.h>
#include <boost/align/aligned_allocator.hpp>
using namespace std;
////////////////////////////////////////////////////////////////////////////////
typedef u_int32_t Count;
// Note: alignment is important for AVX2
typedef std::vector<Count,boost::alignment::aligned_allocator<Count, 8*sizeof(Count)>> CountVector;
typedef void (*callback) (CountVector::pointer C, size_t q);
typedef vector<pair<const char*, callback>> FunctionsVector;
unsigned int randomSeed = 0;
////////////////////////////////////////////////////////////////////////////////
double timestamp()
{
struct timespec timet;
clock_gettime(CLOCK_MONOTONIC, &timet);
return timet.tv_sec + (timet.tv_nsec/ 1000000000.0);
}
////////////////////////////////////////////////////////////////////////////////
CountVector getRandomVector (size_t n)
{
// We use the same seed, so we'll get the same random values
srand (randomSeed);
// We fill a vector of size q=2^n with random values
CountVector C(1ULL<<n);
for (size_t i=0; i<C.size(); i++) { C[i] = rand() % (1ULL<<(8*sizeof(Count))); }
return C;
}
////////////////////////////////////////////////////////////////////////////////
void copy_add_block (CountVector::pointer C, size_t q)
{
for (size_t i=0; i<q/2; i++) { C[i] += C[i+q/2]; }
}
////////////////////////////////////////////////////////////////////////////////
void copy_add_block_avx2 (CountVector::pointer C, size_t q)
{
__m256i* target = (__m256i*) (C);
__m256i* source = (__m256i*) (C+q/2);
size_t imax = q/(2*8);
for (size_t i=0; i<imax; i++)
{
target[i] = _mm256_add_epi32 (source[i], target[i]);
}
}
////////////////////////////////////////////////////////////////////////////////
// Naive approach : O(4^n)
////////////////////////////////////////////////////////////////////////////////
CountVector transfo_naive (const CountVector& C)
{
CountVector V (C.size());
for (size_t i=0; i<C.size(); i++)
{
V[i] = 0;
for (size_t j=0; j<C.size(); j++)
{
if ((j&i)==i) { V[i] += C[j]; }
}
}
return V;
}
////////////////////////////////////////////////////////////////////////////////
// Recursive approach : O(n.2^n)
////////////////////////////////////////////////////////////////////////////////
void transfo_recursive (CountVector::pointer C, size_t q)
{
if (q>1)
{
transfo_recursive (C+q/2, q/2);
transfo_recursive (C, q/2);
copy_add_block (C, q);
}
}
////////////////////////////////////////////////////////////////////////////////
// Iterative approach : O(n.2^n)
////////////////////////////////////////////////////////////////////////////////
void transfo_iterative (CountVector::pointer C, size_t q)
{
size_t i = 0;
for (size_t n=q; n>1; n>>=1, i++)
{
size_t d = 1<<i;
for (ssize_t j=q-1-d; j>=0; j--)
{
if ( ((j>>i)&1)==0) { C[j] += C[j+d]; }
}
}
}
////////////////////////////////////////////////////////////////////////////////
// Recursive AVX2 approach : O(n.2^n)
////////////////////////////////////////////////////////////////////////////////
#define ROTATE1(s) _mm256_permutevar8x32_epi32 (s, _mm256_set_epi32(0,7,6,5,4,3,2,1))
#define ROTATE2(s) _mm256_permutevar8x32_epi32 (s, _mm256_set_epi32(0,0,7,6,5,4,3,2))
#define ROTATE4(s) _mm256_permutevar8x32_epi32 (s, _mm256_set_epi32(0,0,0,0,7,6,5,4))
void transfo_avx2 (CountVector::pointer V, size_t N)
{
__m256i k1 = _mm256_set_epi32 (0,0xFFFFFFFF,0,0xFFFFFFFF,0,0xFFFFFFFF,0,0xFFFFFFFF);
__m256i k2 = _mm256_set_epi32 (0,0,0xFFFFFFFF,0xFFFFFFFF,0,0,0xFFFFFFFF,0xFFFFFFFF);
__m256i k4 = _mm256_set_epi32 (0,0,0,0,0xFFFFFFFF,0xFFFFFFFF,0xFFFFFFFF,0xFFFFFFFF);
if (N==8)
{
__m256i* source = (__m256i*) (V);
*source = _mm256_add_epi32 (*source, _mm256_and_si256(ROTATE1(*source),k1));
*source = _mm256_add_epi32 (*source, _mm256_and_si256(ROTATE2(*source),k2));
*source = _mm256_add_epi32 (*source, _mm256_and_si256(ROTATE4(*source),k4));
}
else // if (N>8)
{
transfo_avx2 (V+N/2, N/2);
transfo_avx2 (V, N/2);
copy_add_block_avx2 (V, N);
}
}
#define ROTATE1_AND(s) _mm256_srli_epi64 ((s), 32) // odd 32bit elements
#define ROTATE2_AND(s) _mm256_bsrli_epi128 ((s), 8) // high 64bit halves
// gcc doesn't have _mm256_zextsi128_si256
// and _mm256_castsi128_si256 doesn't guarantee zero extension
// vperm2i118 can do the same job as vextracti128, but is slower on Ryzen
#ifdef __clang__ // high 128bit lane
#define ROTATE4_AND(s) _mm256_zextsi128_si256(_mm256_extracti128_si256((s),1))
#else
//#define ROTATE4_AND(s) _mm256_castsi128_si256(_mm256_extracti128_si256((s),1))
#define ROTATE4_AND(s) _mm256_permute2x128_si256((s),(s),0x81) // high bit set = zero that lane
#endif
void transfo_avx2_pcordes (CountVector::pointer C, size_t q)
{
if (q==8)
{
__m256i* source = (__m256i*) (C);
__m256i tmp = *source;
tmp = _mm256_add_epi32 (tmp, ROTATE1_AND(tmp));
tmp = _mm256_add_epi32 (tmp, ROTATE2_AND(tmp));
tmp = _mm256_add_epi32 (tmp, ROTATE4_AND(tmp));
*source = tmp;
}
else //if (N>8)
{
transfo_avx2_pcordes (C+q/2, q/2);
transfo_avx2_pcordes (C, q/2);
copy_add_block_avx2 (C, q);
}
}
////////////////////////////////////////////////////////////////////////////////
// Template specialization (same as transfo_avx2_pcordes)
////////////////////////////////////////////////////////////////////////////////
template <int n>
void transfo_template (__m256i* C)
{
const size_t q = 1ULL << n;
transfo_template<n-1> (C);
transfo_template<n-1> (C + q/2);
__m256i* target = (__m256i*) (C);
__m256i* source = (__m256i*) (C+q/2);
for (size_t i=0; i<q/2; i++)
{
target[i] = _mm256_add_epi32 (source[i], target[i]);
}
}
template <>
void transfo_template<0> (__m256i* C)
{
__m256i* source = (__m256i*) (C);
__m256i tmp = *source;
tmp = _mm256_add_epi32 (tmp, ROTATE1_AND(tmp));
tmp = _mm256_add_epi32 (tmp, ROTATE2_AND(tmp));
tmp = _mm256_add_epi32 (tmp, ROTATE4_AND(tmp));
*source = tmp;
}
void transfo_recur_template (CountVector::pointer C, size_t q)
{
#define CASE(n) case 1ULL<<n: transfo_template<n> ((__m256i*)C); break;
q = q / 8; // 8 is the number of 32 bits items in the AVX2 registers
// We have to 'link' the dynamic value of q with a static template specialization
switch (q)
{
CASE( 1); CASE( 2); CASE( 3); CASE( 4); CASE( 5); CASE( 6); CASE( 7); CASE( 8); CASE( 9);
CASE(10); CASE(11); CASE(12); CASE(13); CASE(14); CASE(15); CASE(16); CASE(17); CASE(18); CASE(19);
CASE(20); CASE(21); CASE(22); CASE(23); CASE(24); CASE(25); CASE(26); CASE(27); CASE(28); CASE(29);
default: printf ("transfo_template undefined for q=%ld\n", q); break;
}
}
////////////////////////////////////////////////////////////////////////////////
// Recursive approach multithread : O(n.2^n)
////////////////////////////////////////////////////////////////////////////////
void transfo_recur_thread (CountVector::pointer C, size_t q)
{
std::thread t1 (transfo_recur_template, C+q/2, q/2);
std::thread t2 (transfo_recur_template, C, q/2);
t1.join();
t2.join();
copy_add_block_avx2 (C, q);
}
////////////////////////////////////////////////////////////////////////////////
void header (const char* title, const FunctionsVector& functions)
{
printf ("\n");
for (size_t i=0; i<functions.size(); i++) { printf ("------------------"); } printf ("\n");
printf ("%s\n", title);
for (size_t i=0; i<functions.size(); i++) { printf ("------------------"); } printf ("\n");
printf ("%3s\t", "# n");
for (auto fct : functions) { printf ("%20s\t", fct.first); }
printf ("\n");
}
////////////////////////////////////////////////////////////////////////////////
// Check that alternative implementations provide the same result as the naive one
////////////////////////////////////////////////////////////////////////////////
void check (const FunctionsVector& functions, size_t nmin, size_t nmax)
{
header ("CHECK (0 values means similar to naive approach)", functions);
for (size_t n=nmin; n<=nmax; n++)
{
printf ("%3ld\t", n);
CountVector reference = transfo_naive (getRandomVector(n));
for (auto fct : functions)
{
// We call the (in place) transformation
CountVector C = getRandomVector(n);
(*fct.second) (C.data(), C.size());
int nbDiffs= 0;
for (size_t i=0; i<C.size(); i++)
{
if (reference[i]!=C[i]) { nbDiffs++; }
}
printf ("%20ld\t", nbDiffs);
}
printf ("\n");
}
}
////////////////////////////////////////////////////////////////////////////////
// Performance test
////////////////////////////////////////////////////////////////////////////////
void performance (const FunctionsVector& functions, size_t nmin, size_t nmax)
{
header ("PERFORMANCE", functions);
for (size_t n=nmin; n<=nmax; n++)
{
printf ("%3ld\t", n);
for (auto fct : functions)
{
// We compute the average time for several executions
// We use more executions for small n values in order
// to have more accurate results
size_t nbRuns = 1ULL<<(2+nmax-n);
vector<double> timeValues;
// We run the test several times
for (size_t r=0; r<nbRuns; r++)
{
// We don't want to measure time for vector fill
CountVector C = getRandomVector(n);
double t0 = timestamp();
(*fct.second) (C.data(), C.size());
double t1 = timestamp();
timeValues.push_back (t1-t0);
}
// We sort the vector of times in order to get the median value
std::sort (timeValues.begin(), timeValues.end());
double median = timeValues[timeValues.size()/2];
printf ("%20lf\t", log(1000.0*1000.0*median)/log(2));
}
printf ("\n");
}
}
////////////////////////////////////////////////////////////////////////////////
//
////////////////////////////////////////////////////////////////////////////////
int main (int argc, char* argv[])
{
size_t nmin = argc>=2 ? atoi(argv[1]) : 14;
size_t nmax = argc>=3 ? atoi(argv[2]) : 28;
// We get a common random seed
randomSeed = time(NULL);
FunctionsVector functions = {
make_pair ("transfo_recursive", transfo_recursive),
make_pair ("transfo_iterative", transfo_iterative),
make_pair ("transfo_avx2", transfo_avx2),
make_pair ("transfo_avx2_pcordes", transfo_avx2_pcordes),
make_pair ("transfo_recur_template", transfo_recur_template),
make_pair ("transfo_recur_thread", transfo_recur_thread)
};
// We check for some n that alternative implementations
// provide the same result as the naive approach
check (functions, 5, 15);
// We run the performance test
performance (functions, nmin, nmax);
}
А вот график производительности:
Можно заметить, что простая рекурсивная реализация довольно хороша, даже по сравнению с версией AVX2.Итеративная реализация немного разочаровывает, но я не приложил больших усилий для ее оптимизации.
Наконец, для моего собственного случая использования с 32-битными счетчиками и для n значений до 28, эти реализации, очевидно, подходят мнепо сравнению с первоначальным «наивным» подходом в O (4 ^ n).
Обновление
Следуя некоторым замечаниям @PeterCordes и @chtz, я добавилследующие реализации:
transfo-avx2-pcordes
: то же самое, что transfo-avx2
с некоторыми оптимизациями AVX2 transfo-recur-template
: то же самое, что transfo-avx2-pcordes
, но с использованием специализации шаблона C ++ для реализациирекурсия transfo-recur-thread
: использование многопоточности для двух начальных рекурсивных вызовов transfo-recur-template
Вот обновленный результат теста:
Несколько замечаний по поводу этого результата:
- реализации AVX2 являются логически лучшими вариантами, но, возможно, не с максимальным потенциальным ускорением x8 со счетчиками 32 бит
- утраНа реализациях AVX2 специализация шаблонов дает небольшое ускорение, но почти исчезает при больших значениях для n
- простая двухпоточная версия имеет плохие результаты для n <20;для n> = 20 всегда есть небольшое ускорение, но далеко от потенциального 2x.