Как читать по очереди из UnixStream, подключенного к сокету? - PullRequest
1 голос
/ 27 сентября 2019

Я пытаюсь прочитать некоторые данные неизвестного размера из UnixStream (в этом коде они называются socket).Данные состоят из заголовка из 6 байтов, а последние два байта указывают, как долго должна быть оставшаяся часть сообщения.

Метод .into_buf(), который я вызываю, взят изIntoBuf черта в bytes ящике Токио.

let mut header = [0u8; 6];
let response1 = self.socket.read(&mut header);
let mut cursor = header.into_buf();

let evt_code = cursor.get_u16_le();
let controller = cursor.get_u16_le();
let param_size = cursor.get_u16_le() as usize;

let mut param = vec![0u8; param_size];
let response2 = self.socket.read(&mut param);
let mut cursor = param.into_buf();

Проблема, с которой я сталкиваюсь, заключается в том, что response2 всегда является Err E_WOULDBLOCK, потому что мой UnixStream подключен к неблокирующему сокету,Кажется, что первый вызов read() читает 6 байтов, как и предполагалось, но затем просто отбрасывает остальное содержимое в потоке.

Как мне обойти это / как я могу сделать read() оставить дополнительные данные в потоке?

Кажется, простейшим решением было бы просто увеличить размер моего исходного буфера, а затем просто прочитать все сразу, но проблема в том, что самое большое возможное сообщение, которое я мог получить, составляет чуть более 64 КБ.Выделение или повторное обнуление буфера 64 КБ для каждого чтения кажется довольно расточительным, особенно потому, что большинство сообщений намного меньше этого

.

Ответы [ 2 ]

0 голосов
/ 28 сентября 2019

Ошибка io::ErrorKind::WouldBlock означает, что базовый сокет еще не готов для чтения / записи, и вам следует повторить попытку. Документация :

Это приведет к тому, что операции чтения, записи, получения и отправки станут неблокирующими, то есть немедленно вернутся из своих вызовов.Если операция ввода-вывода прошла успешно, возвращается Ok, и никаких дальнейших действий не требуется.Если операция ввода-вывода не может быть завершена и ее необходимо повторить, возвращается ошибка с видом io :: ErrorKind :: WillBlock.

С помощью mio::Poll для проверки готовности UnixStream, работает следующий код;без этого он вызывает ошибку WouldBlock.Так что он должен был точно воспроизвести то, что вы видели.

//# bytes = "0.4.12"
//# mio = "0.6.19"
//# mio-uds = "0.6.7"
use bytes::{IntoBuf, buf::Buf};
use mio::{Events, Poll, PollOpt, Ready, Token};
use mio_uds::UnixStream;
use std::io::{Read, Write};
use std::thread;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (mut s1, mut s2) = UnixStream::pair()?;
    let mut buf = [0xab; 0x2000 + 6];
    buf[4] = 0x00;
    buf[5] = 0x20;

    let send = thread::Builder::new()
        .name("send".to_string())
        .spawn(move || {
            s1.write_all(&buf).unwrap();
        })?;

    let recv = thread::Builder::new()
        .name("recv".to_string())
        .spawn(move || {
            let poll = Poll::new().unwrap();
            let mut events = Events::with_capacity(1024);
            poll.register(&s2, Token(0), Ready::readable(), PollOpt::level())
                .unwrap();
            poll.poll(&mut events, None).unwrap();

            let mut header = [0; 6];
            let _ = s2.read(&mut header).unwrap();
            let mut cursor = header.into_buf();
            let _ = cursor.get_u16_le();
            let _ = cursor.get_u16_le();
            let data_size = cursor.get_u16_le() as usize;
            assert_eq!(data_size, 0x2000);

            let mut data = vec![0; data_size];
            let _ = s2.read(&mut data).unwrap();
            assert_eq!(data[0], 0xab);
        })?;

    send.join().unwrap();
    recv.join().unwrap();
    Ok(())
}

Обратите внимание, что код является просто демонстрацией и не настолько надежен, как это может быть.Без проверки того, как на самом деле считываются байты, предполагается, что не было ложного пробуждения и сработало только одно событие готовности.

0 голосов
/ 28 сентября 2019

Я обошел это, используя необработанный файловый дескриптор вместо UnixStream и вызывая recv() с MSG_PEEK.

let mut header = [0u8; 6];

// need MSG_PEEK otherwise recv() clears the socket and we get EAGAIN when
// we try to read the socket again; this is the main reason for using a raw
// fd instead of a UnixStream
if unsafe { libc::recv(self.fd, header.as_mut_ptr() as *mut c_void, 6, libc::MSG_PEEK) } < 0 {
    return Err(io::Error::last_os_error().into());
}

let mut cursor = header.into_buf();

let evt_code = cursor.get_u16_le();
let controller = cursor.get_u16_le();
let param_size = cursor.get_u16_le() as usize;


// since calling recv() with MSG_PEEK doesn't consume the header, we need to make
// this buffer 6 bytes bigger, but that's fine
let mut param = vec![0u8; param_size + 6];

if unsafe { libc::recv(self.fd, param.as_mut_ptr() as *mut c_void, param.len(), 0) } < 0 {
    return Err(io::Error::last_os_error().into());
}

let mut cursor = param.into_buf();
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...