Уведомление задачи из нескольких других задач без дополнительной работы - PullRequest
1 голос
/ 19 января 2020

Мое приложение основано на фьючерсах с async / await и имеет следующую структуру в одном из своих компонентов:

  1. "менеджер", который отвечает за запуск / остановку / перезапуск "работников ", основанный как на внешнем входе, так и на текущем состоянии" работников ";
  2. динамический набор" работников "c, которые выполняют некоторую непрерывную работу, но могут выйти из строя или могут быть остановлены извне.

Рабочий - это просто порожденная задача , которая выполняет некоторую работу ввода-вывода. Внутренне это все oop, которое должно быть бесконечным, но оно может выйти рано из-за ошибок или других причин, и в этом случае менеджер должен перезапустить рабочий с нуля.

Менеджер реализован как al oop, который ожидает по нескольким каналам, включая один, возвращаемый async_std::stream::interval, что по сути превращает менеджера в средство опроса - и действительно, мне это нужно, потому что мне нужно опросить некоторые Mutex -защищенное внешнее состояние. Основываясь на этом состоянии, менеджер, помимо всего прочего, создает или уничтожает своих работников.

Кроме того, менеджер хранит набор async_std::task::JoinHandle s, представляющих живых работников, и использует их ручками, чтобы проверить, вышли ли какие-либо рабочие, перезапуская их если так (Кстати, я делаю это в настоящее время, используя select(handle, future::ready()), что является совершенно неоптимальным, поскольку оно опирается на детали реализации select, в частности, на то, что сначала опрашивается левое будущее. Я не мог найти лучшего способа сделать это; что-то вроде race() будет иметь больше смысла, но race() потребляет оба фьючерса, что не сработает для меня, потому что я не хочу потерять JoinHandle, если он не готов. однако важно другое.)

Вы можете видеть, что в этом дизайне рабочие могут быть перезапущены только тогда, когда произойдет следующий «тик» в менеджере. Однако я не хочу использовать слишком маленький интервал для опроса, потому что в большинстве случаев опрос просто тратит впустую циклы процессора. Однако большие интервалы могут слишком задерживать перезапуск отказавшего / отмененного работника, что приводит к нежелательным задержкам. Поэтому я решил настроить еще один канал на () s от каждого работника до менеджера, который я бы добавил главному менеджеру l oop, поэтому, когда работник останавливается из-за ошибки или иным образом, сначала он отправит сообщение на свой канал, в результате чего менеджер будет разбужен раньше следующего опроса, чтобы сразу же перезапустить работника.

К сожалению, с любыми типами каналов это может привести к больше опросов, чем необходимо, в случае, если два или более рабочих остановятся примерно в одно и то же время (что, в силу характера моего заявления, скорее всего, произойдет). В таком случае имеет смысл запускать менеджер l oop только один раз, обрабатывая всех остановленных рабочих, но с каналами это обязательно приведет к числу опросов, равному количеству остановленных рабочих, даже если дополнительные опросы не ничего не делать.

Поэтому мой вопрос: как я могу уведомить менеджера от его работников, что они закончили, не приводя к дополнительным опросам менеджера? Я пробовал следующие вещи:

  1. Как объяснено выше, обычные неограниченные каналы просто не будут работать.
  2. Я думал, что, возможно, ограниченные каналы могут работать - если я использовал канал с емкостью 0, и был способ попытаться отправить в него сообщение, но просто отбросить сообщение, если канал заполнен (например, метод offer() в Java BlockingQueue), это, казалось бы, решило проблему. К сожалению, API каналов, хотя и предоставляет такой метод (похоже, try_send()), также имеет свойство иметь емкость, большую или равную количеству отправителей, что означает, что он не может быть использован для таких целей. уведомления.
  3. Какой-то атоми c или булевский флаг, защищенный мьютексом, также выглядят так, как будто он может работать, но нет ни атома c, ни мьютекса API, который бы предоставлял будущее, которое нужно ждать, и также потребует опроса.
  4. Перестройте реализацию менеджера, чтобы каким-то образом включить JoinHandle s в основную select. Это могло бы сработать, но это привело бы к большому рефакторингу, который я не хотел бы делать в этот момент. Если есть способ сделать то, что я хочу, без этого рефакторинга, я хотел бы сначала использовать его.
  5. Полагаю, может сработать какая-то комбинация атомов и каналов, что-то вроде установки атоми c Пометить и отправить сообщение, а затем пропустить любые дополнительные уведомления в диспетчере на основе этого флага (который переключается обратно после отключения одного уведомления), но это также кажется сложным подходом, и мне интересно, возможно ли что-нибудь попроще.

1 Ответ

0 голосов
/ 26 января 2020

Я рекомендую использовать тип FuturesUnordered из ящика futures. Эта коллекция позволяет вам собрать sh много фьючерсов одного типа в коллекцию и ждать завершения любого из них одновременно.

Он реализует Stream, поэтому, если вы импортируете StreamExt, вы можете использовать unordered.next() для получения будущего, которое завершается после завершения любого будущего в коллекции.

Если вам также нужно подождать тайм-аут или мьютекс и т. Д. c., Вы можно использовать select, чтобы создать будущее, которое завершится, как только истечет время ожидания или один из дескрипторов соединения. Будущее, возвращаемое next(), реализует Unpin, поэтому его можно использовать с select без проблем.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...