Уведомлять подписчиков, когда новые сообщения перестали поступать - PullRequest
1 голос
/ 22 мая 2019

В приложении, где пользователи должны внести несколько изменений за короткий промежуток времени, я хотел бы использовать очередь сообщений для сбора этих событий и только уведомлять слушателей, когда новые изменения перестают поступать в течение некоторого периода X.

Ожидаемый рабочий процесс:

  • Пользователь выполняет редактирование -> сообщение добавлено в очередь
  • Пользователь выполняет другое редактирование -> сообщение добавляется в очередь
  • Некоторое время проходит
  • Потребитель уведомляется обо всех ожидающих изменениях

Я изучил документацию по нескольким различным очередям сообщений, но, похоже, ни одна из них не имеет такого родасообщения пакетные из коробки.

Я нашел некоторые функции, которые могли бы помочь мне, например, у Кафки есть конфигурация производителя, называемая linger, которая говорит ему ждать X мсек, пока больше сообщений не будет добавлено в пакет, но это явно предназначено какулучшение производительности.Кроме того, эта опция на стороне производителя, в то время как для моего варианта использования она будет иметь больше смысла на стороне потребителя.

Может ли эта очередь сообщений поддержки сценария поддержки использоваться?Отсутствие результатов заставляет меня думать, что я, возможно, пытаюсь использовать очереди сообщений неправильно.

1 Ответ

0 голосов
/ 16 июня 2019

Очереди не подходят для таких случаев использования.Я бы порекомендовал использовать Cadence Workflow для реализации вашей логики с минимальными усилиями.

Вот простой дизайн, который удовлетворяет вашим требованиям:

  • Отправить сигналWithStartзапрос, содержащий информацию о редактировании рабочего процесса пользователя с использованием идентификатора пользователя в качестве идентификатора рабочего процесса.Он либо доставляет сигнал в рабочий процесс, либо сначала запускает рабочий процесс и передает ему сигнал.
  • Все запросы к этому рабочему процессу буферизуются им.Cadence дает твердую гарантию того, что в открытом состоянии может существовать только один рабочий процесс с данным ID.Таким образом, все сигналы (события) гарантированно буферизируются в рабочем процессе, принадлежащем пользователю.
  • После настроенного тайм-аута вызывается действие, уведомляющее пользователей о ожидающих изменениях.
  • Ожидающие измененияприменяются следующим действием.
  • Рабочий процесс завершен.

Вот код рабочего процесса, который реализует его в Java (также поддерживается клиент Go):

public interface BufferedEditsWorkflow {

    @WorkflowMethod
    void execute(String userId, Duration notifyAfter, Edit firstEdit);

    @SignalMethod
    void addEdit(Edit edit);
}

public interface BufferedEditsActivities {
    void notifyUser(String userId, List<Edit> edits);
    void process(String userId, List<Edit> edits);
}

public class BufferedEditsWorkflowImpl implements BufferedEditsWorkflow {

    private final List<Edit> edits = new ArrayList<>();
    private final BufferedEditsActivities activities = Workflow.newActivityStub(BufferedEditsActivities.class);

    @Override
    public void execute(String userId, Duration notifyAfter, Edit firstEdit) 
    {
        edits.add(firstEdit);
        // Cadence doesn't have limit on sleep duration.
        // It can sleep at this line for a year with no problem.
        Workflow.sleep(notifyAfter);
        activities.notifyUser(userId, edits);
        activities.process(userId, edits);
    }

    @Override
    public void addEdit(Edit edit) {
        edits.add(edit);
    }

}

Код, который запускает рабочий процесс для первого редактирования:

private void addFirstEdit(WorkflowClient cadenceClient, Edit edit) {
    WorkflowOptions options = new WorkflowOptions.Builder().setWorkflowId(edit.getUserId()).build();
    BufferedEditsWorkflow workflow = cadenceClient.newWorkflowStub(BufferedEditsWorkflow.class, options);
    workflow.execute(edit.getUserId(), Duration.ofHours(1), edit);
}

Код, который добавляет больше правок.

private void addEdit(WorkflowClient cadenceClient, Edit edit) {
    WorkflowOptions options = new WorkflowOptions.Builder().setWorkflowId(edit.getUserId()).build();
    BufferedEditsWorkflow workflow = cadenceClient.newWorkflowStub(BufferedEditsWorkflow.class, options);
    workflow.addEdit(edit);
}

Cadence предлагает множество других преимуществ по сравнению с использованием очередей для обработки задач.

  • Построен экспоненциальный повтор с неограниченным интервалом истечения
  • Обработка ошибок.Например, он позволяет выполнить задачу, которая уведомляет другую службу, если оба обновления не могут быть выполнены успешно в течение заданного интервала.
  • Поддержка длительных пульсирующих операций
  • Возможность реализации сложных задачных зависимостей.Например, для реализации цепочки вызовов или логики компенсации в случае неисправимых сбоев ( SAGA )
  • Обеспечивает полную видимость текущего состояния обновления.Например, при использовании очередей все, что вы знаете, если в очереди есть несколько сообщений, и вам нужна дополнительная БД для отслеживания общего прогресса.С Cadence каждое событие записывается.
  • Возможность отмены обновления в полете.

См. презентацию , которая идет поверх модели программирования Cadence.

...