Очередь заданий с привязкой к заданию - PullRequest
9 голосов
/ 22 мая 2019

В настоящее время я сталкиваюсь с проблемой, для которой я почти уверен, что есть официальное название, но я не знаю, что искать в Интернете.Я надеюсь, что если я опишу проблему и решение, которое я имею в виду, кто-нибудь сможет сказать мне имя шаблона проектирования (если есть тот, который соответствует тому, что я собираюсь описать).

В основномя хочу иметь очередь заданий: у меня есть несколько клиентов, которые создают задания (издатели), и несколько работников, которые обрабатывают эти задания (потребители).Теперь я хочу распространить задания, созданные издателями, среди различных потребителей, что в принципе выполнимо, используя практически любую очередь сообщений с балансировкой нагрузки по очереди, например, используя RabbitMQ или даже MQTT 5.

Однако теперьусложняется ... каждая работа относится к внешнему объекту, скажем, пользователю.Я хочу, чтобы задания для одного пользователя обрабатывались по порядку, но параллельно для нескольких пользователей.У меня нет требования, чтобы задания для пользователя X всегда передавались работнику Y, поскольку они все равно должны обрабатываться последовательно.

Теперь я мог бы решить эту проблему с помощью RabbitMQ и его последовательного обмена хэшированием, но тогда у меня естьгонка данных, когда новые работники входят в кластер, потому что RabbitMQ не поддерживает перемещение заданий, которые уже находятся в очереди.

MQTT 5 также не поддерживает это: здесь эта идея известна как «липкие общие подписки».", но это не официально.Он может быть частью MQTT 6 или не может.Кто знает.

Я также взглянул на NSQ, NATS и некоторых других брокеров.Большинство из них даже не поддерживают этот очень специфический сценарий, и те, которые используют согласованное хеширование, которое имеет ранее упомянутую проблему гонки данных.

Теперь проблема исчезнет, ​​если брокер не будет сортироватьзаданий в очереди, как только задания поступят, но если он будет отслеживать, обрабатывается ли уже задание для определенного пользователя: если это так, он должен задерживать все остальные задания для этого пользователя, но все задания для других пользователей все равно должны обрабатываться.Это, AFAICS, невозможно при использовании RabbitMQ и др.

Я почти уверен, что я не единственный человек, у которого есть пример использования для этого.Я мог бы, например, подумать о пользователях, загружающих видео на видео платформу, и хотя загруженные видео обрабатываются параллельно, все видео, загруженные одним пользователем, обрабатываются последовательно.

Итак, если коротко, коротко:что я описываю под известным общим именем?Что-то типа распределенная очередь заданий ? Диспетчер задач со сходством задач ?Или что-нибудь еще?Я перепробовал много терминов, но не смог.Это может означать, что для этого нет решения, но, как уже было сказано, трудно представить, что я единственный человек на планете с этой проблемой.

Есть идеи, что я мог бы искать?И: Есть ли инструменты, которые реализуют это?Любые протоколы?

PS: Просто использовать предопределенный ключ маршрутизации не вариант, так как идентификаторы пользователей (которые я только что использовал в качестве готового примера здесь) в основном UUID, так что их может быть миллиардыМне нужно что-то более динамичное.Следовательно, последовательное хеширование является в основном правильным подходом, но, как уже было сказано, распределение должно работать по частям, а не заранее, чтобы избежать скачек данных.

Ответы [ 6 ]

1 голос
/ 31 мая 2019

Сложно требовать обработки заказа для каждой организации.

Как долго длится каждое опубликованное задание? Если они всегда очень короткие, вы можете распределять задачи по хэшам и просто истощать рабочий пул запущенных заданий каждый раз, когда он меняет форму, не теряя при этом значительной производительности.

Если они работают дольше, возможно, это будет слишком медленно. В этом случае вы могли бы также потенциально заставить сотрудников снимать атомарные консультативные блокировки из быстрой центральной службы (например, Redis или чего-то еще) для user_id каждой задачи, которую они потребляют, на время ее выполнения. Эта служба также может быть отдельно масштабируемой, разделенной на диапазоны идентификаторов пользователей или что-то еще. Если между получением задачи и первыми побочными эффектами от ее выполнения достаточно разрыва, работнику даже не потребуется блокировать успешное выполнение блокировки до тех пор, пока она не будет готова к выполнению, и, следовательно, он может не увидеть значительного увеличения задержка. Конфликт * может быть редким: если вы уже используете некоторую непротиворечивую схему хеширования для user_id для распределения работы, они действительно будут редкими и будут происходить только при изменении топологии рабочего пула. Вы должны по крайней мере использовать распределение хэширования, чтобы гарантировать, что за блокировку борются только два рабочих: старый и новый. **

Если предоставление блокировки обслуживалось в порядке очередности поступления запросов и блокировки запрашиваются быстрее, чем изменения топологии рабочего пула (то есть рабочие ставятся в очередь на блокировки, как только получают задание от издателя) это может даже дать вам довольно хорошие гарантии относительно порядка, даже если топология меняется довольно быстро.

редактирует:

