Как я могу применить ограничение к количеству байтов, прочитанных futures :: Stream :: concat2? - PullRequest
0 голосов
/ 04 ноября 2018

Ответ на Как мне прочитать весь текст гипер-запроса Tokio? предлагает:

Вы можете установить ограничение на количество прочитанных байтов [при использовании futures::Stream::concat2]

Как я могу на самом деле достичь этого? Например, вот код, который имитирует злонамеренного пользователя, отправляющего моему сервису бесконечный объем данных:

extern crate futures; // 0.1.25

use futures::{prelude::*, stream};

fn some_bytes() -> impl Stream<Item = Vec<u8>, Error = ()> {
    stream::repeat(b"0123456789ABCDEF".to_vec())
}

fn limited() -> impl Future<Item = Vec<u8>, Error = ()> {
    some_bytes().concat2()
}

fn main() {
    let v = limited().wait().unwrap();
    println!("{}", v.len());
}

1 Ответ

0 голосов
/ 04 ноября 2018

Одним из решений является создание потокового комбинатора, который завершает поток после прохождения некоторого порога байтов. Вот одна из возможных реализаций:

struct TakeBytes<S> {
    inner: S,
    seen: usize,
    limit: usize,
}

impl<S> Stream for TakeBytes<S>
where
    S: Stream<Item = Vec<u8>>,
{
    type Item = Vec<u8>;
    type Error = S::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        if self.seen >= self.limit {
            return Ok(Async::Ready(None)); // Stream is over
        }

        let inner = self.inner.poll();
        if let Ok(Async::Ready(Some(ref v))) = inner {
            self.seen += v.len();
        }
        inner
    }
}

trait TakeBytesExt: Sized {
    fn take_bytes(self, limit: usize) -> TakeBytes<Self>;
}

impl<S> TakeBytesExt for S
where
    S: Stream<Item = Vec<u8>>,
{
    fn take_bytes(self, limit: usize) -> TakeBytes<Self> {
        TakeBytes {
            inner: self,
            limit,
            seen: 0,
        }
    }
}

Это может быть затем присоединено к потоку до concat2:

fn limited() -> impl Future<Item = Vec<u8>, Error = ()> {
    some_bytes().take_bytes(999).concat2()
}

Эта реализация имеет предостережения:

  • работает только для Vec<u8>. Конечно, вы можете использовать дженерики, чтобы сделать его более применимым.
  • он позволяет вводить больше байтов, чем предел, он просто останавливает поток после этой точки. Эти типы решений зависят от приложения.

Еще одна вещь, которую нужно иметь в виду, это то, что вы хотите попытаться решить эту проблему настолько низко, насколько это возможно - если источник данных уже выделил гигабайт памяти, размещение ограничения не поможет так же.

...