Как мне прочитать один пакет из TcpStream, используя Tokio? - PullRequest
0 голосов
/ 12 июня 2018

Я пытаюсь получить один пакет данных, используя tokio:

extern crate tokio;
extern crate tokio_io;

use tokio::net::{TcpListener};
use tokio::prelude::*;

use std::net::SocketAddr;
fn main() {
    let addr = "0.0.0.0:8080".parse::<SocketAddr>().unwrap();
    let socket = TcpListener::bind(&addr).unwrap();
    println!("Listening on: {}", addr);

    let done = socket
        .incoming()
        .map_err(|e| println!("failed to accept socket; error = {:?}", e))
        .for_each(move |mut socket| {
            let mut bytes = vec![];
            bytes.reserve(1024);
            let processor = socket.read_buf(&mut bytes).into_future()
                .and_then(move |_size| {
                    println!("bytes: {:?}", bytes);
                    Ok(())
                })
                .map_err(|_| ());;
            tokio::spawn(processor)
        });
    tokio::run(done);
}

Этот код печатает пустой пакет.Как изменить этот код для печати полученного пакета с данными?

Ответы [ 2 ]

0 голосов
/ 13 июня 2018

Для себя я почти нашел ответ.Очень полезно Подобный вопрос .

struct AsWeGetIt<R>(R);

impl<R> Stream for AsWeGetIt<R>
    where
        R: AsyncRead,
{
    type Item = BytesMut;
    type Error = std::io::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let mut buf = BytesMut::with_capacity(1000);

        self.0
            .read_buf(&mut buf)
            .map(|async| async.map(|_| Some(buf)))
    }
}
....
let processor = AsWeGetIt(socket).into_future()
.and_then(|(bytes,_)|  {
    println!("bytes: {:?}", bytes);
    Ok(())
}).map_err(|_| ());

Но для лучшего понимания, как обойтись без отдельной структуры ... А почему и для чего используется карта?

0 голосов
/ 13 июня 2018

Если ваша цель на самом деле получить один пакет, я думаю, что вы добились успеха!

Я несколько раз тестировал программу и получал ответ.Я тестирую с:

nc 127.0.0.1 8080 <<< hello

После нескольких попыток я получаю следующий вывод:

Listening on: 0.0.0.0:8080
bytes: [104, 101, 108, 108, 111, 10]
bytes: []
bytes: [104, 101, 108, 108, 111, 10]
bytes: []
bytes: [104, 101, 108, 108, 111, 10]
bytes: []
bytes: [104, 101, 108, 108, 111, 10]
bytes: [104, 101, 108, 108, 111, 10]

Как видите, иногда у нас есть данные, готовые, а иногдамы неЯ думаю, что в вашем тестировании вам просто не повезло, и вы получили ответы TCP только до того, как были отправлены какие-либо данные?

Я примерно на 90% уверен, что потоки TCP могут содержать пустые пакеты, и это то, что мы "видишь.(если у кого-то есть больше знаний здесь, не стесняйтесь редактировать ответ или комментарий).


Чтобы исправить вашу программу, вы можете пересмотреть свою цель.

Кажется, что чтение одного TCP-пакетаредко помогает.Вместо этого, как правило, вы хотите прочитать некоторое количество байтов и обрабатывать данные по мере их поступления.Мое понимание TCP - это поток байтов, а не поток пакетов.Пакеты - это просто способ передачи байтов из одного места в другое, и они могут быть любой длины без нарушения совместимости.«Один пакет» является довольно туманным понятием.

Вот пример чтения первых 16 байтов потока с использованием функции tokio::io::read_exact:

extern crate tokio;

use tokio::net::TcpListener;
use tokio::prelude::*;

use std::net::SocketAddr;

fn main() {
    let addr = "0.0.0.0:8080".parse::<SocketAddr>().unwrap();
    let socket = TcpListener::bind(&addr).unwrap();
    println!("Listening on: {}", addr);

    let done = socket
        .incoming()
        .map_err(|e| println!("failed to accept socket; error = {:?}", e))
        .for_each(move |mut socket| {
            // this function deals with bytes a bit differently and will just fill the
            // buffer exactly rather than adding onto the end.
            let mut bytes = vec![0; 16];
            let processor = tokio::io::read_exact(socket, bytes)
                .and_then(move |(socket, bytes)| {
                    println!("bytes: {:?}", bytes);
                    Ok(())
                })
                .map_err(|_| ());
            tokio::spawn(processor)
        });
    tokio::run(done);
}
...