Как я могу мгновенно получить результат моего произведенного события в kafka и kafka-streams? - PullRequest
1 голос
/ 28 октября 2019

Я упрощаю свою проблему по следующему сценарию: 3 друга делят карту лояльности. Карта имеет два ограничения:

  1. может использоваться максимум 10 раз (не имеет значения, какая карта используется, т.е. friend_a может использовать ее 10 раз.
  2. максимальное количество денег на карте составляет200. Таким образом, при 1 «событии» со значением = 200 карта «завершена».

Я использую производителя kafka, который отправляет события в кластере kafka следующим образом:

{"name": "friend_1", "value": 10}

{"name": "friend_3", "value": 20}

событияопубликовано в теме, связанной с потоком кафки, который группируется по ключу и выполняет агрегацию для суммирования потраченных денег. Кажется, это работает, однако я сталкиваюсь с «проблемой параллелизма»

Давайте представим, что карта используется9 раз, так что остается использовать только 1 раз, а общее количество потраченных денег составляет 190, что означает, что осталось потратить 10 единиц.

Итак, friend_2 хочет купить что-то, что стоит 11 единиц (что не должно бытьразрешено) и friend_3 хочет купить что-то, что стоит 9 единиц, которые должны бытьllowed. Friend_3 изменит состояние, используя карту в 10-й раз. Все другие будущие попытки не должны что-либо изменять.

Поэтому пользователю карты разумно знать, изменило ли отправленное им событие максимальное использованное число и общее количество. Как я могу сделать это в Кафке? Используя агрегацию потоков, я всегда могу увеличить значения, но как мне узнать, изменило ли мое действие «изменение состояния» карты?

ОБНОВЛЕНИЕ: пользователь карты должен немедленно получить отрицательный отзыв, если транзакция подтверждаетсяправило.

1 Ответ

0 голосов
/ 29 октября 2019

Насколько я понимаю ваш вопрос, есть несколько вариантов.

Один из вариантов - это получить новый поток после агрегации, который вы filter() для данных, которые будут изменять «состояние» карты,как фильтрация всех событий, которые потратили > 200 единиц или > 10 использований. Этот поток может затем использоваться для информирования пользователя (ей) карты о том, что карта была израсходована, например, путем отправки электронного письма. Этот подход может быть реализован исключительно с DSL.

Когда требуется большая гибкость или более жесткий контроль, другой вариант заключается в использовании Processor API (который можно интегрировать с DSL, чтобы большая часть кода могла продолжать использоватьDSL), где вы самостоятельно выполняете этап агрегирования (работая с хранилищами состояний, которые вы прикрепляете к Transformer или Processor). Во время агрегации вы можете реализовать логику, которая проверяет, является ли входящее событие действительным (в вашем примере: friend_3 с 9 единицами является действительным, friend_2 с 11 единицами не является). Если это верно, агрегация увеличивает счетчики карты (единицы и использования), и это все. Если оно недопустимо, событие отбрасывается и не изменяет счетчики, Transformer / Processor может отправлять новое событие в другой поток, который сообщает пользователю (-ам) карты, что что-то не работает. Вы также можете реализовать эту функцию, чтобы информировать пользователей о том, что карта полностью использовалась, что карта больше не используется, или о любом другом «изменении состояния» этой карты.

Кроме того, в зависимости от того, что вы хотитеОбратите внимание на функцию интерактивных запросов в Kafka Streams. Иногда другие приложения могут захотеть выполнить быстрый поиск точек (запросов) последнего состояния чего-либо, например, состояния карты, что можно сделать с помощью интерактивных запросов, например, с помощью REST API.

Надеюсь, это поможет!

...