Почему мьютексы Rust не дают блокировки потоку, который хотел заблокировать его последним? - PullRequest
6 голосов
/ 07 июля 2019

Я хотел написать программу, которая порождает два потока, которые блокируют Mutex, увеличивают его, распечатывают что-то, а затем разблокируют Mutex, чтобы другой поток мог сделать то же самое.Я добавил немного времени ожидания, чтобы сделать его более согласованным, поэтому я подумал, что вывод должен выглядеть примерно так:

ping pong ping pong …  

, но фактический вывод довольно случайный.В большинстве случаев это просто

ping ping ping … pong 

Но нет никакой последовательности;иногда посередине есть «понг».

Я верил, что у мьютексов есть какой-то способ определить, кто хочет заблокировать его последним, но, похоже, это не так.

  1. Как на самом деле работает блокировка?
  2. Как получить желаемый результат?
use std::sync::{Arc, Mutex};
use std::{thread, time};

fn main() {
    let data1 = Arc::new(Mutex::new(1));
    let data2 = data1.clone();
    let ten_millis = time::Duration::from_millis(10);

    let a = thread::spawn(move || loop {
        let mut data = data1.lock().unwrap();
        thread::sleep(ten_millis);
        println!("ping ");
        *data += 1;
        if *data > 10 {
            break;
        }
    });

    let b = thread::spawn(move || loop {
        let mut data = data2.lock().unwrap();
        thread::sleep(ten_millis);
        println!("pong ");
        *data += 1;
        if *data > 10 {
            break;
        }
    });

    a.join().unwrap();
    b.join().unwrap();
}

Ответы [ 3 ]

8 голосов
/ 07 июля 2019

Mutex и RwLock относятся к специфическим для ОС примитивам и не могут быть честными. В Windows они оба реализованы с блокировками SRW , которые специально задокументированы как не справедливо. Я не проводил исследования для других операционных систем, но вы определенно не можете полагаться на честность с std::sync::Mutex, особенно если вам нужен этот код для переносимости.

Возможным решением в Rust является реализация Mutex, предоставляемая parking_lot корзиной , которая обеспечивает метод unlock_fair , который реализовано с использованием справедливого алгоритма.

Из parking_lot документации :

По умолчанию мьютексы несправедливы и позволяют текущему потоку повторно блокировать мьютекс, прежде чем другой сможет получить блокировку, даже если этот поток был заблокирован на мьютексе в течение длительного времени. Это значение по умолчанию, поскольку оно позволяет значительно повысить пропускную способность, поскольку позволяет избежать принудительного переключения контекста при каждой разблокировке мьютекса. Это может привести к тому, что один поток получит мьютекс намного больше, чем другие потоки.

Однако в некоторых случаях может быть полезно обеспечить справедливость, принудительно передавая блокировку ожидающему потоку, если он есть. Это делается с помощью этого метода вместо обычного сброса MutexGuard.

Хотя parking_lot::Mutex не претендует на то, чтобы быть справедливым без использования unlock_fair метода, я обнаружил, что ваш код генерирует то же количество пингов, что и pongs, просто сделав этот переключатель ( plays ) ), даже не используя метод unlock_fair.

Обычно мьютексы разблокируются автоматически, когда охранник выходит из области видимости. Чтобы сделать его разблокированным, вам нужно вставить этот метод перед тем, как сбросить охрану:

let b = thread::spawn(move || loop {
    let mut data = data1.lock();
    thread::sleep(ten_millis);
    println!("pong ");
    *data += 1;
    if *data > 10 {
        break;
    }
    MutexGuard::unlock_fair(data);
});
4 голосов
/ 07 июля 2019

Я полагал, что у мьютексов был какой-то способ определить, кто хотел заблокировать его последним, но это не так.

Нет.Работа мьютекса заключается в том, чтобы сделать код максимально быстрым.Чередование дает худшую производительность, потому что вы постоянно выбрасываете кэш процессора.Вы запрашиваете наихудшую возможную реализацию мьютекса.

Как на самом деле работает блокировка?

Планировщик пытается выполнить как можно больше работы.Ваша работа - писать код, который выполняет только ту работу, которую вы действительно хотите выполнить.

Как получить желаемый результат?

Не использовать два потокаесли вы просто хотите сделать что-то одно, то что-то другое, то снова первое.Используйте потоки, когда вас не интересует порядок выполнения работы, и вы просто хотите выполнить как можно больше работы.

