Оптимизировать генератор простых чисел ржавчины N - PullRequest
3 голосов
/ 08 мая 2020

Игра с rust итераторами и лямбдами Я реализовал N генератор простых чисел ( оригинальная игровая площадка / улучшенный код. ).
Он работает очень плохо ( В 10 раз медленнее) по сравнению с эквивалентным функциональным кодом на другом языке, с которым я лучше разбираюсь на том же самом компьютере.
Некоторые числа:

test calc_primes ... bench: 7,754,795 нс / итер ( +/- 1,887,591)

#[macro_use]
extern crate bencher;
use bencher::Bencher; 
use nth_prime::*;                                                                                                   
fn calc_primes(b: &mut Bencher) {
   b.iter(|| primes(10000)); 
} 

benchmark_group!(benches, calc_primes); 
benchmark_main!(benches);                           

Я ожидаю, что ржавчина превзойдет ее, но мне трудно избавиться от fl aws в приведенном ниже коде. Итак, вот призыв помочь оптимизировать эту конкретную реализацию.

  • Мне не нравятся эти Vec выделения, некоторые из них определенно не нужны ...
  • , вероятно, могли бы передать больше ссылок здесь и там
  • в другой реализации используется логический OR на fve c bitarray, что очень быстро. Logi c ниже ржавчины express тот же подход в вызове fold, но здесь используется Vec<bool>, который должен быть медленным по сравнению с эквивалентом битового массива ...
  • использовать другой тип данных вместо Vec<bool> , не прочь выделить массивный массив байтов и работать с битовыми сдвигами и т. д. c., но не могу представить, как собрать это вместе ...
pub fn primes(n: u64) -> Vec<usize> {                                                                                                                                             
    if n < 4 {                                                                                                                                                                    
        vec![2]                                                                                                                                                                   
    } else {                                                                                                                                                                      
        base(primes((n as f64).sqrt().ceil() as u64), n)                                                                                                                          
    }                                                                                                                                                                             
}                                                                                                                                                                                 

fn base(r: Vec<usize>, p: u64) -> Vec<usize> {                                                                                                                                    
    let fvec = (0..p).map(|_| false).collect::<Vec<bool>>();                                                                                                                      
    let z = r                                                                                                                                                                     
        .iter()                                                                                                                                                                   
        .map(|&x| (0..x).map(|y| y > 0).collect::<Vec<bool>>())                                                                                                                   
        .map(|x| {                                                                                                                                                                
            (0..p)                                                                                                                                                                
                .map(|n| !x[(n % (x.len() as u64)) as usize])                                                                                                                     
                .collect::<Vec<bool>>()                                                                                                                                           
        })                                                                                                                                                                        
        .fold(fvec, |acc, x| {                                                                                                                                                    
            acc.iter().zip(x).map(|(a, b)| *a || b).collect()                                                                                                                     
        });                                                                                                                                                                       
    let yy: Vec<usize> = z                                                                                                                                                        
        .iter()                                                                                                                                                                   
        .enumerate()                                                                                                                                                              
        .filter_map(|(i, x)| if !*x { Some(i) } else { None })                                                                                                                    
        .skip(1)                                                                                                                                                                  
        .collect();
    r.iter().chain(yy.iter()).map(|x| *x).collect()                                                                                                                               
}                                                                                          

Относительно алгоритмов генерации простых чисел , есть выдающийся ответ в этой ветке , так что это не вопрос об алгоритме оптимизации, а об этой конкретной реализации без внешних ящиков (это обучающее упражнение).

EDIT:

После некоторых подсказок из раздела комментариев, он значительно оптимизирован:

test calc_primes       ... bench:   4,776,400 ns/iter (+/- 1,427,534)
test calc_primes_sieve ... bench:     644,220 ns/iter (+/- 1,655,581)
test calc_primes_2     ... bench:     268,598 ns/iter (+/- 46,440)
  1. Заменено 'неэффективное' Vec<bool> @ fold растровое изображение на идиоматический c способ итератора of: step_by
  2. Отброшенные неизменяемые типы в пользу состояния (есть ли способ сохранить неизменяемый подход?)
