Как устранить ошибку компиляции «Unpin не реализовано для GenFuture», возникающей в результате адаптации потока с помощью комбинатора? - PullRequest
0 голосов
/ 15 апреля 2020

Я не могу решить, как устранить ошибку компиляции, возникающую в результате адаптации потока с помощью комбинатора.

Следующий Rust Playground демонстрирует довольно минимальный пример:

use futures::prelude::*;
use futures::StreamExt;

#[derive(Debug)]
pub enum Input {
    A,
    B(i32),
    C(u16),
}

#[derive(Debug)]
enum Output {
    Int(i32),
    Short(u16),
}

pub struct StreamMaker;

impl StreamMaker {
    /// make a stream with a series of inputs
    pub fn create(self) -> impl Stream<Item = Input> {
        stream::iter(vec![Input::A, Input::C(1u16), Input::B(2)])
    }
}

/// consume the creator, and make output messages for a subset
pub fn adapt_stream(creator: StreamMaker) -> impl Stream<Item = Output> {
    let mut upstream = creator.create();
    upstream.filter_map(|message| async move {
        match message {
            Input::A => None,
            Input::B(v) => Some(Output::Int(v)),
            Input::C(v) => Some(Output::Short(v)),
        }
    })
}

#[tokio::main]
async fn main() -> Result<(), ()> {
    let creator = StreamMaker {};
    let mut stream = adapt_stream(creator);

    while let Some(message) = stream.next().await {
        println!("message: {:?}", message)
    }

    Ok(())
}

Сбой компиляции с:

error[E0277]: the trait bound `std::future::GenFuture<[static generator@src/main.rs:29:46: 35:6 message:Input {}]>: std::marker::Unpin` is not satisfied in `impl core::future::future::Future`
  --> src/main.rs:43:38
   |
43 |     while let Some(message) = stream.next().await {
   |                                      ^^^^ within `impl core::future::future::Future`, the trait `std::marker::Unpin` is not implemented for `std::future::GenFuture<[static generator@src/main.rs:29:46: 35:6 message:Input {}]>`
   |
   = help: the following implementations were found:
             <std::future::GenFuture<T> as std::marker::Unpin>
   = note: required because it appears within the type `impl core::future::future::Future`
   = note: required because of the requirements on the impl of `std::marker::Unpin` for `futures_util::stream::stream::filter_map::FilterMap<impl futures_core::stream::Stream, impl core::future::future::Future, [closure@src/main.rs:29:25: 35:6]>`
   = note: required because it appears within the type `impl futures_core::stream::Stream`

error[E0277]: the trait bound `std::future::GenFuture<[static generator@src/main.rs:29:46: 35:6 message:Input {}]>: std::marker::Unpin` is not satisfied in `impl core::future::future::Future`
  --> src/main.rs:43:31
   |
43 |     while let Some(message) = stream.next().await {
   |                               ^^^^^^^^^^^^^^^^^^^ within `impl core::future::future::Future`, the trait `std::marker::Unpin` is not implemented for `std::future::GenFuture<[static generator@src/main.rs:29:46: 35:6 message:Input {}]>`
   |
   = help: the following implementations were found:
             <std::future::GenFuture<T> as std::marker::Unpin>
   = note: required because it appears within the type `impl core::future::future::Future`
   = note: required because of the requirements on the impl of `std::marker::Unpin` for `futures_util::stream::stream::filter_map::FilterMap<impl futures_core::stream::Stream, impl core::future::future::Future, [closure@src/main.rs:29:25: 35:6]>`
   = note: required because it appears within the type `impl futures_core::stream::Stream`
   = note: required because of the requirements on the impl of `core::future::future::Future` for `futures_util::stream::stream::next::Next<'_, impl futures_core::stream::Stream>`

Я могу поставить pin_mut!(stream); в основную, но я хотел бы иметь возможность набрать sh в восходящем направлении.

1 Ответ

3 голосов
/ 15 апреля 2020

Если вы не хотите, чтобы потребитель вашего потока сам связывал его, вам нужно вернуть поток, который реализует черту Unpin, что означает, что безопасно перемещаться в памяти даже после того, как он был закреплен .

pub fn adapt_stream(creator: StreamMaker) -> impl Stream<Item = Output> + Unpin {
//                                                      add Unpin trait --^

Добавляя это, ваш компилятор должен жаловаться, что возвращаемое значение не реализует Unpin. Это связано с тем, что блоки async move { ... } не реализуют Unpin, поскольку они могут быть самоссылочными (например, содержать ссылки на свои собственные переменные). Самый общий способ обойти это - закрепить поток в куче с помощью Pin<Box<_>>, используя конструктор Box::pin:

pub fn adapt_stream(creator: StreamMaker) -> impl Stream<Item = Output> + Unpin {
    let mut upstream = creator.create();
    Box::pin(upstream.filter_map(|message| async move {
//  ^-- pin stream to heap
        match message {
            Input::A => None,
            Input::B(v) => Some(Output::Int(v)),
            Input::C(v) => Some(Output::Short(v)),
        }
    }))
}

Поскольку мы теперь возвращаем указатель Pin<Box<_>> на поток, этот указатель можно безопасно перемещать в памяти, в то время как внутренний поток хранится в том же месте.

Полный пример игровой площадки

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...