* Я изначально написал «Неудачи»; не совсем то, что я имел в виду. Идея состоит в том, что эта служба блокировки почти никогда не столкнется с конфликтом блокировок, если не изменить топологию, поскольку задачи для данного пользователя всегда будут отправляться одному и тому же работнику в обычном режиме.

** Еще одна возможность: Вы также можете дать хорошие гарантии только с частичным стоком рабочего пула. Без консультативных блокировок на уровне пользователя, если вы используете согласованную схему хеширования для распределения задач и можете поддерживать низкий уровень для завершения отправленных задач, вы можете отложить запуск задач, целевой работник которых отличается от это было бы, когда запускалась самая старая из выполняемых в настоящее время задач (т. е. выполнялись пустые задачи только для пользователей, чей назначенный работник изменился). Это изрядное количество дополнительной сложности; если вы можете эффективно отследить нижнюю отметку и у вас нет длинных хвостов длительных задач, это может быть хорошим вариантом, позволяющим вам отказаться от службы блокировки. Однако на момент написания этой статьи мне не было ясно, будет ли это когда-нибудь дешевле замков; низкие отметки уровня воды обычно не дешевы для надежного внедрения, и смерть работника в неподходящее время может задержать обработку для всей когорты 1 / N, которая изменила работников, а не только пользователей, чьи задачи выполнялись в полете на работнике в время, когда оно умерло.

1 голос
/ 31 мая 2019

я хочу иметь очередь заданий: у меня есть несколько клиентов, которые создают задания (издатели), и несколько работников, которые обрабатывают эти задания (потребители).Теперь я хочу распространить задания, созданные издателями, среди различных потребителей, что в основном выполнимо, используя практически любую очередь сообщений с балансировкой нагрузки по очереди, например, используя RabbitMQ или даже MQTT 5.

Однако теперьусложняется ... каждая работа относится к внешнему объекту, скажем, пользователю.Я хочу, чтобы задания для одного пользователя обрабатывались по порядку, но параллельно для нескольких пользователей.У меня нет требования, чтобы задания для пользователя X всегда передавались работнику Y, поскольку они все равно должны обрабатываться последовательно.

Даже если это не был конкретный случай использования, я провел опрос (динамическое) планирование задач [ 0 ] [ 1 ] пару месяцев назад, и ничего подобного не появлялось.

Каждый алгоритм планирования, о котором я читал, имеет некоторые общие свойствако всем другим задачам, таким как приоритет, возраст, время постановки в очередь, имя задачи (и, соответственно, среднее время обработки).Если бы все ваши задачи были связаны с пользователем, вы могли бы построить планировщик, который учитывает user_id для выбора задачи из очереди.

Но я думаю, вы не хотитеСоздайте свой собственный планировщик, в любом случае это будет напрасно, потому что, исходя из опыта такой необходимости, существующие очереди сообщений позволяют выполнить ваше требование.

Чтобы обобщить ваши требования, вам необходимо:

Aпланировщик, который запускает только одну задачу для пользователя одновременно.

Решение состоит в том, чтобы использовать распределенную блокировку, например, REDIS distlock , и получить блокировку до запуска задачи иобновляйте его регулярно во время выполнения задачи.Если новое задание для того же пользователя входит и пытается выполнить его, оно не сможет получить блокировку и будет повторно поставлено в очередь.

Вот псевдокод:

def my_task(user_id, *args, **kwargs):
    if app.distlock(user_id, blocking=False):
        exec_my_task(user_id, *args, **kwargs)
    else:
        raise RetryTask()

Не забудьте обновить и освободить блокировку .

Аналогичный подход применяется для обеспечения задержки robots.txt между каждым запросом вИскатели.

0 голосов
/ 16 июня 2019

Cadence Workflow способен поддержать ваш сценарий использования с минимальными усилиями.

Вот соломенный дизайн, который удовлетворяет вашим требованиям:

  • Отправьте запрос signalWithStart рабочему процессу пользователя, используя ID пользователя в качестве идентификатора рабочего процесса. Он либо доставляет сигнал в рабочий процесс, либо сначала запускает рабочий процесс и передает ему сигнал.
  • Все запросы к этому рабочему процессу буферизуются им. Cadence дает гарантию, что в открытом состоянии может существовать только один рабочий процесс с данным ID. Таким образом, все сигналы (события) гарантированно буферизируются в рабочем процессе, принадлежащем пользователю.
  • Внутренний цикл событий рабочего процесса отправляет эти запросы один за другим.
  • Когда буфер пуст, рабочий процесс может быть завершен.

Вот код рабочего процесса, который реализует его в Java (также поддерживается Go-клиент):

public interface SerializedExecutionWorkflow {

    @WorkflowMethod
    void execute();

    @SignalMethod
    void addTask(Task t);
}

public interface TaskProcessorActivity {
    @ActivityMethod
    void process(Task poll);
}

public class SerializedExecutionWorkflowImpl implements SerializedExecutionWorkflow {

    private final Queue<Task> taskQueue = new ArrayDeque<>();
    private final TaskProcesorActivity processor = Workflow.newActivityStub(TaskProcesorActivity.class);

