Дальнейшее изучение исходного кода futures
решило мою проблему. Наконец, я не могу перепутать mpsc таким образом.
Дело в том, что размер mpsc
является гибким и может вырасти больше, чем изначально указано. Это поведение упоминается в документах :
Пропускная способность канала равна buffer + num-senders
. Другими словами, каждый отправитель получает гарантированный интервал в пропускной способности канала, и, кроме того, есть буферные слоты «первым пришел, первым обслужен», доступный для всех отправителей.
Да, я сначала прочитал это, прежде чем экспериментировать, но я не мог понять важность этого в то время.
Проблема с фиксированным буфером
Подумайте о типичной реализации ограниченной очереди, в которой размер очереди не может быть больше, чем указано изначально. Спецификация такая:
- Когда очередь пуста, получатели блокируются.
- Когда очередь заполнена (то есть размер достигает границы), блоки отправителей.
В этой ситуации, если очередь заполнена, несколько отправителей ждут одного ресурса (размер очереди).
В многопоточном программировании это достигается примитивами типа notify_one
. Однако в futures
это ошибочно: в отличие от многопоточного программирования, уведомленная задача не обязательно использует ресурс , поскольку задача, возможно, уже прекратила получение ресурса (из-за таких конструкций, как select!
или Deadline
) Тогда спецификация просто нарушается (очередь не заполнена, но все живые отправители блокируются).
mpsc
гибкий
Как указано выше, размер буфера для futures::channel::mpsc::channel
не является строгим . Спецификация обобщается как:
- Когда
message_queue.len() == 0
, блоки приемников.
- Когда
message_queue.len() >= buffer
, отправители могут блокировать.
- Когда
message_queue.len() >= buffer + num_senders
, отправители блокируются.
Здесь num_senders
- это , в основном , число клонов Sender
, но больше, чем в некоторых случаях. Точнее, num_senders
- это число SenderTask
с.
Итак, как нам избежать совместного использования ресурсов? Для этого у нас есть дополнительные состояния:
- Каждый отправитель (экземпляр
SenderTask
) имеет is_parked
логическое состояние.
- Канал имеет другую очередь с именем
parked_queue
, очередь из Arc
ссылается на SenderTask
.
Канал поддерживает следующие инварианты:
message_queue.len() <= buffer + num_parked_senders
. Обратите внимание, что мы не знаем значение num_parked_senders
.
parked_queue.len() == min(0, message_queue.len() - buffer)
- У каждого припаркованного отправителя есть по крайней мере одно сообщение в
parked_queue
.
Это достигается с помощью следующего алгоритма:
- Для получения,
- он выскакивает
SenderTask
с parked_queue
и, если отправитель припаркован, распакуйте его.
- Для отправки,
- Он всегда ждет, пока
is_parked
будет false
. Если message_queue.len() < buffer
, как parked_queue.len() == 0
, все отправители отменяются. Поэтому мы можем гарантировать прогресс в этом случае.
- Если
is_parked
равно false
, все равно отправить сообщение в очередь.
- После этого, если
message_queue.len() <= buffer
, ему больше ничего не нужно делать.
- если
message_queue.len() > buffer
, отправитель становится непаркованным и толкается на parked_queue
.
Вы можете легко проверить, поддерживается ли инвариант в приведенном выше алгоритме.
Удивительно, но отправители больше не ждут общего ресурса . Вместо этого отправитель ожидает своего состояния is_parked
. Даже если задача отправки была отброшена до завершения, она некоторое время остается в parked_queue
и ничего не блокирует. Как это умно!