Rust Mutex не работает при использовании функции обратного вызова из нескольких потоков C, созданных `fork` - PullRequest
3 голосов
/ 26 марта 2019

Я использую библиотеку C Cuba, которая использует функцию обратного вызова, которая вызывается из нескольких потоков, созданных в C. Параллелизация Cuba основана на функциях POSIX fork / wait вместо pthreads ( arxiv.орг / ABS / 1408.6373 ).Текущий поток указывается в параметре core.

Я пытаюсь записать результаты этой функции обратного вызова на экран и в файл.Если я использую println!, я получаю ожидаемый вывод, но если я использую slog, выход искажается, когда я использую Mutex сток.Если я использую слив async, я вообще не получаю вывод.

Не блокируется ли Mutex, поскольку он не может видеть, что функция фактически вызывается из другого потока?Я пытался воссоздать проблему с потоками Rust, но не смог.Желательно, чтобы заработал async сток.

Ниже приведен пример программы, которая дает проблемное поведение.Обратный вызов получает последний параметр функции vegas в качестве одного из аргументов.Это вектор клонов регистраторов.Таким образом, каждое ядро ​​должно иметь свою собственную копию регистратора:

#[macro_use]
extern crate slog;
extern crate cuba;
extern crate slog_term;

use slog::Drain;

// this function is called from different c threads
// `core` indicates which thread
fn integrand(
    _x: &[f64],
    _f: &mut [f64],
    loggers: &mut Vec<slog::Logger>,
    _nvec: usize,
    core: i32,
) -> Result<(), &'static str> {
    info!(loggers[core as usize], "A\nB\nC");

    Ok(())
}

fn main() {
    let decorator = slog_term::TermDecorator::new().build();
    let drain = slog_term::CompactFormat::new(decorator).build();
    let drain = std::sync::Mutex::new(drain).fuse();

    let log = slog::Logger::root(drain, o!());

    let mut integrator = cuba::CubaIntegrator::new(integrand);
    integrator.set_cores(10, 1000); // set 10 cores

    integrator.vegas(
        1,
        1,
        cuba::CubaVerbosity::Progress,
        0,
        vec![log.clone(); 11],
    );
}

Вывод:

C 
INFO Mar 26A
B
C 10:27
:42.195 MarINFO 26  10:27A
B
C:42.195
 MarINFO 26  10:27A
B
C:42.195
 INFO A
B
C
Mar 26 10:27:42.196 INFO A
B
C
Mar 26 10:27:42.196 INFO A
B
C

1 Ответ

8 голосов
/ 26 марта 2019

Библиотека Cuba C имеет следующее выражение: :

Для пользователей Windows: Cuba 3 и выше использует fork(2) для распараллеливания потоков выполнения.Однако эта функция POSIX не является частью API Windows и, кроме того, используется таким важным образом, что ее нельзя обойти просто с помощью CreateProcess и т. Д. По-видимому, единственная возможная эмуляция доступна через Cygwin.

Вот воспроизведение кода.Мы fork, а затем ребенок и родитель пытаемся удерживать мьютекс во время распечатки материала.sleep вставляется, чтобы побудить планировщик ОС попробовать другие потоки:

use nix::unistd::{fork, ForkResult}; // 0.13.0
use std::{sync::Mutex, thread, time::Duration};

fn main() {
    let shared = Mutex::new(10);

    match fork() {
        Ok(ForkResult::Parent { .. }) => {
            let max = shared.lock().unwrap();
            for _ in 0..*max {
                println!("Parent");
                thread::sleep(Duration::from_millis(10));
            }
        }
        Ok(ForkResult::Child) => {
            let max = shared.lock().unwrap();
            for _ in 0..*max {
                println!("Child");
                thread::sleep(Duration::from_millis(10));
            }
        }
        Err(e) => {
            eprintln!("Error: {}", e);
        }
    }
}
$ cargo run

Parent
Child
Parent
Child
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent

Использование fork с потоками - действительно боль в работе;Я отчетливо помню, как выискивал ужасные проблемы, связанные с этим раньше.Два ресурса, которые я нашел, углубляются:

Последний говорит (выделение мое):

Могу ли я создать мьютекс перед разветвлением?

Да - однако ребенок и родитель не будет совместно использовать виртуальную память, и каждый из них будет иметь мьютекс, независимый от другого .

(Дополнительные примечания. Существуют дополнительные параметры, использующие разделяемую память, которые позволяют дочернему элементу и родителю совместномьютекс, если он создан с правильными параметрами и использует сегмент общей памяти. См. procs, fork () и мьютексы )


Если я используюасинхронная утечка У меня вообще нет вывода.

См. также:


Я бы не доверял библиотеке Cuba Rust.Есть два основных момента:

  1. Если существуют создаваемые потоки, универсальный тип пользовательских данных должен иметь либо Sync, либо Send, ограниченный только типами, которыебезопасно передавать / передавать данные между потоками.

  2. Пользовательские данные, передаваемые в функцию integrand, должны , а не быть &mut.Основная концепция Rust заключается в том, что в любой момент времени может быть только одна изменяемая ссылка на любой фрагмент данных.Куба тривиально позволяет обойти это.

Вот попытка воспроизведения библиотек Cuba Rust и C:

#[macro_use]
extern crate slog;

use slog::Drain;

fn integrand(loggers: &mut Vec<slog::Logger>, core: i32) {
    info!(loggers[core as usize], "A\nB\nC\n{}", core);
}

fn main() {
    let decorator = slog_term::TermDecorator::new().build();
    let drain = slog_term::CompactFormat::new(decorator).build();
    let drain = std::sync::Mutex::new(drain).fuse();

    let log = slog::Logger::root(drain, o!());

    let logs = vec![log.clone(); 11];

    cuba_repro(logs, integrand);
}

use std::{ffi::c_void, thread};

type Integrand<T> = fn(&mut T, i32);

fn cuba_repro<T>(mut user_data: T, mut integrand: Integrand<T>) {
    // From the `vegas` method
    let user_data_ptr = &mut user_data as *mut _ as *mut c_void;
    let integrand_ptr = &mut integrand as *mut _ as *mut c_void;

    unsafe { cuba_repro_ffi::<T>(user_data_ptr, integrand_ptr) }
}

unsafe fn cuba_repro_ffi<T>(user_data: *const c_void, integrand: *const c_void) {
    let user_data = FfiDoesNotCareAboutSendOrSync(user_data);
    let integrand = FfiDoesNotCareAboutSendOrSync(integrand);

    let threads: Vec<_> = (0..4).map(move |i| {
        thread::spawn(move || {
            // C doesn't care about this pedantry
            let user_data = &mut *(user_data.0 as *const T as *mut T);
            let integrand = &mut *(integrand.0 as *const Integrand<T> as *mut Integrand<T>);

            // From the `c_integrand` function
            let k: &mut T = &mut *(user_data as *mut _);
            let _ignored = integrand(k, i);
        })
    }).collect();

    for t in threads { t.join().unwrap() }
}

#[derive(Copy, Clone)]
struct FfiDoesNotCareAboutSendOrSync<T>(T);
unsafe impl<T> Send for FfiDoesNotCareAboutSendOrSync<T> {}
unsafe impl<T> Sync for FfiDoesNotCareAboutSendOrSync<T> {}

Мне пришлось сделать многочисленными изменяется, чтобы компилятор Rust игнорировал огромное количество небезопасных и нарушающих правил библиотек Кубы и связанных с ними FFI.

Этот пример кода действительно фактически распечатывает 4 журналаЗаявления по порядку, так что это не полный ответ.Тем не менее, я вполне уверен, что библиотека Кубы вызывает неопределенное поведение, что означает, что возможен любой результат, в том числе явно работающий.

...