fn base2(head: Vec<u64>, p: u64) -> Vec<u64> {                                                                                                                                    
    let mut fvec = (0..p).map(|_| false).collect::<Vec<bool>>();                                                                                                                  
    head.iter()                                                                                                                                                                   
        .map(|&x| {                                                                                                                                                               
            (0..p)                                                                                                                                                                
                .step_by(x as usize)                                                                                                                                              
                .for_each(|y| fvec[y as usize] = true)                                                                                                                            
        })                                                                                                                                                                        
        .for_each(drop);                                                                                                                                                          
    let tail: Vec<u64> = fvec.iter().enumerate()
        .filter_map(|(i, x)| if !*x { Some(i as u64) } else { None })                                                                     
        .skip(1)                                              
        .collect();                                     
    head.iter().chain(tail.iter()).cloned().collect()                                                                                                                             
}                                                         

EDIT 2 :

Для справки вот Реализация «другого» языка:

q)p:{$[x<4; enlist 2; r, 1_ where not any x#'not til each r:p ceiling sqrt x]}



q)\ts do[1000;p 10000]
474 443088  
=
474,000 ns/iter 

Для 64-битных q v4.0:

q)\t do[1000;p 10000]
273  
=
273,000 ns/iter 

Неплохо для интерпретируемого языка.

EDIT 3

Добавлены дополнительные тесты для новых реализаций из ответов.
(в хронологическом порядке)
Удалось немного сбрить без skip(1) и различий fvec распределение Новый код игровой площадки.

test ssqrt_p_1__vec_bool_n_mod   ... bench:   6,838,150 ns/iter (+/- 627,389)
test sieve_p_2__mut_step_by      ... bench:     367,229 ns/iter (+/- 38,279)
test ssqrt_p_3__mut_step_by      ... bench:     266,189 ns/iter (+/- 56,215)
test ssqrt_p_4__push_step_by_mod ... bench:   1,255,206 ns/iter (+/- 262,107)
test sieve_p_5__for_loop_mut     ... bench:     441,397 ns/iter (+/- 47,077)
test ssqrt_p_6__no_skip          ... bench:     176,186 ns/iter (+/- 24,765)

StdDev теперь играет большую роль ... работает с:

набор задач - c 0 автомобиль go скамейка --jobs 1

fn base3(head: Vec<u64>, p: u64) -> Vec<u64> {                                                                                                                                    
    let mut fvec = vec![false; p as usize]; fvec[1] = true;                                                                                                                       
    head.iter()                                                                                                                                                                   
        .map(|&x| {                                                                                                                                                               
            (0..p)                                                                                                                                                                
                .step_by(x as usize)                                                                                                                                              
                .for_each(|y| fvec[y as usize] = true)                                                                                                                            
        })                                                                                                                                                                        
        .for_each(drop);                                                                                                                                                          
    let tail: Vec<u64> = fvec                                                                                                                                                     
        .iter()                                                                                                                                                                   
        .enumerate()                                                                                                                                                              
        .filter_map(|(i, x)| if !*x { Some(i as u64) } else { None })                                                                                                             
        .collect();                                                                                                                                                               
    head.iter().chain(tail.iter()).cloned().collect()                                                                                                                             
} 

EDIT 5

На удивление ниже no_chain медленнее, чем base3:no_skip версия. Угадайте, что компилятор magi c (оптимизация хвостового вызова в рекурсивных функциях ???) .

fn base4(head: Vec<u64>, p: u64) -> Vec<u64> {                                                                                                                                    
    let mut fvec = vec![false; p as usize]; fvec[1] = true;                                                                                                                       
    head.iter()                                                                                                                                                                   
        .map(|&x| {                                                                                                                                                                           
            (0..p)                                                                                                                                                                
                .step_by(x as usize)                                                                                                                                              
                .for_each(|y| fvec[y as usize] = true);                                                                                                                           
            fvec[x as usize] = false;                                                                                                                                             
        })                                                                                                                                                                        
        .for_each(drop);                                                                                                                                                          
    fvec.iter().enumerate()                                                                                                                                                       
        .filter_map(|(i, x)| if !*x { Some(i as u64) } else { None })                                                                                                             
        .collect()                                                                                                                                                                
}                                              

