Быстрый алгоритм для вычисления пересечений N множеств - PullRequest
0 голосов
/ 08 февраля 2019

У меня есть n наборов A0, A2, ... An-1, содержащих элементы набора E.

Я определяю конфигурацию C как целое число, состоящее из n битов, поэтому C имеет значения от 0 до2 ^ п-1.Теперь я определяю следующее:

(C)   an item e of E is in configuration C 
       <=> for each bit b of C, if b==1 then e is in Ab, else e is not in Ab

Например, для n = 3 конфигурация C = 011 соответствует элементам E, которые находятся в A0 и A1, но НЕ в A2 (НЕ важно)

C[bitmap] - это количество элементов, которые имеют точно такой же шаблон присутствия / отсутствия в наборах. C[001] - это количество элементов в A0, которых нет ни в одном другом наборе..


Другое возможное определение:

(V)   an item e of E is in configuration V 
       <=> for each bit b of V, if b==1 then e is in Ab

Например, для n = 3 конфигурация (V) V = 011 соответствует элементам E, которые находятся в A0 и A1

V[bitmap] - это счетчик пересечений выбранных наборов. (т. Е. Счетчик того, сколько элементов находится во всех наборах, где растровое изображение истинно.) V[001]количество элементов в A0.V[011] - это количество элементов в A0 и A1, независимо от того, находятся ли они также в A2.


Далее на первом рисунке показаны элементыустанавливает A0, A1 и A2, второе изображение показывает размер (C) конфигураций, а третье изображение показывает размер (V) конфигураций.

sets examples enter image description here enter image description here

Я также могу представлять конфигурации с помощью любого из двух векторов:

C[001]= 5       V[001]=14
C[010]=10       V[010]=22
C[100]=11       V[100]=24
C[011]= 2       V[011]= 6
C[101]= 3       V[101]= 7
C[110]= 6       V[110]=10
C[111]= 4       V[111]= 4

Я хочу написать C/ C ++ функция, которая максимально эффективно преобразует C в V .Наивным подходом может быть следующая функция 'transfo', которая явно в O (4 ^ n):

#include <vector>
#include <cstdio>

using namespace std;

vector<size_t> transfo (const vector<size_t>& C)
{
    vector<size_t> V (C.size());

    for (size_t i=0; i<C.size(); i++)
    {
        V[i] = 0;

        for (size_t j=0; j<C.size(); j++)
        {
            if ((j&i)==i)  { V[i] += C[j]; }
        }
    }

    return V;
} 


int main()
{
    vector<size_t> C = { 
        /* 000 */  0,
        /* 001 */  5,
        /* 010 */ 10,
        /* 011 */  2,
        /* 100 */ 11,
        /* 101 */  3,
        /* 110 */  6,
        /* 111 */  4
    };

    vector<size_t> V = transfo (C);

    for (size_t i=1; i<V.size(); i++)  {  printf ("[%2ld]  C=%2ld   V=%2ld\n", i, C[i], V[i]);  }
}

Мой вопрос: существует ли более эффективный алгоритм, чем наивный для преобразованиявектор С в вектор V?И какова будет сложность такого «хорошего» алгоритма?

Обратите внимание, что меня может заинтересовать любое решение SIMD.

Ответы [ 2 ]

0 голосов
/ 13 февраля 2019

Согласно великому наблюдению @ CătălinFrâncu, я написал две рекурсивные реализации преобразования (см. Код ниже):

  • transfo_recursive: очень простая рекурсивная реализация
  • transfo_avx2: все еще рекурсивно, но используйте AVX2 для последнего шага рекурсии для n = 3

Я предлагаю здесь, чтобы размеры счетчиков кодировались в 32 битах и ​​чтобы значение n могло расти до28.

Я также написал итеративную реализацию (transfo_iterative), основанную на моих собственных наблюдениях за поведением рекурсии.На самом деле, я думаю, что это близко к нерекурсивной реализации, предложенной @ chtz.

Вот код теста:

// compiled with: g++ -O3   intersect.cpp -march=native -mavx2 -lpthread -DNDEBUG

#include <vector>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <cmath>
#include <thread>
#include <algorithm>
#include <sys/times.h>
#include <immintrin.h>
#include <boost/align/aligned_allocator.hpp>

using namespace std;

////////////////////////////////////////////////////////////////////////////////

