Сервер очередей, который позволяет глобальный уровень потребления для всех работников - PullRequest
0 голосов
/ 23 июня 2019

У меня много задач, которые должны выполнять мои серверы.эти задачи должны выполняться с определенной заданной скоростью из-за ограничения скорости вызовов API, которому должны соответствовать сотрудники.

Чтобы гарантировать, что эти задачи не будут выполняться со скоростью, превышающей пределы скорости API, яхотел бы иметь возможность настроить скорость, с которой очередь отправляет сообщения для обработки.

Кроме того, эта очередь должна поддерживать порядок отправляемых сообщений и освобождать их в порядке FIFO, чтобы обеспечить честность.

Наконец, было бы здорово, если бы при кодировании это было бы прозрачно, когдаиспользуется для того, чтобы клиент отправлял сообщение в очередь через API-интерфейс, и тот же клиент получит обратно сообщение после его освобождения в очереди в соответствии с рабочей скоростью и соответствующим порядком.например, используя RxJava

waitForMessageToBeReleased(message, queue)
     .subscribe(message -> // do some stuff)  // message received to the same 
client after it was released by the queue according to the defined work rate.

В настоящее время я использую Redis для управления скоростью выполнения, создавая переменную с определенным количеством TTL, и другие вызовы ждут, пока эта переменная не истечет.Однако он не обрабатывает заказы и может привести к тому, что клиенты начнут голодать в случае высокой нагрузки.

1 Ответ

0 голосов
/ 24 июня 2019

Cadence Workflow способен поддержать ваш сценарий использования с минимальными усилиями.

Вот соломенный дизайн, который удовлетворяет вашим требованиям:

  • Отправить сигналWithStart запросдля рабочего процесса пользователя, используя идентификатор пользователя в качестве идентификатора рабочего процесса.Он либо доставляет сигнал в рабочий процесс, либо сначала запускает рабочий процесс и передает ему сигнал.
  • Все запросы к этому рабочему процессу буферизуются им.Cadence дает гарантию, что в открытом состоянии может существовать только один рабочий процесс с данным ID.Таким образом, все сигналы (события) гарантированно буферизируются в рабочем процессе, принадлежащем пользователю.
  • Внутренний цикл событий рабочего процесса отправляет эти запросы один за другим.
  • Когда буфер является пустым рабочим процессомможно завершить.

Вот код рабочего процесса, который реализует его в Java (также поддерживается Go-клиент):

public interface SerializedExecutionWorkflow {

    @WorkflowMethod
    void execute();

    @SignalMethod
    void addTask(Task t);
}

public interface TaskProcessorActivity {
    @ActivityMethod
    void process(Task poll);
}

public class SerializedExecutionWorkflowImpl implements SerializedExecutionWorkflow {

    private final Queue<Task> taskQueue = new ArrayDeque<>();
    private final TaskProcesorActivity processor = Workflow.newActivityStub(TaskProcesorActivity.class);

    @Override
    public void execute() {
        while(!taskQueue.isEmpty()) {
            processor.process(taskQueue.poll());
        }
    }

    @Override
    public void addTask(Task t) {
        taskQueue.add(t);
    }
}

А затем код, который ставит эту задачу в рабочий процессМетод сквозного сигнала:

private void addTask(WorkflowClient cadenceClient, Task task) {
    // Set workflowId to userId
    WorkflowOptions options = new WorkflowOptions.Builder().setWorkflowId(task.getUserId()).build();
    // Use workflow interface stub to start/signal workflow instance
    SerializedExecutionWorkflow workflow = cadenceClient.newWorkflowStub(SerializedExecutionWorkflow.class, options);
    BatchRequest request = cadenceClient.newSignalWithStartRequest();
    request.add(workflow::execute);
    request.add(workflow::addTask, task);
    cadenceClient.signalWithStart(request);
}

Cadence предлагает множество других преимуществ по сравнению с использованием очередей для обработки задач.

  • Построен экспоненциальный повтор с неограниченным интервалом истечения
  • Обработка ошибок.Например, он позволяет выполнить задачу, которая уведомляет другую службу, если оба обновления не могут быть выполнены успешно в течение заданного интервала.
  • Поддержка длительных пульсирующих операций
  • Возможность реализации зависимостей сложной задачи.Например, для реализации цепочки вызовов или логики компенсации в случае неисправимых сбоев ( SAGA )
  • Обеспечивает полную видимость текущего состояния обновления.Например, при использовании очередей все, что вы знаете, если в очереди есть несколько сообщений, и вам нужна дополнительная БД для отслеживания общего прогресса.С Cadence каждое событие записывается.
  • Возможность отмены обновления в полете.
  • Распределенная поддержка CRON

См. презентация , которая идетМодель программирования Cadence.

...