Мое приложение основано на фьючерсах с async / await и имеет следующую структуру в одном из своих компонентов:
- "менеджер", который отвечает за запуск / остановку / перезапуск "работников ", основанный как на внешнем входе, так и на текущем состоянии" работников ";
- динамический набор" работников "c, которые выполняют некоторую непрерывную работу, но могут выйти из строя или могут быть остановлены извне.
Рабочий - это просто порожденная задача , которая выполняет некоторую работу ввода-вывода. Внутренне это все oop, которое должно быть бесконечным, но оно может выйти рано из-за ошибок или других причин, и в этом случае менеджер должен перезапустить рабочий с нуля.
Менеджер реализован как al oop, который ожидает по нескольким каналам, включая один, возвращаемый async_std::stream::interval
, что по сути превращает менеджера в средство опроса - и действительно, мне это нужно, потому что мне нужно опросить некоторые Mutex
-защищенное внешнее состояние. Основываясь на этом состоянии, менеджер, помимо всего прочего, создает или уничтожает своих работников.
Кроме того, менеджер хранит набор async_std::task::JoinHandle
s, представляющих живых работников, и использует их ручками, чтобы проверить, вышли ли какие-либо рабочие, перезапуская их если так (Кстати, я делаю это в настоящее время, используя select(handle, future::ready())
, что является совершенно неоптимальным, поскольку оно опирается на детали реализации select
, в частности, на то, что сначала опрашивается левое будущее. Я не мог найти лучшего способа сделать это; что-то вроде race()
будет иметь больше смысла, но race()
потребляет оба фьючерса, что не сработает для меня, потому что я не хочу потерять JoinHandle
, если он не готов. однако важно другое.)
Вы можете видеть, что в этом дизайне рабочие могут быть перезапущены только тогда, когда произойдет следующий «тик» в менеджере. Однако я не хочу использовать слишком маленький интервал для опроса, потому что в большинстве случаев опрос просто тратит впустую циклы процессора. Однако большие интервалы могут слишком задерживать перезапуск отказавшего / отмененного работника, что приводит к нежелательным задержкам. Поэтому я решил настроить еще один канал на ()
s от каждого работника до менеджера, который я бы добавил главному менеджеру l oop, поэтому, когда работник останавливается из-за ошибки или иным образом, сначала он отправит сообщение на свой канал, в результате чего менеджер будет разбужен раньше следующего опроса, чтобы сразу же перезапустить работника.
К сожалению, с любыми типами каналов это может привести к больше опросов, чем необходимо, в случае, если два или более рабочих остановятся примерно в одно и то же время (что, в силу характера моего заявления, скорее всего, произойдет). В таком случае имеет смысл запускать менеджер l oop только один раз, обрабатывая всех остановленных рабочих, но с каналами это обязательно приведет к числу опросов, равному количеству остановленных рабочих, даже если дополнительные опросы не ничего не делать.
Поэтому мой вопрос: как я могу уведомить менеджера от его работников, что они закончили, не приводя к дополнительным опросам менеджера? Я пробовал следующие вещи:
- Как объяснено выше, обычные неограниченные каналы просто не будут работать.
- Я думал, что, возможно, ограниченные каналы могут работать - если я использовал канал с емкостью 0, и был способ попытаться отправить в него сообщение, но просто отбросить сообщение, если канал заполнен (например, метод
offer()
в Java BlockingQueue
), это, казалось бы, решило проблему. К сожалению, API каналов, хотя и предоставляет такой метод (похоже, try_send()
), также имеет свойство иметь емкость, большую или равную количеству отправителей, что означает, что он не может быть использован для таких целей. уведомления. - Какой-то атоми c или булевский флаг, защищенный мьютексом, также выглядят так, как будто он может работать, но нет ни атома c, ни мьютекса API, который бы предоставлял будущее, которое нужно ждать, и также потребует опроса.
- Перестройте реализацию менеджера, чтобы каким-то образом включить
JoinHandle
s в основную select
. Это могло бы сработать, но это привело бы к большому рефакторингу, который я не хотел бы делать в этот момент. Если есть способ сделать то, что я хочу, без этого рефакторинга, я хотел бы сначала использовать его. - Полагаю, может сработать какая-то комбинация атомов и каналов, что-то вроде установки атоми c Пометить и отправить сообщение, а затем пропустить любые дополнительные уведомления в диспетчере на основе этого флага (который переключается обратно после отключения одного уведомления), но это также кажется сложным подходом, и мне интересно, возможно ли что-нибудь попроще.