Почему порождение потоков с использованием Iterator :: map не запускает потоки параллельно? - PullRequest
0 голосов
/ 03 апреля 2019

Я написал простое многопоточное приложение на Rust для добавления чисел от 1 до x. (Я знаю, что для этого есть формула, но смысл заключался в том, чтобы написать какой-то многопоточный код на Rust, а не получить результат.) Он работал нормально, но после того, как я перестроил его на более функциональный стиль вместо обязательного, больше не было ускорения от многопоточности. При проверке использования процессора выясняется, что из моего 4-ядерного / 8-поточного ЦП используется только одно ядро. В исходном коде загрузка процессора составляет 790%, а в рефакторированной версии - только 99%.

Оригинальный код:

use std::thread;

fn main() {
    let mut handles: Vec<thread::JoinHandle<u64>> = Vec::with_capacity(8);

    const thread_count: u64 = 8;
    const batch_size: u64 = 20000000;

    for thread_id in 0..thread_count {
        handles.push(thread::spawn(move || {
            let mut sum = 0_u64;

            for i in thread_id * batch_size + 1_u64..(thread_id + 1) * batch_size + 1_u64 {
                sum += i;
            }

            sum
        }));
    }

    let mut total_sum = 0_u64;

    for handle in handles.into_iter() {
        total_sum += handle.join().unwrap();
    }
    println!("{}", total_sum);
}

Реорганизованный код:

use std::thread;

fn main() {
    const THREAD_COUNT: u64 = 8;
    const BATCH_SIZE: u64 = 20000000;

    // spawn threads that calculate a part of the sum
    let handles = (0..THREAD_COUNT).map(|thread_id| {
        thread::spawn(move ||
            // calculate the sum of all numbers from assigned to this thread
            (thread_id * BATCH_SIZE + 1 .. (thread_id + 1) * BATCH_SIZE + 1)
                .fold(0_u64,|sum, number| sum + number))
    });

    // add the parts of the sum together to get the total sum
    let sum = handles.fold(0_u64, |sum, handle| sum + handle.join().unwrap());

    println!("{}", sum);
}

выходы программ одинаковы (12800000080000000), но реорганизованная версия работает в 5-6 раз медленнее.

Похоже, что итераторы лениво вычисляются. Как я могу заставить весь итератор быть оцененным? Я пытался собрать его в массив типа [thread::JoinHandle<u64>; THREAD_COUNT as usize], но затем я получаю следующую ошибку:

  --> src/main.rs:14:7
   |
14 |     ).collect::<[thread::JoinHandle<u64>; THREAD_COUNT as usize]>();
   |       ^^^^^^^ a collection of type `[std::thread::JoinHandle<u64>; 8]` cannot be built from `std::iter::Iterator<Item=std::thread::JoinHandle<u64>>`
   |
   = help: the trait `std::iter::FromIterator<std::thread::JoinHandle<u64>>` is not implemented for `[std::thread::JoinHandle<u64>; 8]`

Сбор в вектор работает, но это кажется странным решением, потому что размер известен заранее. Есть ли лучший способ, чем использовать вектор?

1 Ответ

4 голосов
/ 03 апреля 2019

Итераторы в Rust ленивы, поэтому ваши потоки не запускаются, пока handles.fold не попытается получить доступ к соответствующему элементу итератора. В основном, что происходит:

  1. handles.fold пытается получить доступ к первому элементу итератора.
  2. Первый поток запущен.
  3. handles.fold вызывает его закрытие, которое вызывает handle.join() для первого потока.
  4. handle.join ожидает завершения первого потока.
  5. handles.fold пытается получить доступ ко второму элементу итератора.
  6. Второй поток запущен.
  7. и т. Д.

Вы должны собрать ручки в вектор, прежде чем сложить результат:

let handles: Vec<_> = (0..THREAD_COUNT)
    .map(|thread_id| {
        thread::spawn(move ||
            // calculate the sum of all numbers from assigned to this thread
            (thread_id * BATCH_SIZE + 1 .. (thread_id + 1) * BATCH_SIZE + 1)
                .fold(0_u64,|sum, number| sum + number))
    })
    .collect();

Или вы можете использовать ящик типа Rayon , который предоставляет параллельные итераторы.

...