Как сбалансировать несколько очередей сообщений - PullRequest
0 голосов
/ 20 июня 2019

У меня есть задача, которая потенциально долго выполняется (часы). Задача выполняется несколькими работниками (экземпляры AWS ECS в моем случае), которые читают из очереди сообщений (в моем случае AWS SQS). У меня есть несколько пользователей, добавляющих сообщения в очередь. Проблема в том, что если Боб добавляет 5000 сообщений в очередь, этого достаточно для того, чтобы рабочие были заняты в течение 3 дней, Алиса приходит и хочет обработать 5 задач, Алиса должна будет ждать 3 дня, прежде чем любая из задач Алисы даже запустится.

Я хотел бы с одинаковой скоростью подавать сообщения рабочим от Алисы и Боба, как только Алиса отправляет задания.

Я решил эту проблему в другом контексте, создав несколько очередей (вложенных очередей) для каждого пользователя (или даже каждой партии, отправляемой пользователем) и чередуя все вложенные очереди, когда потребитель запрашивает следующее сообщение.

Кажется, по крайней мере в моем мире, это общая проблема, и мне интересно, знает ли кто-нибудь об устоявшемся способе ее решения.

Я не вижу решения с ActiveMQ. Я немного посмотрел на Кафку с его способностью разбивать разделы по теме, и это может сработать. Прямо сейчас я реализую что-то, используя Redis.

1 Ответ

0 голосов
/ 20 июня 2019

Я бы рекомендовал Cadence Workflow вместо очередей, поскольку он поддерживает длительные операции и управление состоянием из коробки.

В вашем случае я бы создал экземпляр рабочего процесса для каждого пользователя.Каждое новое задание будет отправлено в рабочий процесс пользователя через сигнальный API.Затем экземпляр рабочего процесса помещает в очередь полученные задачи и выполняет их одну за другой.

Вот схема реализации:

public interface SerializedExecutionWorkflow {

    @WorkflowMethod
    void execute();

    @SignalMethod
    void addTask(Task t);
}

public interface TaskProcessorActivity {
    @ActivityMethod
    void process(Task poll);
}

public class SerializedExecutionWorkflowImpl implements SerializedExecutionWorkflow {

    private final Queue<Task> taskQueue = new ArrayDeque<>();
    private final TaskProcesorActivity processor = Workflow.newActivityStub(TaskProcesorActivity.class);

    @Override
    public void execute() {
        while(!taskQueue.isEmpty()) {
            processor.process(taskQueue.poll());
        }
    }

    @Override
    public void addTask(Task t) {
        taskQueue.add(t);
    }
}

И затем код, который ставит эту задачу в очередь.рабочий процесс с использованием метода сигналов:

private void addTask(WorkflowClient cadenceClient, Task task) {
    // Set workflowId to userId
    WorkflowOptions options = new WorkflowOptions.Builder().setWorkflowId(task.getUserId()).build();
    // Use workflow interface stub to start/signal workflow instance
    SerializedExecutionWorkflow workflow = cadenceClient.newWorkflowStub(SerializedExecutionWorkflow.class, options);
    BatchRequest request = cadenceClient.newSignalWithStartRequest();
    request.add(workflow::execute);
    request.add(workflow::addTask, task);
    cadenceClient.signalWithStart(request);
}

Cadence предлагает множество других преимуществ по сравнению с использованием очередей для обработки задач.

  • Построены экспоненциальные повторные попытки с неограниченным интервалом истечения
  • Обработка ошибок.Например, он позволяет выполнить задачу, которая уведомляет другую службу, если оба обновления не могут быть выполнены успешно в течение заданного интервала.
  • Поддержка длительных пульсирующих операций
  • Возможность реализации зависимостей сложной задачи.Например, для реализации цепочки вызовов или логики компенсации в случае неисправимых сбоев ( SAGA )
  • Обеспечивает полную видимость текущего состояния обновления.Например, при использовании очередей все, что вы знаете, если в очереди есть несколько сообщений, и вам нужна дополнительная БД для отслеживания общего прогресса.С Cadence каждое событие записывается.
  • Возможность отмены обновления в полете.

См. презентацию , которая идет поверх модели программирования Cadence.

...