Ответы [ 3 ]

4 голосов
/ 09 мая 2020

В вашем коде есть очень странные накладные расходы, например

(0..x).map(|y| y > 0).collect::<Vec<bool>>()

, которые вы используете только как x[(n % (x.len() as u64)) as usize] ... aka. как очень медленный способ записи n % x == 0.

Вот почти прямой вариант вашего кода, где я просто удаляю эти накладные расходы, которые вы добавили, и встроенный код там, где это имеет смысл.

pub fn primes(n: usize) -> Vec<usize> {
    if n < 4 {
        vec![2]
    } else {
        let mut r = primes((n as f64).sqrt().ceil() as usize);
        let mut fvec: Vec<_> = (0..n).map(|i| r.iter().any(|x| i % x == 0)).collect();
        fvec[1] = true;

        for (i, x) in fvec.into_iter().enumerate() {
            if !x { r.push(i) }
        }
        r
    }
}

Самым большим фактическим изменением была замена skip(1) на fvec[1] = true, поскольку так было значительно проще.

Конечно, реальное сито Эратосфена проще, с меньшими накладными расходами и Лучшая временная сложность.

pub fn primes(n: usize) -> Vec<usize> {
    let mut primes = Vec::new();
    let mut sieve = vec![true; n];

    for p in 2..n {
        if sieve[p] {
            primes.push(p);
            let mut i = p * p;
            while i < n {
                sieve[i] = false;
                i += p;
            }
        }
    }

    primes
}

Вот слегка оптимизированное сито, позволяющее избежать ненужной работы и использования памяти.

pub fn primes6(mut n: usize) -> Vec<usize> {
    let mut primes = vec![2];
    n >>= 1;

    // 0  1  2  3  4
    // 2, 3, 5, 7, 9, ...
    let mut sieve = vec![true; n];

    let end = (n as f64).sqrt().ceil() as usize;
    for p in 1..end {
        if sieve[p] {
            let prime = p * 2 + 1;
            primes.push(prime);

            let mut i = ((prime * prime) - 1) / 2;
            while i < n {
                sieve[i] = false;
                i += prime; // skip by 2·prime
            }
        }
    }

    for p in end..n {
        if sieve[p] {
            let prime = p * 2 + 1;
            primes.push(prime);
        }
    }

    primes
}
2 голосов
/ 09 мая 2020

Я нашел еще две многообещающие оптимизации:

Классический алгоритм:

pub fn primes_erathosthenes(upper_bound: u64) -> Vec<u64> {
  let sieve_size = (upper_bound + 1) as u64;
  let mut sieve = vec![false; sieve_size as usize];
  let mut p = 2;
  'next: loop {
    if p * p >= sieve_size {
      break 'next;
    }
    for f in p..=(upper_bound / p) {
      let index = p * f;
      sieve[index as usize] = true;
    }
    p += 1;
    while sieve[p as usize] {
      p += 1;
    }
  }
  sieve
    .into_iter()
    .enumerate()
    .skip(2)
    .filter_map(|(i, p)| if p { None } else { Some(i as u64) })
    .collect()
}

Ваш окончательный алгоритм с улучшенным l oop

fn base_improved(head: Vec<u64>, p: u64) -> Vec<u64> {
  let mut fvec = vec![false; p as usize];
  fvec[0] = true;
  fvec[1] = true;
  head
    .iter()
    .flat_map(|&x| (2u64..p).map(move |i| i * x).take_while(|y| *y < p))
    .for_each(|y| fvec[y as usize] = true);
  fvec
    .iter()
    .enumerate()
    .filter_map(|(i, x)| if !*x { Some(i as u64) } else { None })
    .collect()
}

и более быстрым алгоритм просеивания (сундарамс, могу выложить код, если он интересен)

Тесты:

