Использование FramedRead с подключением слушателя, кажется, всегда не готово - PullRequest
0 голосов
/ 17 апреля 2019

извиняюсь, вероятно, глупый вопрос новичка от кого-то, кто плохо знаком с Rust и Tokio, но я был бы признателен, если бы кто-то мог сообщить мне, что / почему это причиняет мне боль.Я, вероятно, могу обойти эту проблему, используя другие методы, но я хотел бы знать, почему это не работает.

Я пытаюсь обработать некоторый код в Tokio, который создает прослушиватель TCP, а затемавтоматически превращает полученный JSON в структуру Rust.Может показаться, что использование кодеков было бы хорошим решением, но то, что я пробовал до сих пор, похоже, не работает.

Ниже приведена минимальная программа, в которой я создаю такого слушателя и просто использую предоставленный LinesCodec.чтобы попытаться получить это для интерпретации подключений к нему:

extern crate tokio;

use std::net::SocketAddr;

use tokio::codec::{LinesCodec, FramedRead};
use tokio::net::TcpListener;
use tokio::prelude::*;

fn main() {
    let addr = "127.0.0.1:12345".parse::<SocketAddr>().unwrap();
    let listener = TcpListener::bind(&addr).unwrap();

    tokio::run(listener.incoming()
        .map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
        .for_each(move |socket| {
            let transport = FramedRead::new(socket, LinesCodec::new());

            transport.for_each(|msg| {
                println!("Received: {:?}", msg);
                Ok(())
            });

            Ok(())
        })
    );
}

Это компилируется, и когда я отправляю строку через telnet или через быстрый скрипт на python, отправляя строку с новой строкой, я никогда не получаю распечатанныйсообщение, как я ожидал.

Компилятор выдает полезное предупреждение:

warning: unused `tokio::prelude::stream::ForEach` that must be used
  --> src/main.rs:18:13
   |
18 | /             transport.for_each(|msg| {
19 | |                 println!("Received: {:?}", msg);
20 | |                 Ok(())
21 | |             });
   | |_______________^
   |
   = note: #[warn(unused_must_use)] on by default
   = note: streams do nothing unless polled

Поэтому я попытался изменить transport.for_each следующим образом:

            let mut stream = transport.for_each(|msg| {
                println!("Received: {:?}", msg);
                Ok(())
            });

            loop {
                match stream.poll() {
                    Ok(s) => {
                        match s {
                            Async::Ready(t) => println!("Ready: {:?}", t),
                            Async::NotReady => println!("Not ready"),
                        }
                    }
                    <snip>
                }
            }

Предупреждениетеперь уходит, и я просто получаю множество строк, говорящих «Не готов» к терминалу.Что я делаю не так?

Я также пытался создать свой собственный базовый LinesCodec с отладкой, но, насколько я могу судить, это никогда не вызывается.Тем не менее, из того, что я прочитал в конце https://tokio.rs/docs/going-deeper/frames/, он должен получать мои сообщения.

Буду очень признателен за любую помощь, включая ответы, в которых говорится о переходе с tokio-0.1.18 на 0.3.xпоскольку есть лучшая поддержка того, чего вы пытаетесь достичь.

1 Ответ

0 голосов
/ 19 апреля 2019

Спасибо сообществу Tokio за помощь в этом.Предупреждение было правильным и правильным указанием, однако опрос здесь не является правильным решением, так как он будет вызывать опрос бесконечно.Вместо этого мы получаем будущее с item () и ошибкой io :: Error, мы можем использовать map_err, чтобы сопоставить это с будущим с item () и error (), а затем мы можем запустить tokio :: spawn для этого будущего, и этоработы.

...