Как я понял, прочитав эту статью https://blog.softwaremill.com/event-sourcing-using-kafka-53dfd72ad45d, мы можем создавать проекции (или представления) агрегатных состояний, используя ksqldb, но я столкнулся с проблемами и не вижу путей их решения.
Например, у меня есть events topi c, который содержит все события для всех типов агрегатов в моей системе - это мое хранилище событий. Тогда как создать проекцию (таблицу в терминах k sql) в ksqdb для совокупности user ? Каждое событие имеет уникальную структуру с собственным набором полей, например:
aggregate: user, aggregateId: 1, event: updatedUserPassword; fields: password;
aggregate: user, aggregateId: 1, event: updatedUserFirstName; fields: firstName;
aggregate: user, aggregateId: 1, event: updateBalance; fields: currentBalance;
Как я пытаюсь построить таблицу для user :
CREATE STREAM events (aggregateId VARCHAR, aggregate VARCHAR, firstName VARCHAR, password VARCHAR, balance: VARCHAR, messagesCount INTEGER) WITH (KAFKA_TOPIC='events', VALUE_FORMAT='JSON');
CREATE STREAM users WITH (KAFKA_TOPIC='users') AS SELECT * FROM events WHERE aggregate = 'user';
Какой шаг I делать дальше? Как создать проекцию, которая будет содержать все фактические состояния пользователей, сгруппированные по их aggregateId, который я могу затем сохранить во внешней БД?
Я в замешательстве: (
Я чувствую, что пытаюсь использовать не так, как задумано ...
Заранее спасибо.