test calc_erathosthenes          ... bench:      34,192 ns/iter (+/- 2,902)
test calc_exercism               ... bench:      79,314 ns/iter (+/- 16,276)
test calc_stackoverflow          ... bench:      35,431 ns/iter (+/- 5,674)
test calc_stackoverflow_improved ... bench:      32,508 ns/iter (+/- 6,971)
test calc_sundaram               ... bench:      17,930 ns/iter (+/- 3,182)
test calc_veedrac                ... bench:      27,110 ns/iter (+/- 3,870)
test calc_veedrac_6              ... bench:      12,178 ns/iter (+/- 4,113)
0 голосов
/ 09 мая 2020

tl; dr

  • bitvecs могут работать, но для повышения производительности требуется много оптимизации в особых случаях
  • Наиболее важны чистые реализации достойных алгоритмов.

Длинная версия

Следует отметить, что производительность различных методов может меняться в зависимости от размера ввода. Это также может быть не так, дело в том, что использование только одного входного размера при тестировании может дать результаты, которые не распространяются на все размеры.

Вот исходный код в вопросе:


pub fn primes(n: u64) -> Vec<usize> {
    if n < 4 {
        vec![2]
    } else {
        base(primes((n as f64).sqrt().ceil() as u64), n)
    }
}

fn base(r: Vec<usize>, p: u64) -> Vec<usize> {
    let fvec = (0..p).map(|_| false).collect::<Vec<bool>>();
    let z = r
        .iter()
        .map(|&x| (0..x).map(|y| y > 0).collect::<Vec<bool>>())
        .map(|x| {
            (0..p)
                .map(|n| !x[(n % (x.len() as u64)) as usize])
                .collect::<Vec<bool>>()
        })
        .fold(fvec, |acc, x| {
            acc.iter().zip(x).map(|(a, b)| *a || b).collect()
        });
    let yy: Vec<usize> = z
        .iter()
        .enumerate()
        .filter_map(|(i, x)| if !*x { Some(i) } else { None })
        .skip(1)
        .collect();
    r.iter().chain(yy.iter()).map(|x| *x).collect()
}

Мне показалось, что приведенный выше код довольно сложно читать. Понять. Вроде бы много лишнего шума, вроде ненужных собирает. Кроме того, имена переменных могут быть более наглядными.

Ниже представлена ​​моя наивная реализация сита (calc_primes_mine_naive, на тестах ниже). У него нет изменчивости, хотя, по общему признанию, есть ненужный сбор. Это в 2–3 раза быстрее, чем указано выше.

pub fn primes_less_than(n: u64) -> Vec<usize> {
    let is_not_prime: Vec<bool> = (0..n).map(|num| {
        let sieve_upper_bound = (num as f64).sqrt().ceil() as u64 + 1;
        (2..sieve_upper_bound).any(|divisor| {
            (num % divisor) == 0
        })
    }).collect();
    is_not_prime.iter().enumerate().filter(|(_, this_one_not_prime)| {
        !**this_one_not_prime
    }).map(|(definitely_prime, _)| { definitely_prime }).collect()
}

Ниже первая попытка оптимизации. Как правило, sqrts медленные, поэтому избавление от повторяющихся sqrts, как указано выше, должно быть абсолютной победой (похоже, не имело большого значения).

pub fn primes_less_than_naive(n: u64) -> Vec<usize> {
    let mut sqrt = 1;
    let is_not_prime: Vec<bool> = (0..n).map(|num| {
        if sqrt * sqrt <= num {
            sqrt += 1;
        }
        let sieve_upper_bound = sqrt + 1;
        (2..sieve_upper_bound).any(|divisor| {
            (num % divisor) == 0
        })
    }).collect();
    is_not_prime.iter().enumerate().filter(|(_, this_one_not_prime)| {
        !**this_one_not_prime
    }).map(|(definitely_prime, _)| { definitely_prime }).collect()
}

Посмотрел сборку и профилировал в vtune. Vtune жаловался на накладные расходы на "высокое переключение MS". Я изначально предполагаю, что это было вызвано приведением типов к поплавкам и обратно, но оно продолжалось после удаления всех операций с плавающими точками. Взгляд на сборку, казалось, подразумевал, что могут быть некоторые накладные расходы из-за collect, поэтому я избавился от лишнего сбора.

