Как составить проекцию агрегатного состояния в К SQL? - PullRequest
0 голосов
/ 19 февраля 2020

Как я понял, прочитав эту статью https://blog.softwaremill.com/event-sourcing-using-kafka-53dfd72ad45d, мы можем создавать проекции (или представления) агрегатных состояний, используя ksqldb, но я столкнулся с проблемами и не вижу путей их решения.

Например, у меня есть events topi c, который содержит все события для всех типов агрегатов в моей системе - это мое хранилище событий. Тогда как создать проекцию (таблицу в терминах k sql) в ksqdb для совокупности user ? Каждое событие имеет уникальную структуру с собственным набором полей, например:

aggregate: user, aggregateId: 1, event: updatedUserPassword; fields: password;
aggregate: user, aggregateId: 1, event: updatedUserFirstName; fields: firstName;
aggregate: user, aggregateId: 1, event: updateBalance; fields: currentBalance;

Как я пытаюсь построить таблицу для user :

CREATE STREAM events (aggregateId VARCHAR, aggregate VARCHAR, firstName VARCHAR, password VARCHAR, balance: VARCHAR, messagesCount INTEGER) WITH (KAFKA_TOPIC='events', VALUE_FORMAT='JSON');
CREATE STREAM users WITH (KAFKA_TOPIC='users') AS SELECT * FROM events WHERE aggregate = 'user';

Какой шаг I делать дальше? Как создать проекцию, которая будет содержать все фактические состояния пользователей, сгруппированные по их aggregateId, который я могу затем сохранить во внешней БД?

Я в замешательстве: (

Я чувствую, что пытаюсь использовать не так, как задумано ...

Заранее спасибо.

...