typedef u_int32_t Count;

// Note: alignment is important for AVX2
typedef std::vector<Count,boost::alignment::aligned_allocator<Count, 8*sizeof(Count)>>  CountVector;

typedef void (*callback) (CountVector::pointer C, size_t q);
typedef vector<pair<const char*, callback>> FunctionsVector;

unsigned int randomSeed = 0;

////////////////////////////////////////////////////////////////////////////////
double timestamp()
{
    struct timespec timet;
    clock_gettime(CLOCK_MONOTONIC, &timet);
    return timet.tv_sec + (timet.tv_nsec/ 1000000000.0);
}

////////////////////////////////////////////////////////////////////////////////
CountVector getRandomVector (size_t n)
{
    // We use the same seed, so we'll get the same random values
    srand (randomSeed);

    // We fill a vector of size q=2^n with random values
    CountVector C(1ULL<<n);
    for (size_t i=0; i<C.size(); i++)  { C[i] = rand() % (1ULL<<(8*sizeof(Count))); }
    return C;
}

////////////////////////////////////////////////////////////////////////////////
void copy_add_block (CountVector::pointer C, size_t q)
{
    for (size_t i=0; i<q/2; i++)   {  C[i] += C[i+q/2]; }
}

////////////////////////////////////////////////////////////////////////////////
void copy_add_block_avx2 (CountVector::pointer C, size_t q)
{
    __m256i* target = (__m256i*) (C);
    __m256i* source = (__m256i*) (C+q/2);

    size_t imax = q/(2*8);

    for (size_t i=0; i<imax; i++)
    {
        target[i] = _mm256_add_epi32 (source[i], target[i]);
    }
}

////////////////////////////////////////////////////////////////////////////////
// Naive approach : O(4^n)
////////////////////////////////////////////////////////////////////////////////
CountVector transfo_naive (const CountVector& C)
{
    CountVector V (C.size());

    for (size_t i=0; i<C.size(); i++)
    {
        V[i] = 0;

        for (size_t j=0; j<C.size(); j++)
        {
            if ((j&i)==i)  { V[i] += C[j]; }
        }
    }

    return V;
}

////////////////////////////////////////////////////////////////////////////////
// Recursive approach : O(n.2^n)
////////////////////////////////////////////////////////////////////////////////
void transfo_recursive (CountVector::pointer C, size_t q)
{
    if (q>1)
    {
        transfo_recursive (C+q/2, q/2);
        transfo_recursive (C,     q/2);

        copy_add_block (C, q);
    }
}

////////////////////////////////////////////////////////////////////////////////
// Iterative approach : O(n.2^n)
////////////////////////////////////////////////////////////////////////////////
void transfo_iterative (CountVector::pointer C, size_t q)
{
    size_t i = 0;

    for (size_t n=q; n>1; n>>=1, i++)
    {
        size_t d = 1<<i;

        for (ssize_t j=q-1-d; j>=0; j--)
        {
            if ( ((j>>i)&1)==0) { C[j] += C[j+d]; }
        }
    }
}

////////////////////////////////////////////////////////////////////////////////
// Recursive AVX2 approach : O(n.2^n)
////////////////////////////////////////////////////////////////////////////////

#define ROTATE1(s)  _mm256_permutevar8x32_epi32 (s, _mm256_set_epi32(0,7,6,5,4,3,2,1))
#define ROTATE2(s)  _mm256_permutevar8x32_epi32 (s, _mm256_set_epi32(0,0,7,6,5,4,3,2))
#define ROTATE4(s)  _mm256_permutevar8x32_epi32 (s, _mm256_set_epi32(0,0,0,0,7,6,5,4))

void transfo_avx2 (CountVector::pointer V, size_t N)
{
    __m256i k1 = _mm256_set_epi32 (0,0xFFFFFFFF,0,0xFFFFFFFF,0,0xFFFFFFFF,0,0xFFFFFFFF);
    __m256i k2 = _mm256_set_epi32 (0,0,0xFFFFFFFF,0xFFFFFFFF,0,0,0xFFFFFFFF,0xFFFFFFFF);
    __m256i k4 = _mm256_set_epi32 (0,0,0,0,0xFFFFFFFF,0xFFFFFFFF,0xFFFFFFFF,0xFFFFFFFF);

    if (N==8)
    {
        __m256i* source = (__m256i*) (V);

        *source = _mm256_add_epi32 (*source, _mm256_and_si256(ROTATE1(*source),k1));
        *source = _mm256_add_epi32 (*source, _mm256_and_si256(ROTATE2(*source),k2));
        *source = _mm256_add_epi32 (*source, _mm256_and_si256(ROTATE4(*source),k4));
    }
    else // if (N>8)
    {
        transfo_avx2 (V+N/2, N/2);
        transfo_avx2 (V,     N/2);

        copy_add_block_avx2  (V, N);
    }
}