pub fn primes_less_than_naive2(n: u64) -> Vec<usize> {
    let mut sqrt = 1;
    (0..n).map(|num| {
        if sqrt * sqrt <= num {
            sqrt += 1;
        }
        let sieve_upper_bound = sqrt + 1;
        (2..sieve_upper_bound).any(|divisor| {
            (num % divisor) == 0
        })
    }).enumerate().filter(|(_, this_one_not_prime)| {
        !*this_one_not_prime
    }).map(|(definitely_prime, _)| { definitely_prime }).collect()
}

Отсутствие изменчивости, похоже, немного ухудшило производительность. Возможно, есть способ express без использования mut, но, вероятно, это будет довольно громоздкий код.

pub fn primes_less_than_naive_with_mut(n: usize) -> Vec<usize> {
    let mut is_not_prime = vec![false; n as usize];
    let sieve_upper_bound = (n as f64).sqrt().ceil() as usize + 1;
    (1..sieve_upper_bound).for_each(|step_size| {
        (0..n).step_by(step_size).for_each(|i| {
            is_not_prime[i] = true;
        })
    });
    is_not_prime.iter().enumerate().filter(|(_, this_one_not_prime)| {
        !**this_one_not_prime
    }).map(|(definitely_prime, _)| { definitely_prime }).collect()
}

В этот момент vtune жалуется, что некоторые части вышеперечисленного связаны с ядром, и некоторые части привязаны к памяти. Это раздражает, b / c способ решить проблему с привязкой к памяти - использовать битвек, а способ исправить привязку к ядру - не использовать битвек. Пытаясь облегчить ограничение памяти, я попытался изменить направление l oop, идея заключалась в том, что это позволит лучше использовать кеш. Когда я впервые написал это, я обнаружил, что это работает, но с тех пор я этого не сделал.

pub fn primes_less_than_naive_alternating(n: usize) -> Vec<usize> {
    let mut is_not_prime = vec![false; n as usize];
    let sieve_upper_bound = (n as f64).sqrt().ceil() as usize + 1;
    let mut forward = true;
    (1..sieve_upper_bound).for_each(|step_size| {
        if forward {
            (0..n).step_by(step_size).for_each(|i| {
                is_not_prime[i] = true;
            })
        } else {
            (0..n).step_by(step_size).rev().for_each(|i| {
                is_not_prime[i] = true;
            })
        };
        forward = !forward;
    });
    is_not_prime.iter().enumerate().filter(|(_, this_one_not_prime)| {
        !**this_one_not_prime
    }).map(|(definitely_prime, _)| { definitely_prime }).collect()
}

Bitve c версия. Это медленнее (неудивительно). Возможно, удастся сделать это быстро, имея какую-то таблицу поиска по маске для небольшого step_size, но я не optimisti c. Vtune не может сказать ничего интересного, кроме того, что это может быть связано с множеством зависимостей данных. (Иначе говоря, он не может выйти из строя) b / c следующий байт зависит от предыдущего байта [b / c позади сцены, загрузка и сохранение имеют размер слова?])

pub unsafe fn primes_less_than_naive_bitvec(n: usize) -> Vec<usize> {
    let num_bytes = n / 8 + 1;
    let mut is_not_prime = libc::calloc(size_of::<u8>(), num_bytes) as *mut u8;
    let sieve_upper_bound = (n as f64).sqrt().ceil() as usize + 1;
    (1..sieve_upper_bound).for_each(|step_size| {
        (0..n).step_by(step_size).for_each(|i| {
            let address = i >> 3;
            let bit_within_the_byte = i & 0b111;
            let cur = is_not_prime.offset(address as isize);
            *cur |= ((0x1 as u8) << bit_within_the_byte);
        });
    });
    (0..n).filter(|i|{
        let address = i / 8;
        let bit_within_the_byte = i % 8;
        let cur = is_not_prime.offset(address as isize).read();
        let is_not_prime = (cur | ((0x1 as u8) << bit_within_the_byte)) > 0;
        !is_not_prime
    }).collect::<Vec<usize>>()
}

При n = 10:

test calc_primes                                 ... bench:         752 ns/iter (+/- 61)
test calc_primes_mine                            ... bench:         314 ns/iter (+/- 20)
test calc_primes_mine_alternating_loop_direction ... bench:         110 ns/iter (+/- 6)
test calc_primes_mine_bitvec                     ... bench:         165 ns/iter (+/- 888)
test calc_primes_mine_naive                      ... bench:         304 ns/iter (+/- 29)
test calc_primes_mine_naive2                     ... bench:         250 ns/iter (+/- 16)
test calc_primes_mine_naive_with_mut             ... bench:          70 ns/iter (+/- 4)

При n = 100:

test calc_primes                                 ... bench:       5,924 ns/iter (+/- 2,210)
test calc_primes_mine                            ... bench:       3,384 ns/iter (+/- 88)
test calc_primes_mine_alternating_loop_direction ... bench:         673 ns/iter (+/- 23)
test calc_primes_mine_bitvec                     ... bench:       1,194 ns/iter (+/- 36)
test calc_primes_mine_naive                      ... bench:       3,141 ns/iter (+/- 195)
test calc_primes_mine_naive2                     ... bench:       2,919 ns/iter (+/- 107)
test calc_primes_mine_naive_with_mut             ... bench:         521 ns/iter (+/- 31)

При n = 1000:

test calc_primes                                 ... bench:     120,828 ns/iter (+/- 18,018)
test calc_primes_mine                            ... bench:      59,269 ns/iter (+/- 6,993)
test calc_primes_mine_alternating_loop_direction ... bench:       7,080 ns/iter (+/- 2,795)
test calc_primes_mine_bitvec                     ... bench:      16,186 ns/iter (+/- 470)
test calc_primes_mine_naive                      ... bench:      56,572 ns/iter (+/- 1,849)
test calc_primes_mine_naive2                     ... bench:      54,526 ns/iter (+/- 9,773)
test calc_primes_mine_naive_with_mut             ... bench:       5,873 ns/iter (+/- 531)

При n = 10 000

test calc_primes                                 ... bench:   2,672,089 ns/iter (+/- 117,553)
test calc_primes_mine                            ... bench:   1,222,262 ns/iter (+/- 24,242)
test calc_primes_mine_alternating_loop_direction ... bench:      77,348 ns/iter (+/- 4,845)
test calc_primes_mine_bitvec                     ... bench:     201,894 ns/iter (+/- 4,446)
test calc_primes_mine_naive                      ... bench:   1,213,264 ns/iter (+/- 160,097)
test calc_primes_mine_naive2                     ... bench:   1,174,753 ns/iter (+/- 88,116)
test calc_primes_mine_naive_with_mut             ... bench:      66,283 ns/iter (+/- 6,358)

При = 100 000:

test calc_primes                                 ... bench:  71,953,394 ns/iter (+/- 11,795,923)
test calc_primes_mine                            ... bench:  27,542,006 ns/iter (+/- 5,158,828)
test calc_primes_mine_alternating_loop_direction ... bench:   1,103,863 ns/iter (+/- 135,770)
test calc_primes_mine_bitvec                     ... bench:   2,547,044 ns/iter (+/- 180,308)
test calc_primes_mine_naive                      ... bench:  27,399,779 ns/iter (+/- 3,121,742)
test calc_primes_mine_naive2                     ... bench:  26,430,825 ns/iter (+/- 1,818,277)
test calc_primes_mine_naive_with_mut             ... bench:   1,045,803 ns/iter (+/- 103,090)

При n = 1000000:

test calc_primes                                 ... bench: 1,827,068,877 ns/iter (+/- 131,368,352)
test calc_primes_mine                            ... bench: 658,940,900 ns/iter (+/- 45,559,627)
test calc_primes_mine_alternating_loop_direction ... bench:  18,112,057 ns/iter (+/- 1,839,472)
test calc_primes_mine_bitvec                     ... bench:  30,189,983 ns/iter (+/- 2,649,749)
test calc_primes_mine_naive                      ... bench: 658,018,301 ns/iter (+/- 39,323,294)
test calc_primes_mine_naive2                     ... bench: 651,249,390 ns/iter (+/- 22,800,036)
test calc_primes_mine_naive_with_mut             ... bench:  18,212,183 ns/iter (+/- 2,936,353)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...