Почему `futures :: channel :: mpsc` может просто уведомить одного отправителя? - PullRequest
0 голосов
/ 11 ноября 2018

Я читаю futures-preview 0.3 источников, чтобы узнать, как правильно "уведомить любого". В mpsc::channel (что ограничено) несколько отправителей могут ожидать получения (в случае полного буфера).

Рассматривая реализацию next_message и unpark_one, получатель, похоже, уведомляет только одного отправителя за один прием.

Я сомневаюсь, что это работает в присутствии select!, потому что select! может привести к ложному уведомлению. Тем не менее, я не смог представить проблемное дело.

Вот моя попытка запутать mpsc:

[package]
name = "futures-mpsc-test"
version = "0.1.0"
edition = "2018"

[dependencies]
futures-preview = { version = "0.3.0-alpha.9", features = ["tokio-compat"] }
tokio = "0.1.11"

и это:

#![feature(async_await, await_macro, futures_api, pin)]

use std::collections::HashSet;

use futures::prelude::*;

use futures::channel::mpsc::{channel, Sender};
use futures::channel::oneshot;
use futures::select;

async fn main2() {
    let channel_len = 1;
    let num_false_wait = 1000;
    let num_expected_messages = 100;

    let (mut send, mut recv) = channel(channel_len);
    // One extra capacity per sender. Fill the extras.
    await!(send.send(-2)).unwrap();

    // Fill buffers
    for _ in 0..channel_len {
        await!(send.send(-1)).unwrap();
    }

    // False waits. Should resolve and produce false waiters.
    for _ in 0..num_false_wait {
        await!(false_wait(&send));
    }

    // True messages.
    {
        let mut send = send.clone();
        await!(send.send(-2)).unwrap();
        tokio::spawn(async move {
            for i in 0..num_expected_messages {
                await!(send.send(i)).unwrap();
            }
            Ok(())
        }.boxed().compat());
    }

    // Drain receiver until all true messages are received.
    let mut expects = (0..num_expected_messages).collect::<HashSet<_>>();
    while !expects.is_empty() {
        let i = await!(recv.next()).unwrap();
        expects.remove(&i);
        eprintln!("Received: {}", i);
    }
}

// If `send` is full, it will produce false waits.
async fn false_wait(send: &Sender<i32>) {
    let (wait_send, wait_recv) = oneshot::channel();
    let mut send = send.clone();
    await!(send.send(-2)).unwrap();
    tokio::spawn(async move {
        let mut sending = send.send(-3);
        let mut fallback = future::ready(());
        select! {
            sending => {
                sending.unwrap();
            },
            fallback => {
                eprintln!("future::ready is selected");
            },
        };
        wait_send.send(()).unwrap();
        Ok(())
    }.boxed().compat());
    await!(wait_recv).unwrap();
}

fn main() {
    tokio::run(async {
        await!(main2());
        Ok(())
    }.boxed().compat());
}

Я ожидаю, что это произойдет:

  1. Буфер заполнен на -1. Поэтому последующие отправители блокируются.
  2. Есть как «настоящие официанты», так и «ложные официанты». Ложные официанты уже вышли, потому что другая рука select! немедленно завершается.
  3. В каждом вызове на await!(recv.next()) самое большее один ожидающий отправитель уведомление. Если ложный официант уведомлен, никто не может нажать на буфер, даже если в буфере есть свободная комната.
  4. Если все элементы опорожнены без настоящего уведомления, вся система застряла.

Несмотря на мои ожидания, асинхронная функция main2 успешно завершена. Почему?

Ответы [ 2 ]

0 голосов
/ 17 ноября 2018

Дальнейшее изучение исходного кода futures решило мою проблему. Наконец, я не могу перепутать mpsc таким образом.

Дело в том, что размер mpsc является гибким и может вырасти больше, чем изначально указано. Это поведение упоминается в документах :

