Поделиться дугой между замыканиями - PullRequest
0 голосов
/ 29 октября 2018

Я пытаюсь написать простой tcp сервер, который будет читать и транслировать сообщения.
Я использую Tokio, но я думаю, что это больше вопрос Rust.

У меня есть дуга с общим состоянием:
let state = Arc::new(Mutex::new(Shared::new(server_tx)));

Позже я хочу создать 2 потока, которые будут использовать ссылку на это состояние:

let server = listener.incoming().for_each(move |socket| {
    // error[E0382]: capture of moved value: `state`
    process(socket, state.clone());
    Ok(())
}).map_err(|err| {
    println!("accept error = {:?}", err);
});

let receive_sensor_messages = sensors_rx.for_each(move |line| {
    println!("Received sensor message, broadcasting: {:?}", line);

    // error[E0597]: borrowed value does not live long enough
    // error[E0507]: cannot move out of borrowed content 
    for (_, tx) in state.clone().lock().unwrap().clients {
        tx.unbounded_send(line.clone()).unwrap();
    }
    Ok(())
}).map_err(|err| {
    println!("line reading error = {:?}", err);
});

( 1012 * площадка *)

Насколько я понимаю, он пытается сказать, что state заимствовано в первом закрытии listener.incoming().for_each(move |socket| {, поэтому, когда я пытаюсь сделать это снова в sensors_rx.for_each(move |line| {, оно говорит, что это невозможно.

Мой вопрос: как мне это решить? Разве Arc не должен решить проблему разделения переменной между потоками? Я пробовал разные комбинации clone (делал клон вне замыкания, а затем снова делал clone внутри), но ни одна не работала.

Ура!

Ответы [ 2 ]

0 голосов
/ 29 октября 2018

По сути, ваша проблема может быть сведена к до следующего MCVE:

use std::sync::{Arc, Mutex};

struct Bar;

fn foo(_ : &Bar){
    println!("foo called");
}

fn main(){
    let example = Arc::new(Mutex::new(Bar));
    std::thread::spawn(move ||{
        let _ = example.clone();
    });
    // --- (1) ---

    std::thread::spawn(move ||{
        foo(&example.clone().lock().unwrap());
    });
}

Теперь первая проблема в том, что example перемещен. То есть, как только мы пересекли (1), оригинал example считается перемещенным из. Вместо этого нам нужно сначала clone и , затем move:

    let example = Arc::new(Mutex::new(Bar));
    let local_state = example.clone();
    std::thread::spawn(move ||{
        let _ = local_state; // now fine!
    });

Другая ошибка связана с недолговечным Arc. По сути, он живет только для нас lock на базовом Mutex. Хотя мы знаем, что по крайней мере еще один Arc указывает на память, компилятор не может этого доказать. Однако, если мы избавимся от clone(), это нормально:

    let local_state = example.clone();        
    std::thread::spawn(move ||{
        foo(&local_state.lock().unwrap());
    });

Однако вы также перебираете контейнер, потребляя его содержимое (clients). Вместо этого используйте & там, например, &local_state().unwrap().clients).

Вы можете найти полный фиксированный код ниже или на детской площадке :

use std::sync::{Arc, Mutex};

struct Bar;

fn foo(_ : &Bar){
    println!("foo called");
}

fn main(){
    let example = Arc::new(Mutex::new(Bar));
    let local_state = example.clone();
    std::thread::spawn(move ||{
        let _ = local_state;
    });
    let local_state = example.clone();
    std::thread::spawn(move ||{
        foo(&local_state.lock().unwrap());
    }).join();
}
0 голосов
/ 29 октября 2018

Для каждого закрытия вы должны предоставить свой Arc, поэтому вам нужно clone ваш Arc заранее.

let state = Arc::new(Mutex::new(Shared::new(server_tx)));
let state1 = Arc::clone(&state);
let state2 = Arc::clone(&state);

let server = listener.incoming().for_each(move |socket| {
    process(socket, state1.clone());
    Ok(())
});

let receive_sensor_messages = sensors_rx.for_each(move |line| {
    println!("Received sensor message, broadcasting: {:?}", line);
    let shared = state2.lock().unwrap();
    for (_, tx) in &shared.clients { // better: `for tx in shared.clients.values()`
        tx.unbounded_send(line.clone()).unwrap();
    }
    Ok(())
});

Вы можете опустить state1 здесь, но я считаю, что делать это так чище.

Причина этого в том, что вы перемещаете значение state в первое закрытие, поэтому вы не можете использовать его во втором закрытии, потому что оно уже перемещено (имеет смысл, не так ли?).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...