Как я могу асинхронно получать данные и изменять их с помощью эхо-сервера на основе Tokio? - PullRequest
0 голосов
/ 03 октября 2018

Я работаю на эхо-сервере, который берет данные из TCP и применяет некоторую логику к этим данным.Например, если данные клиента поступают как hello, я хочу ответить им как hello from server.

. Я могу переслать входные данные с помощью функции copy, но это бесполезно вмое дело.

Вот исходный код, над которым я работаю:

extern crate futures;
extern crate tokio_core;
extern crate tokio_io;

use futures::stream::Stream;
use futures::Future;
use std::net::SocketAddr;
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use tokio_io::io::copy;
use tokio_io::AsyncRead;

fn main() {
    let addr = "127.0.0.1:15000".parse::<SocketAddr>().unwrap();
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    let socket = TcpListener::bind(&addr, &handle).unwrap();
    println!("Listening on: {}", addr);

    let done = socket.incoming().for_each(move |(socket, addr)| {
        let (reader, writer) = socket.split();
        let amt = copy(reader, writer);

        let msg = amt.then(move |result| {
            match result {
                Ok((amt, _, _)) => println!("wrote {} bytes to {}", amt, addr),
                Err(e) => println!("error on {}: {}", addr, e),
            }

            Ok(())
        });

        handle.spawn(msg);

        Ok(())
    });

    core.run(done).unwrap();
}

Я знаю, что мне нужно добавить некоторую логику вместо этой функции копирования, но как?

let amt = copy(reader, writer);

1 Ответ

0 голосов
/ 05 октября 2018

Эхо-сервер является своего рода особенным в том смысле, что ровно за одним «запросом» от клиента следует ровно один ответ от сервера.Очень хороший пример для такого варианта использования - пример TinyDB .

от Tokio. Однако следует учитывать, что хотя UDP основан на пакетах, он поражает другую сторонув точной форме, с которой вы их отправили, TCP - нет.TCP является потоковым протоколом - он имеет строгие гарантии, касающиеся того, что пакет был получен другой стороной и что отправленные данные получены именно в том порядке, в котором они были отправлены. Однако не гарантируется, что один вызов «send»«с одной стороны приводит к ровно одному вызову« на прием », с другой стороны, возвращая точно такой же кусок данных, который был отправлен.Это особенно интересно при отправке очень длинных порций данных, когда отправка карт осуществляется на несколько приемовТаким образом, вы должны согласиться с разделителем, который сервер может подождать, прежде чем пытаться отправить ответ клиенту.В Telnet этим разделителем будет "\ r \ n".Именно здесь начинает действовать инфраструктура Tokio Decoder / Encoder.Примером реализации такого кодека является LinesCodec .Если вы хотите иметь Telnet, это делает именно то, что вы хотите.Он будет выдавать ровно одно сообщение за раз и позволит отправлять ровно одно такое сообщение за раз:

extern crate tokio;

use tokio::codec::Decoder;
use tokio::net::TcpListener;
use tokio::prelude::*;
use tokio::codec::LinesCodec;
use std::net::SocketAddr;

fn main() {
    let addr = "127.0.0.1:15000".parse::<SocketAddr>().unwrap();

    let socket = TcpListener::bind(&addr).unwrap();
    println!("Listening on: {}", addr);

    let done = socket.incoming()
        .map_err(|e| println!("failed to accept socket; error = {:?}", e))
        .for_each(move |socket| {
            // Fit the line-based codec on top of the socket. This will take on the task of
            // parsing incomming messages, as well as formatting outgoing ones (appending \r\n).
            let (lines_tx, lines_rx) = LinesCodec::new().framed(socket).split();

            // This takes every incomming message and allows to create one outgoing message for it,
            // essentially generating a stream of responses.
            let responses = lines_rx.map(|incomming_message| {
                // Implement whatever transform rules here
                if incomming_message == "hello" {
                    return String::from("hello from server");
                }
                return incomming_message;
            });

            // At this point `responses` is a stream of `Response` types which we
            // now want to write back out to the client. To do that we use
            // `Stream::fold` to perform a loop here, serializing each response and
            // then writing it out to the client.
            let writes = responses.fold(lines_tx, |writer, response| {
                //Return the future that handles to send the response to the socket
                writer.send(response)
            });

            // Run this request/response loop until the client closes the connection
            // Then return Ok(()), ignoring all eventual errors.
            tokio::spawn(
                writes.then(move |_| Ok(()))
            );

            return Ok(());
        });

    tokio::run(done);
}
...