Как я могу создать поток, в котором элементы основаны на элементах, которые поток возвращал ранее? - PullRequest
3 голосов
/ 11 января 2020

У меня есть функция, которая генерирует futures::Stream на основе аргумента. Я хочу вызвать эту функцию несколько раз и сгладить потоки вместе. Сложным является тот факт, что я хочу передать значения, возвращаемые потоком, в качестве аргумента исходной функции.

Конкретно, у меня есть функция, которая возвращает поток чисел до нуля:

fn numbers_down_to_zero(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}

Я хочу вызывать эту функцию, начиная с 5. Функция также должна вызываться для каждого возвращаемого нечетного значения. Общий набор вызовов на numbers_down_to_zero будет:

numbers_down_to_zero(5);
numbers_down_to_zero(3);
numbers_down_to_zero(1);
numbers_down_to_zero(1);

Создание общего потока

4
3
2
1
0
2
1
0
0
0

Какие существуют методы, позволяющие это сделать?

Ответы [ 3 ]

3 голосов
/ 12 января 2020

Вы можете решить это с помощью unfold. У вас будет структура «state», которая хранит как «базовый поток» (в данном случае с обратным отсчетом до нуля), так и список элементов, создающих новый поток, и использует его в качестве аргумента для unfold, чтобы сохранить состояние при развертывании.

Таким образом, компилятору не нужно рассуждать о владении на протяжении жизни, поскольку состояние можно перемещать в блок async для каждого вызова замыкания.

/// Base stream (counting down to zero).
fn f(n: i32) -> impl Stream<Item = i32> {
    stream::iter((0..n).rev())
}

/// "Recursive" stream
fn g(n: i32) -> impl Stream<Item = i32> {
    /// Helper struct to keep state while unfolding
    struct StreamState<S> {
        inner_stream: S,
        item_queue: VecDeque<i32>,
    }

    // Build helper struct
    let state = StreamState {
        inner_stream: f(n),
        item_queue: VecDeque::new(),
    };

    // Unfold with state
    stream::unfold(state, |mut state| async move {
        loop {
            if let Some(item) = state.inner_stream.next().await {
                // Iterate inner stream, and potentially push item to queue
                if item % 2 == 1 {
                    state.item_queue.push_front(item);
                }
                break Some((item, state));
            } else if let Some(item) = state.item_queue.pop_back() {
                // If inner stream is exhausted, produce new stream from queue
                // and repeat loop
                state.inner_stream = f(item);
            } else {
                // If queue is empty, we are done
                break None;
            }
        }
    })
}

Пример полной игровой площадки

StreamExt::next требует, чтобы внутренний поток реализовал Unpin, и поэтому он не может использоваться с произвольными потоками. Вместо этого вы всегда можете использовать Box::pin(stream), поскольку Pin<Box<T>> равен Unpin и реализует Stream, если T: Stream.

2 голосов
/ 14 января 2020

Используя (as) asyn c / await, genawaiter crate управляет синтаксисом генератора c в стабильном Rust сегодня. В сочетании с futures::pin_mut для закрепления значения в стеке, здесь есть решение как без выделения ресурсов, так и совместимое с произвольными потоками:

//# futures = "0.3"
//# genawaiter = { version = "0.2", features = ["futures03"] }
//# tokio = { version = "0.2", features = ["full"] }
use futures::{
    pin_mut,
    stream::{self, Stream, StreamExt},
};
use genawaiter::{generator_mut, stack::Co};
use std::collections::VecDeque;

async fn g(n: i32, co: Co<'_, i32>) {
    let mut seeds = VecDeque::from(vec![n]);
    while let Some(seed) = seeds.pop_back() {
        let stream = f(seed);
        pin_mut!(stream);
        while let Some(x) = stream.next().await {
            if x % 2 != 0 {
                seeds.push_front(x);
            }
            co.yield_(x).await;
        }
    }
}

fn f(n: i32) -> impl Stream<Item = i32> {
    stream::iter((0..n).rev())
}

#[tokio::main]
async fn main() {
    generator_mut!(stream, |co| g(5, co));
    stream
        .for_each(|v| async move {
            println!("v: {}", v);
        })
        .await;
}

Некоторые недостатки:

  • есть один небезопасный вызов внутри generator_mut macro
  • интерфейс немного негерметичен. Вызывающие могут увидеть некоторые детали реализации.

С одним выделением кучи genawaiter::rc::Gen может избавиться от всего этого. Но опять же, с размещением на столе есть и другие варианты.

use futures::{
    pin_mut,
    stream::{Stream, StreamExt},
};
use genawaiter::rc::Gen;
use std::collections::VecDeque;