#define ROTATE1_AND(s)  _mm256_srli_epi64 ((s), 32)  // odd 32bit elements
#define ROTATE2_AND(s)  _mm256_bsrli_epi128 ((s), 8) // high 64bit halves
// gcc doesn't have _mm256_zextsi128_si256
// and _mm256_castsi128_si256 doesn't guarantee zero extension
// vperm2i118 can do the same job as vextracti128, but is slower on Ryzen
#ifdef __clang__                                      // high 128bit lane
#define ROTATE4_AND(s)  _mm256_zextsi128_si256(_mm256_extracti128_si256((s),1))
#else
//#define ROTATE4_AND(s)  _mm256_castsi128_si256(_mm256_extracti128_si256((s),1))
#define ROTATE4_AND(s)  _mm256_permute2x128_si256((s),(s),0x81)  // high bit set = zero that lane
#endif

void transfo_avx2_pcordes (CountVector::pointer C, size_t q)
{
    if (q==8)
    {
        __m256i* source = (__m256i*) (C);
        __m256i tmp = *source;
        tmp = _mm256_add_epi32 (tmp, ROTATE1_AND(tmp));
        tmp = _mm256_add_epi32 (tmp, ROTATE2_AND(tmp));
        tmp = _mm256_add_epi32 (tmp, ROTATE4_AND(tmp));
        *source = tmp;
    }
    else //if (N>8)
    {
        transfo_avx2_pcordes (C+q/2, q/2);
        transfo_avx2_pcordes (C,     q/2);

        copy_add_block_avx2  (C, q);
    }
}

////////////////////////////////////////////////////////////////////////////////
// Template specialization (same as transfo_avx2_pcordes)
////////////////////////////////////////////////////////////////////////////////
template <int n>
void transfo_template (__m256i* C)
{
    const size_t q = 1ULL << n;

    transfo_template<n-1> (C);
    transfo_template<n-1> (C + q/2);

    __m256i* target = (__m256i*) (C);
    __m256i* source = (__m256i*) (C+q/2);

    for (size_t i=0; i<q/2; i++)
    {
        target[i] = _mm256_add_epi32 (source[i], target[i]);
    }
}

template <>
void transfo_template<0> (__m256i* C)
{
    __m256i* source = (__m256i*) (C);
    __m256i tmp = *source;
    tmp = _mm256_add_epi32 (tmp, ROTATE1_AND(tmp));
    tmp = _mm256_add_epi32 (tmp, ROTATE2_AND(tmp));
    tmp = _mm256_add_epi32 (tmp, ROTATE4_AND(tmp));
    *source = tmp;
}

void transfo_recur_template (CountVector::pointer C, size_t q)
{
#define CASE(n)     case 1ULL<<n: transfo_template<n> ((__m256i*)C);   break;

    q = q / 8; // 8 is the number of 32 bits items in the AVX2 registers

    // We have to 'link' the dynamic value of q with a static template specialization
    switch (q)
    {
                  CASE( 1); CASE( 2); CASE( 3); CASE( 4); CASE( 5); CASE( 6); CASE( 7); CASE( 8); CASE( 9);
        CASE(10); CASE(11); CASE(12); CASE(13); CASE(14); CASE(15); CASE(16); CASE(17); CASE(18); CASE(19);
        CASE(20); CASE(21); CASE(22); CASE(23); CASE(24); CASE(25); CASE(26); CASE(27); CASE(28); CASE(29);

        default: printf ("transfo_template undefined for q=%ld\n", q);  break;
    }
}