Пропускная способность канала равна buffer + num-senders. Другими словами, каждый отправитель получает гарантированный интервал в пропускной способности канала, и, кроме того, есть буферные слоты «первым пришел, первым обслужен», доступный для всех отправителей.

Да, я сначала прочитал это, прежде чем экспериментировать, но я не мог понять важность этого в то время.

Проблема с фиксированным буфером

Подумайте о типичной реализации ограниченной очереди, в которой размер очереди не может быть больше, чем указано изначально. Спецификация такая:

  • Когда очередь пуста, получатели блокируются.
  • Когда очередь заполнена (то есть размер достигает границы), блоки отправителей.

В этой ситуации, если очередь заполнена, несколько отправителей ждут одного ресурса (размер очереди).

В многопоточном программировании это достигается примитивами типа notify_one. Однако в futures это ошибочно: в отличие от многопоточного программирования, уведомленная задача не обязательно использует ресурс , поскольку задача, возможно, уже прекратила получение ресурса (из-за таких конструкций, как select! или Deadline) Тогда спецификация просто нарушается (очередь не заполнена, но все живые отправители блокируются).

mpsc гибкий

Как указано выше, размер буфера для futures::channel::mpsc::channel не является строгим . Спецификация обобщается как:

  • Когда message_queue.len() == 0, блоки приемников.
  • Когда message_queue.len() >= buffer, отправители могут блокировать.
  • Когда message_queue.len() >= buffer + num_senders, отправители блокируются.

Здесь num_senders - это , в основном , число клонов Sender, но больше, чем в некоторых случаях. Точнее, num_senders - это число SenderTask с.

Итак, как нам избежать совместного использования ресурсов? Для этого у нас есть дополнительные состояния:

  • Каждый отправитель (экземпляр SenderTask) имеет is_parked логическое состояние.
  • Канал имеет другую очередь с именем parked_queue, очередь из Arc ссылается на SenderTask.

Канал поддерживает следующие инварианты:

  • message_queue.len() <= buffer + num_parked_senders. Обратите внимание, что мы не знаем значение num_parked_senders.
  • parked_queue.len() == min(0, message_queue.len() - buffer)
  • У каждого припаркованного отправителя есть по крайней мере одно сообщение в parked_queue.

Это достигается с помощью следующего алгоритма:

  • Для получения,
    • он выскакивает SenderTask с parked_queue и, если отправитель припаркован, распакуйте его.
  • Для отправки,
    • Он всегда ждет, пока is_parked будет false. Если message_queue.len() < buffer, как parked_queue.len() == 0, все отправители отменяются. Поэтому мы можем гарантировать прогресс в этом случае.
    • Если is_parked равно false, все равно отправить сообщение в очередь.
    • После этого, если message_queue.len() <= buffer, ему больше ничего не нужно делать.
    • если message_queue.len() > buffer, отправитель становится непаркованным и толкается на parked_queue.

Вы можете легко проверить, поддерживается ли инвариант в приведенном выше алгоритме.

Удивительно, но отправители больше не ждут общего ресурса . Вместо этого отправитель ожидает своего состояния is_parked. Даже если задача отправки была отброшена до завершения, она некоторое время остается в parked_queue и ничего не блокирует. Как это умно!

0 голосов
/ 12 ноября 2018

Я сомневаюсь, что это работает в присутствии select !, потому что select! может привести к ложному уведомлению.

Нет, вы не можете "перепутать" канал mpsc, используя select!:

select! не вызывает никаких уведомлений, связанных с mspc, оно просто возвращает будущее, которое заканчивается первым.

Когда очередь сообщений заполнена, await!(recv.next()) уведомляет одного производителя о том, что слот в ограниченном канале теперь доступен.

Другими словами: нет true waiters и false waiters: когда очередь сообщений канала заполнена, блоки производителей и ожидают, что сторона получателя потребляет помещенные в очередь сообщения.

...