Очереди не подходят для таких случаев использования.Я бы порекомендовал использовать Cadence Workflow для реализации вашей логики с минимальными усилиями.
Вот простой дизайн, который удовлетворяет вашим требованиям:
- Отправить сигналWithStartзапрос, содержащий информацию о редактировании рабочего процесса пользователя с использованием идентификатора пользователя в качестве идентификатора рабочего процесса.Он либо доставляет сигнал в рабочий процесс, либо сначала запускает рабочий процесс и передает ему сигнал.
- Все запросы к этому рабочему процессу буферизуются им.Cadence дает твердую гарантию того, что в открытом состоянии может существовать только один рабочий процесс с данным ID.Таким образом, все сигналы (события) гарантированно буферизируются в рабочем процессе, принадлежащем пользователю.
- После настроенного тайм-аута вызывается действие, уведомляющее пользователей о ожидающих изменениях.
- Ожидающие измененияприменяются следующим действием.
- Рабочий процесс завершен.
Вот код рабочего процесса, который реализует его в Java (также поддерживается клиент Go):
public interface BufferedEditsWorkflow {
@WorkflowMethod
void execute(String userId, Duration notifyAfter, Edit firstEdit);
@SignalMethod
void addEdit(Edit edit);
}
public interface BufferedEditsActivities {
void notifyUser(String userId, List<Edit> edits);
void process(String userId, List<Edit> edits);
}
public class BufferedEditsWorkflowImpl implements BufferedEditsWorkflow {
private final List<Edit> edits = new ArrayList<>();
private final BufferedEditsActivities activities = Workflow.newActivityStub(BufferedEditsActivities.class);
@Override
public void execute(String userId, Duration notifyAfter, Edit firstEdit)
{
edits.add(firstEdit);
// Cadence doesn't have limit on sleep duration.
// It can sleep at this line for a year with no problem.
Workflow.sleep(notifyAfter);
activities.notifyUser(userId, edits);
activities.process(userId, edits);
}
@Override
public void addEdit(Edit edit) {
edits.add(edit);
}
}
Код, который запускает рабочий процесс для первого редактирования:
private void addFirstEdit(WorkflowClient cadenceClient, Edit edit) {
WorkflowOptions options = new WorkflowOptions.Builder().setWorkflowId(edit.getUserId()).build();
BufferedEditsWorkflow workflow = cadenceClient.newWorkflowStub(BufferedEditsWorkflow.class, options);
workflow.execute(edit.getUserId(), Duration.ofHours(1), edit);
}
Код, который добавляет больше правок.
private void addEdit(WorkflowClient cadenceClient, Edit edit) {
WorkflowOptions options = new WorkflowOptions.Builder().setWorkflowId(edit.getUserId()).build();
BufferedEditsWorkflow workflow = cadenceClient.newWorkflowStub(BufferedEditsWorkflow.class, options);
workflow.addEdit(edit);
}
Cadence предлагает множество других преимуществ по сравнению с использованием очередей для обработки задач.
- Построен экспоненциальный повтор с неограниченным интервалом истечения
- Обработка ошибок.Например, он позволяет выполнить задачу, которая уведомляет другую службу, если оба обновления не могут быть выполнены успешно в течение заданного интервала.
- Поддержка длительных пульсирующих операций
- Возможность реализации сложных задачных зависимостей.Например, для реализации цепочки вызовов или логики компенсации в случае неисправимых сбоев ( SAGA )
- Обеспечивает полную видимость текущего состояния обновления.Например, при использовании очередей все, что вы знаете, если в очереди есть несколько сообщений, и вам нужна дополнительная БД для отслеживания общего прогресса.С Cadence каждое событие записывается.
- Возможность отмены обновления в полете.
См. презентацию , которая идет поверх модели программирования Cadence.