////////////////////////////////////////////////////////////////////////////////
// Recursive approach multithread : O(n.2^n)
////////////////////////////////////////////////////////////////////////////////
void transfo_recur_thread (CountVector::pointer C, size_t q)
{
    std::thread t1 (transfo_recur_template, C+q/2, q/2);
    std::thread t2 (transfo_recur_template, C,     q/2);
    t1.join();
    t2.join();

    copy_add_block_avx2 (C, q);
}

////////////////////////////////////////////////////////////////////////////////
void header (const char* title, const FunctionsVector& functions)
{
    printf ("\n");
    for (size_t i=0; i<functions.size(); i++)  { printf ("------------------"); }  printf ("\n");
    printf ("%s\n", title);
    for (size_t i=0; i<functions.size(); i++)  { printf ("------------------"); }  printf ("\n");
    printf ("%3s\t", "# n");
    for (auto fct : functions)  {  printf ("%20s\t", fct.first);  }
    printf ("\n");
}

////////////////////////////////////////////////////////////////////////////////
// Check that alternative implementations provide the same result as the naive one
////////////////////////////////////////////////////////////////////////////////
void check (const FunctionsVector& functions, size_t nmin, size_t nmax)
{
    header ("CHECK (0 values means similar to naive approach)", functions);

    for (size_t n=nmin; n<=nmax; n++)
    {
        printf ("%3ld\t", n);

        CountVector reference = transfo_naive (getRandomVector(n));

        for (auto fct : functions)
        {
            // We call the (in place) transformation
            CountVector C = getRandomVector(n);
            (*fct.second) (C.data(), C.size());

            int nbDiffs= 0;
            for (size_t i=0; i<C.size(); i++)
            {
                if (reference[i]!=C[i]) { nbDiffs++; }
            }
            printf ("%20ld\t", nbDiffs);
        }
        printf ("\n");
    }
}

////////////////////////////////////////////////////////////////////////////////
// Performance test
////////////////////////////////////////////////////////////////////////////////
void performance (const FunctionsVector& functions, size_t nmin, size_t nmax)
{
    header ("PERFORMANCE", functions);

    for (size_t n=nmin; n<=nmax; n++)
    {
        printf ("%3ld\t", n);
        for (auto fct : functions)
        {
            // We compute the average time for several executions
            // We use more executions for small n values in order
            // to have more accurate results
            size_t nbRuns = 1ULL<<(2+nmax-n);
            vector<double> timeValues;

            // We run the test several times
            for (size_t r=0; r<nbRuns; r++)
            {
                // We don't want to measure time for vector fill
                CountVector C = getRandomVector(n);

                double t0 = timestamp();
                (*fct.second) (C.data(), C.size());
                double t1 = timestamp();

                timeValues.push_back (t1-t0);
            }

            // We sort the vector of times in order to get the median value
            std::sort (timeValues.begin(), timeValues.end());

            double median = timeValues[timeValues.size()/2];
            printf ("%20lf\t", log(1000.0*1000.0*median)/log(2));
        }
        printf ("\n");
    }
}

////////////////////////////////////////////////////////////////////////////////
//
////////////////////////////////////////////////////////////////////////////////
int main (int argc, char* argv[])
{
    size_t nmin = argc>=2 ? atoi(argv[1]) : 14;
    size_t nmax = argc>=3 ? atoi(argv[2]) : 28;

    // We get a common random seed
    randomSeed = time(NULL);

    FunctionsVector functions = {
        make_pair ("transfo_recursive",        transfo_recursive),
        make_pair ("transfo_iterative",        transfo_iterative),
        make_pair ("transfo_avx2",             transfo_avx2),
        make_pair ("transfo_avx2_pcordes",     transfo_avx2_pcordes),
        make_pair ("transfo_recur_template",   transfo_recur_template),
        make_pair ("transfo_recur_thread",     transfo_recur_thread)
    };

    // We check for some n that alternative implementations
    // provide the same result as the naive approach
    check (functions, 5, 15);

    // We run the performance test
    performance (functions, nmin, nmax);
}

А вот график производительности: enter image description here

Можно заметить, что простая рекурсивная реализация довольно хороша, даже по сравнению с версией AVX2.Итеративная реализация немного разочаровывает, но я не приложил больших усилий для ее оптимизации.

Наконец, для моего собственного случая использования с 32-битными счетчиками и для n значений до 28, эти реализации, очевидно, подходят мнепо сравнению с первоначальным «наивным» подходом в O (4 ^ n).


Обновление

