Использование обратного вызова при обработке TCP-соединений с Tokio - PullRequest
0 голосов
/ 22 января 2019

Я пытаюсь создать структуру, которая запускает цикл обработки событий, прослушивает TCP-соединения и вызывает обратный вызов для каждого соединения.

(Обратному вызову будут переданы некоторые предопределенные данные из сокета. В моем примере ниже я просто передаю ему IP-адрес соединения, но в своем реальном коде я буду анализировать содержимое, полученное с помощью serde, в структуру и передать это в обратный вызов. Я надеюсь, что это не лишает законной силы следующий «неработающий пример»).

Мой Cargo.toml:

[package]
name = "lifetime-problem"
version = "0.1.0"
edition = "2018"
[dependencies]
tokio-tcp = "0.1.3"
tokio = "0.1.14"
[[bin]]
name = "lifetime-problem"
path = "main.rs"

и main.rs:

use tokio::prelude::*;

struct Test {
    printer: Option<Box<Fn(std::net::SocketAddr) + Sync>>,
}

impl Test {
    pub fn start(&mut self) -> Result<(), Box<std::error::Error>> {
        let addr = "127.0.0.1:4242".parse::<std::net::SocketAddr>()?;
        let listener = tokio::net::TcpListener::bind(&addr)?;
        let server = listener
            .incoming()
            .map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
            .for_each(move |socket: tokio::net::TcpStream| {
                let address = socket.peer_addr().expect("");
                match self.printer {
                    Some(callback) => { callback(address); }
                    None => { println!("{}", address); }
                }
                Ok(())
            });
        tokio::run(server);
        Ok(())
    }
}

fn main() {
    let mut x = Test{ printer: None };
    x.start();
}

Я пробовал несколько вещей, начиная с этого кода (который взят непосредственно из примера Tokio ).

  1. Если я использую код, указанный выше, я получаю:

    error[E0277]: (dyn std::ops::Fn(std::net::SocketAddr) + std::marker::Sync + 'static) cannot be sent between threads safely
    

    для строки 24 (tokio::run(server)).

  2. Если я добавлю черту Send на Fn в поле принтера XOR , если я удалю move в замыкании в вызове for_each, я получу еще одну ошибку вместо:

    error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
    

    , который указывает на закрытие, которое, очевидно, не может пережить метод start, где он определен, но tokio::run, кажется, имеет противоречивые требования к нему.

Знаете ли вы, что я неправильно обращаюсь с шаблоном обратного вызова или в моем коде есть какая-то незначительная ошибка?

1 Ответ

0 голосов
/ 23 января 2019

Перво-наперво:

Компилятор переведет Box<Fn(std::net::SocketAddr) + Sync> в Box<Fn(std::net::SocketAddr) + Sync + 'static>, если время жизни явно не указано.

Давайте посмотрим на ошибки:

error[E0277]: (dyn std::ops::Fn(std::net::SocketAddr) + std::marker::Sync + 'static) cannot be sent between threads safely

Это говорит само за себя. Вы пытаетесь переместить &mut T в другой поток, но не можете, потому что T здесь не Send. Для отправки &mut T в другой поток T тоже должен иметь тип Send.

Вот минимальный код, который выдаст ту же ошибку:

use std::fmt::Debug;

fn func<T> (i:&'static mut T) where T: Debug {
    std::thread::spawn(move || {
        println!("{:?}", i);
    });
}

Если я сделаю T выше также типа Send, ошибка исчезнет. Но в вашем случае, когда вы добавляете черту Send, это дает пожизненную ошибку. Почему?

&mut self имеет некоторое время жизни больше, чем функция start(), установленная вызывающей стороной, но нет гарантии, что ее 'static. Вы перемещаете эту ссылку в замыкание, которое передается потоку и потенциально может пережить область, над которой он закрывается, что приводит к висячей ссылке.

Вот минимальная версия, которая выдает ту же ошибку.

use std::fmt::Debug;

fn func<'a, T:'a> (i:&'a mut T) where T: Debug + Sync + Send {
    std::thread::spawn(move || {
        println!("{:?}", i);
    });
}

Sync здесь на самом деле не требуется, так как &mut T. Изменение &mut T на &T (с сохранением Sync) также приведет к той же ошибке. Бремя здесь - ссылки, а не изменчивость. Итак, вы видите, что существует некоторое время жизни 'a, и оно перемещается в замыкание (данное потоку), что означает, что замыкание теперь содержит ссылку (не пересекающуюся с основным контекстом). Итак, что же такое 'a и как долго оно будет жить с точки зрения замыкания, которое вызывается из другого потока? Не заразно! В результате компилятор жалуется, говоря cannot infer an appropriate lifetime due to conflicting requirements.

Если мы немного подправим код для;

impl Test {
    pub fn start(&'static mut self) -> Result<(), Box<std::error::Error>> {
        let addr = "127.0.0.1:4242".parse::<std::net::SocketAddr>()?;
        let listener = tokio::net::TcpListener::bind(&addr)?;
        let server = listener
            .incoming()
            .map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
            .for_each(move |socket: tokio::net::TcpStream| {
                let address = socket.peer_addr().expect("");
                match &self.printer {
                    Some(callback) => { callback(address); }
                    None => { println!("{}", address); }
                }
                Ok(())
            });
        tokio::run(server);
        Ok(())
    }
}

он будет хорошо скомпилирован. Там есть гарантия, что self имеет 'static срок службы. Обратите внимание, что в операторе match нам нужно передать &self.printer, поскольку вы не можете выйти из заимствованного контекста.

Однако ожидается, что Test будет объявлено как статическое и слишком изменчивое, что, как правило, не лучший способ, если у вас есть другие варианты.

Другой способ есть; если вы можете передать Test по значению start() и затем переместить его в for_each(), код будет выглядеть следующим образом:

use tokio::prelude::*;

struct Test {
    printer: Option<Box<Fn(std::net::SocketAddr) + Send>>,
}

impl Test {
    pub fn start(mut self) -> Result<(), Box<std::error::Error>> {
        let addr = "127.0.0.1:4242".parse::<std::net::SocketAddr>()?;
        let listener = tokio::net::TcpListener::bind(&addr)?;
        let server = listener
            .incoming()
            .map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
            .for_each(move |socket: tokio::net::TcpStream| {
                let address = socket.peer_addr().expect("");
                match &self.printer {
                    Some(callback) => {
                        callback(address);
                    }
                    None => {
                        println!("{}", address);
                    }
                }
                Ok(())
            });
        tokio::run(server);
        Ok(())
    }
}

fn main() {
    let mut x = Test { printer: None };
    x.start();
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...