fn g(n: i32) -> impl Stream<Item = i32> {
    Gen::new(|co| async move {
        let mut seeds = VecDeque::from(vec![n]);
        while let Some(seed) = seeds.pop_back() {
            let stream = f(seed);
            pin_mut!(stream);
            while let Some(x) = stream.next().await {
                if x % 2 != 0 {
                    seeds.push_front(x);
                }
                co.yield_(x).await;
            }
        }
    })
}
2 голосов
/ 11 января 2020

Это частичные решения, которые я нашел, но которых по разным причинам не хватает.

Использование комбинаторов с изменчивостью внутренней части

Мне не нравится это решение, потому что я считаю, что изменчивость внутренней части должна не требуется для этой общей проблемы, но здесь требуется, потому что контролер заимствований не знает, как чередуются вызовы замыканий.

use futures::{stream, Stream, StreamExt};
use std::collections::VecDeque;

fn x(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}

use std::{cell::RefCell, rc::Rc};

fn y0() -> impl Stream<Item = i32> {
    let to_visit = Rc::new(RefCell::new(VecDeque::from(vec![5])));
    let to_visit_b = to_visit.clone();

    stream::unfold(to_visit, |to_visit| async {
        let i = to_visit.borrow_mut().pop_back()?;

        Some((x(i), to_visit))
    })
    .flatten()
    .inspect(move |&x| {
        if x % 2 != 0 {
            to_visit_b.borrow_mut().push_front(x);
        }
    })
}

#[tokio::main]
async fn main() {
    y0().for_each(|v| async move {
        println!("v: {}", v);
    })
    .await;
}

детская площадка

Пользовательская реализация Stream::poll_next

Мне не нравится это решение, потому что оно многословно и требует хитрого unsafe кода, который трудно рассуждать (я даже не уверен, что у меня правильно! )

use futures::{stream, Stream, StreamExt};
use std::collections::VecDeque;

fn x(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}

use std::{
    pin::Pin,
    task::{Context, Poll},
};

struct X<St, C, R, S>
where
    C: Fn(&mut St) -> Option<S>,
    R: Fn(&mut St, &mut S::Item),
    S: Stream,
{
    state: St,
    create: C,
    review: R,
    current: Option<S>,
}

impl<St, C, R, S> Stream for X<St, C, R, S>
where
    C: Fn(&mut St) -> Option<S>,
    R: Fn(&mut St, &mut S::Item),
    S: Stream,
{
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let (state, create, review, current) = unsafe {
            let Self {
                state,
                create,
                review,
                current,
            } = self.get_unchecked_mut();
            (state, create, review, current)
        };

        loop {
            if let Some(current) = current {
                let v = unsafe { futures::ready!(Pin::new_unchecked(current).poll_next(ctx)) };
                if let Some(mut v) = v {
                    review(state, &mut v);
                    return Poll::Ready(Some(v));
                }
            }

            *current = create(state);
            if current.is_none() {
                return Poll::Ready(None);
            }
        }
    }
}

fn y1() -> impl Stream<Item = i32> {
    X {
        state: VecDeque::from(vec![5]),
        create: |to_visit| {
            let i = to_visit.pop_back()?;

            Some(x(i))
        },
        review: |to_visit, &mut x| {
            if x % 2 != 0 {
                to_visit.push_front(x);
            }
        },
        current: None,
    }
}

#[tokio::main]
async fn main() {
    y1().for_each(|v| async move {
        println!("v: {}", v);
    })
    .await;
}

игровая площадка


Использование каналов (не работает)

Это не работает, поскольку отправитель никогда не удаляется потому что получатель никогда не отбрасывается, потому что отправитель никогда не отбрасывается ...

Помимо неработоспособности, у этого есть ряд недостатков:

  • Состояние должно неявно быть очередью ( который соответствует тому, что я хочу сделать, но не очень общее).
  • Требуется, чтобы моя функция сама стала async до pu sh начальным значением для посещения.
  • Я должен обработать ошибки, которые кажутся несущественными.
  • Я должен клонировать Sender внутри then замыкания.
use futures::{stream, Stream, StreamExt};

fn x(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}

use futures::channel::mpsc;
use futures::sink::SinkExt;

async fn y2() -> impl Stream<Item = i32> {
    let (mut tx, rx) = mpsc::unbounded();

    tx.send(5).await.unwrap();

    rx.map(x).flatten().then(move |x| {
        let mut tx = tx.clone();
        async move {
            if x % 2 != 0 {
                tx.send(x).await.unwrap();
            }
            x
        }
    })
}

#[tokio::main]
async fn main() {
    y2().await
        .for_each(|v| async move {
            println!("v: {}", v);
        })
        .await;
}

детская площадка

...