Следуя некоторым замечаниям @PeterCordes и @chtz, я добавилследующие реализации:

  • transfo-avx2-pcordes: то же самое, что transfo-avx2 с некоторыми оптимизациями AVX2
  • transfo-recur-template: то же самое, что transfo-avx2-pcordes, но с использованием специализации шаблона C ++ для реализациирекурсия
  • transfo-recur-thread: использование многопоточности для двух начальных рекурсивных вызовов transfo-recur-template

Вот обновленный результат теста:

enter image description here

Несколько замечаний по поводу этого результата:

  1. реализации AVX2 являются логически лучшими вариантами, но, возможно, не с максимальным потенциальным ускорением x8 со счетчиками 32 бит
  2. утраНа реализациях AVX2 специализация шаблонов дает небольшое ускорение, но почти исчезает при больших значениях для n
  3. простая двухпоточная версия имеет плохие результаты для n <20;для n> = 20 всегда есть небольшое ускорение, но далеко от потенциального 2x.
0 голосов
/ 09 февраля 2019

Итак, вы пытаетесь вычислить 2 n значений, поэтому вы не можете добиться большего успеха, чем O (2 n ).

Наивный подход начинается снаблюдение, что V [X] получается путем фиксации всех 1 битов в X и итерации по всем возможным значениям, где 0 битов.Например,

V[010] = C[010] + C[011] + C[110] + C[111]

Но этот подход выполняет O (2 n ) сложений для каждого элемента V, что дает общую сложность O (4 n ).

Вот алгоритм O (n × 2 n ).Мне тоже любопытно, если существует алгоритм O (2 n ).

Пусть n = 4.Давайте рассмотрим полную таблицу V по сравнению с C. Каждая строка в таблице ниже соответствует одному значению V, и это значение рассчитывается путем суммирования столбцов, помеченных *.Расположение символов * может быть легко выведено из наивного подхода.

    |0000|0001|0010|0011|0100|0101|0110|0111||1000|1001|1010|1011|1100|1101|1110|1111
0000| *  | *  | *  | *  | *  | *  | *  | *  || *  | *  | *  | *  | *  | *  | *  | *  
0001|    | *  |    | *  |    | *  |    | *  ||    | *  |    | *  |    | *  |    | *  
0010|    |    | *  | *  |    |    | *  | *  ||    |    | *  | *  |    |    | *  | *  
0011|    |    |    | *  |    |    |    | *  ||    |    |    | *  |    |    |    | *  
0100|    |    |    |    | *  | *  | *  | *  ||    |    |    |    | *  | *  | *  | *  
0101|    |    |    |    |    | *  |    | *  ||    |    |    |    |    | *  |    | *  
0110|    |    |    |    |    |    | *  | *  ||    |    |    |    |    |    | *  | *  
0111|    |    |    |    |    |    |    | *  ||    |    |    |    |    |    |    | *  
-------------------------------------------------------------------------------------
1000|    |    |    |    |    |    |    |    || *  | *  | *  | *  | *  | *  | *  | *  
1001|    |    |    |    |    |    |    |    ||    | *  |    | *  |    | *  |    | *  
1010|    |    |    |    |    |    |    |    ||    |    | *  | *  |    |    | *  | *  
1011|    |    |    |    |    |    |    |    ||    |    |    | *  |    |    |    | *  
1100|    |    |    |    |    |    |    |    ||    |    |    |    | *  | *  | *  | *  
1101|    |    |    |    |    |    |    |    ||    |    |    |    |    | *  |    | *  
1110|    |    |    |    |    |    |    |    ||    |    |    |    |    |    | *  | *  
1111|    |    |    |    |    |    |    |    ||    |    |    |    |    |    |    | *  

Обратите внимание, что верхний левый, верхний правый и нижний правый углы содержат идентичные расположения.Поэтому мы можем выполнить некоторые массовые вычисления следующим образом:

  1. Вычислить нижнюю половину таблицы (нижний правый угол).
  2. Добавить значения в верхнюю половину.
  3. Вычислить верхний левый угол.

Если мы допустим q = 2 n , то есть повторяющаяся сложность составляет

T (q) = 2T (q / 2) + O (q)

, который решается с использованием Основной теоремы до

T (q) = O (q log q)

или, в пересчете на n,

T (n) = O (n × 2 n )

...