    @Override
    public void execute() {
        while(!taskQueue.isEmpty()) {
            processor.process(taskQueue.poll());
        }
    }

    @Override
    public void addTask(Task t) {
        taskQueue.add(t);
    }
}

А затем код, который ставит эту задачу в рабочий процесс с помощью метода сигнала:

private void addTask(WorkflowClient cadenceClient, Task task) {
    // Set workflowId to userId
    WorkflowOptions options = new WorkflowOptions.Builder().setWorkflowId(task.getUserId()).build();
    // Use workflow interface stub to start/signal workflow instance
    SerializedExecutionWorkflow workflow = cadenceClient.newWorkflowStub(SerializedExecutionWorkflow.class, options);
    BatchRequest request = cadenceClient.newSignalWithStartRequest();
    request.add(workflow::execute);
    request.add(workflow::addTask, task);
    cadenceClient.signalWithStart(request);
}

Cadence предлагает множество других преимуществ по сравнению с использованием очередей для обработки задач.

  • Построены экспоненциальные повторы с неограниченным интервалом истечения
  • Обработка ошибок. Например, он позволяет выполнить задачу, которая уведомляет другую службу, если оба обновления не могут быть выполнены в течение заданного интервала.
  • Поддержка длительных операций сердцебиения
  • Возможность реализации сложных задачных зависимостей. Например, реализовать цепочку вызовов или логику компенсации в случае неисправимых сбоев ( SAGA )
  • Обеспечивает полную видимость текущего состояния обновления. Например, при использовании очередей все, что вы знаете, если в очереди есть несколько сообщений, и вам нужна дополнительная БД для отслеживания общего прогресса. С Cadence каждое событие записывается.
  • Возможность отмены обновления в полете.
  • Распределенная поддержка CRON

См. презентацию , которая охватывает модель программирования Cadence.

0 голосов
/ 31 мая 2019

Если я правильно понимаю ваш сценарий, я считаю, что описываемая вами функция очень похожа на то, как сеансы сообщений работают в служебная шина Azure .

В основном вы устанавливаете свойство SessionId сообщения на UserId, прежде чем помещать их в очередь.

Каждый потребитель будет блокировать сеанс, обрабатывающий сообщения одно за другим, и эти сообщения будут принадлежатьтот же пользователь.После этого потребитель может просто перейти к следующему доступному сеансу.

Кроме того, Функции Azure недавно выпустили поддержку сеансов служебной шины, которая находится в режиме предварительного просмотра, но позволяет достичь всего этого с помощьюочень мало усилий.

К сожалению, я недостаточно знаком, чтобы знать, существует ли эта функция в одном из альтернатив с открытым исходным кодом, но я надеюсь, что это поможет.

0 голосов
/ 29 мая 2019

Кафка может помочь, так как она хранит сообщения некоторое время, так что вы можете опросить их снова

0 голосов
/ 25 мая 2019

Мне удалось найти это обсуждение типа поведения, которое вы описываете, выполнив поиск "очереди заданий с упорядочением по категориям" .

К сожалению, похоже, у них нет решения вашей проблемы.

Существует ответ на предыдущий вопрос , в котором предлагается не использовать какой-либо сервис брокера сообщений для задач, чувствительных к заказу или бизнес-логики, по причинам, которыеможет или не может относиться к тому, что вы делаете.Он также указывает на технику, которая, кажется, может делать то, что вы пытаетесь сделать, но которая может не подходить для поставленной задачи.

Если бы у вас была опция stickiness , это решит вашу проблему аккуратно и с минимальной дополнительной неэффективностью.Конечно, липкость имеет свои собственные режимы отказа;нет никаких оснований полагать, что вы найдете реализацию, в которой были достигнуты именно те компромиссы, которые вы сделали.

Я полагаю, поскольку вы задали здесь вопрос о том, что последовательность для каждого пользователя важно .В приведенном вами примере обработки загрузок на видео-платформе нарушение последовательности не составит большого труда.В более широком смысле, большинству людей, которым нужны очереди заданий с большой пропускной способностью и нагрузкой, не нужны сильные гарантии в отношении порядка обработки данных.

Если вам в конечном итоге понадобится создать вещьсебя, у вас будет много вариантов.У меня сложилось впечатление, что вы ожидаете огромную пропускную способность, высокопараллельную архитектуру и низкую частоту коллизий идентификаторов пользователей .В этом случае вы можете рассмотреть вопрос о ведении списка предпосылок :
Когда появляется новая задача, балансировщик ищет все незавершенные, назначенные и еще не назначенные задания для любых, которые соответствуютключ задания (user_id).
Если существует существующее совпадение, то новое задание добавляется в список еще не назначенных, причем самое старое задание делится своим ключом в качестве обязательного условия.
Каждый раз, когда задание завершается,работник должен проверить еще не назначенный список, чтобы убедиться, что он только что выполнил чье-либо условие.Если это так, работник может либо пометить это дочернее задание для назначения, либо просто обработать само дочернее задание.
Конечно, у него есть свои собственные режимы сбоев;вам придется делать компромиссы.

...