4 голосов
/ 07 июля 2019

Порядок блокировки мьютекса не гарантируется;первый поток может получить блокировку 100% времени, в то время как второй поток 0%

. Потоки запланированы ОС, и вполне возможен следующий сценарий:

  1. ОС отдает процессорное время первому потоку и получает блокировку
  2. ОС отдает процессорное время второму потоку, но блокировка снята, следовательно, она переходит в спящий режим
  3. Первый поток снимает блокировку, но ОС по-прежнему может запускаться.Он идет на другую итерацию цикла и повторно получает блокировку
  4. Другой поток не может продолжить, потому что блокировка все еще снята.

Если вы дадите второму потоку больше временичтобы получить блокировку, вы увидите ожидаемую схему пинг-понга, хотя нет никакой гарантии (плохая ОС может решить никогда не выделять процессорное время некоторым вашим потокам):

use std::sync::{Arc, Mutex};
use std::{thread, time};

fn main() {
    let data1 = Arc::new(Mutex::new(1));
    let data2 = data1.clone();

    let ten_millis = time::Duration::from_millis(10);

    let a = thread::spawn(move || loop {
        let mut data = data1.lock().unwrap();
        *data += 1;
        if *data > 10 {
            break;
        }

        drop(data);
        thread::sleep(ten_millis);
        println!("ping ");
    });

    let b = thread::spawn(move || loop {
        let mut data = data2.lock().unwrap();
        *data += 1;
        if *data > 10 {
            break;
        }

        drop(data);
        thread::sleep(ten_millis);
        println!("pong ");
    });

    a.join().unwrap();
    b.join().unwrap();
}

Вы можете проверить, чтоиграя со временем сна.Чем меньше время сна, тем более нерегулярным будет чередование пинг-понга, и при значениях, меньших 10 мс, вы можете увидеть пинг-пинг-понг и т. Д.

По сути, решение, основанное на времени, - этоплохо по дизайну.Вы можете гарантировать, что за «ping» последует «pong», улучшив алгоритм.Например, вы можете напечатать «ping» для нечетных чисел и «pong» для четных чисел:

use std::sync::{Arc, Mutex};
use std::{thread, time};

const MAX_ITER: i32 = 10;

fn main() {
    let data1 = Arc::new(Mutex::new(1));
    let data2 = data1.clone();

    let ten_millis = time::Duration::from_millis(10);

    let a = thread::spawn(move || 'outer: loop {
        loop {
            thread::sleep(ten_millis);
            let mut data = data1.lock().unwrap();
            if *data > MAX_ITER {
                break 'outer;
            }

            if *data & 1 == 1 {
                *data += 1;
                println!("ping ");
                break;
            }
        }
    });

    let b = thread::spawn(move || 'outer: loop {
        loop {
            thread::sleep(ten_millis);
            let mut data = data2.lock().unwrap();
            if *data > MAX_ITER {
                break 'outer;
            }

            if *data & 1 == 0 {
                *data += 1;
                println!("pong ");
                break;
            }
        }
    });

    a.join().unwrap();
    b.join().unwrap();
}

Это не лучшая реализация, но я попытался сделать это с минимальным количеством модификаций дляоригинальный код.

Вы также можете рассмотреть реализацию с Condvar, на мой взгляд, лучшее решение, так как оно позволяет избежать занятого ожидания мьютекса (ps: также устранено дублирование кода):

use std::sync::{Arc, Mutex, Condvar};
use std::thread;

const MAX_ITER: i32 = 10;

fn main() {
    let cv1 = Arc::new((Condvar::new(), Mutex::new(1)));
    let cv2 = cv1.clone();

    let a = thread::spawn(ping_pong_task("ping", cv1, |x| x & 1 == 1));
    let b = thread::spawn(ping_pong_task("pong", cv2, |x| x & 1 == 0));

    a.join().unwrap();
    b.join().unwrap();
}

fn ping_pong_task<S: Into<String>>(
        msg: S, 
        cv: Arc<(Condvar, Mutex<i32>)>, 
        check: impl Fn(i32) -> bool) -> impl Fn() 
{
    let message = msg.into();

    move || {
        let (condvar, mutex) = &*cv;

        let mut value = mutex.lock().unwrap();
        loop {
            if check(*value) {
                println!("{} ", message);
                *value += 1;
                condvar.notify_all();
            }

            if *value > MAX_ITER {
                break;
            }

            value = condvar.wait(value).unwrap();
        }
    }
}
...