Выполнить код, когда два предыдущих события были обработаны (Apache Kafka) - PullRequest
0 голосов
/ 13 января 2020

Я новичок в Apache Кафка и Spring Boot. Я пытаюсь создать прослушиватель Spring Boot, который генерирует новое событие, только когда получены два указанных c сообщения (отправленных через Apache Kafka) (для определенного ресурса).

Очевидное решение использовать базу данных для изменения состояния ресурса при появлении первого события и выполнения кода при наступлении второго события (если клиент находится в правильном состоянии в базе данных). В этом случае я беспокоюсь, если оба события прибывают одновременно.

Есть ли способ агрегировать оба сообщения в Spring Boot / Apache Кафка, вместо этого сделать это вручную?

Спасибо.

1 Ответ

0 голосов
/ 13 января 2020

Вы можете сделать это с потоками кафки . Пример топологии:

  1. входной поток (ключ / значение из ввода topi c A)
  2. filter (фильтрация по типу события например)
  3. groupBy (группировать события по ключу или некоторому полю)
  4. агрегировать (агрегировать события в новую структуру данных)
  5. фильтр (проверить, завершено ли агрегирование)
  6. map (создать новое выходное событие со значениями агрегирования)
  7. поток вывода (ключ / значение для topi c B)

    Проверьте подробности в официальном do c: https://kafka.apache.org/24/documentation/streams/developer-guide/dsl-api.html#creating -source-streams-from-kafka

...