Как сохранить агрегат / чтение модели из «EventStore» в базе данных? - PullRequest
3 голосов
/ 19 марта 2020

Попытка реализовать Event Sourcing и CQRS впервые, но застряла, когда дело дошло до сохранения агрегатов.

Вот где я сейчас нахожусь

  1. I настроил "EventStore" в потоке, "foos"
  2. Подключен к нему с node-eventstore-client
  3. Я подписываюсь на события с catchup

Это все работает нормально.

С помощью функции обработчика событий eventAppeared я могу создавать агрегат всякий раз, когда происходят события. Это замечательно, но что мне с этим делать?

Допустим, я строю и собираю список Foos

    [
      {
        id: 'some aggregate uuidv5 made from barId and bazId',
        barId: 'qwe',
        bazId: 'rty',
        isActive: true,
        history: [
          {
            id: 'some event uuid',
            data: {
              isActive: true,
            },
            timestamp: 123456788,
            eventType: 'IsActiveUpdated'
          }
          {
            id: 'some event uuid',
            data: {
              barId: 'qwe',
              bazId: 'rty',
            },
            timestamp: 123456789,
            eventType: 'FooCreated'
          }
        ]
      }
    ]

Чтобы следовать CQRS, я построю вышеуказанный агрегат внутри Читать модель, верно? Но как мне сохранить этот агрегат в базе данных?

Полагаю, для этого достаточно базы данных без sql, но я определенно нуждаюсь в БД, так как я поставлю gRP C APi перед этой и других читаемых моделей / агрегатов.

Но что я на самом деле go с того момента, когда я построил агрегат, до того, когда я сохранил его в БД?

Однажды я попытался следовать это руководство https://blog.insiderattack.net/implementing-event-sourcing-and-cqrs-pattern-with-mongodb-66991e7b72be, которое было очень простым, поскольку вы будете использовать mongodb как хранилище событий и просто создадите представление для агрегата и обновите его, когда поступят новые события. У него были fl aws и ограничения (конвейер агрегации), поэтому я теперь обратился к «EventStore» для части хранилища событий.

Но как сохранить агрегат, который в данный момент только что собран и хранится в коде / памяти из событий в "EventStore" ...?

Мне кажется, это глупый вопрос, но Мне нужно l oop для каждого элемента в массиве и вставить каждый элемент в таблицу / коллекцию БД, или у вас есть какой-то способ вывести туда весь массив / агрегат сразу?

Что происходит после ? Вы создаете материализованное представление для агрегата и запрашиваете его?

Я открыт для выбора наилучшего БД для этого, будь то postgres / other rdbms, mongodb, cassandra, redis, хранилище таблиц и т. Д. c.

Последний вопрос. Пока я просто использую один поток «foos», но на этом уровне я ожидаю, что новые события будут происходить довольно часто (каждые пару секунд или около того), но, насколько я понимаю, вы все равно сохраните его и обновите его с использованием материализованного взгляды правильно?

Таким образом, учитывая, что barId и bazId в комбинации могут использоваться для группировки событий, вместо одного потока, я бы подумал, что более специализированные потоки, такие как foos-barId-bazId, будут подходить к go, чтобы попытайтесь уменьшить частоту входящих новых событий до такой степени, что воссоздание материализованных представлений будет иметь смысл.

Существует ли общее правило, гласящее, что не следует повторно создавать / обновлять / обновлять sh материализованные представления при частоте обновления становится ниже определенного предела? Тогда единственной альтернативой будет запрос из обычной таблицы / коллекции?

Редактировать:

В конце я пытаюсь сделать gRP C API, который имеет только 2 rpcs - один для получения одного foo по id и один для получения всех foos (с необязательным полем для фильтрации по статусу - но это не так важно). Упрощенный прото будет выглядеть примерно так:

rpc GetFoo(FooRequest) returns (Foo)
rpc GetFoos(FoosRequest) returns (FooResponse)

message FooRequest {
    string id = 1; // uuid
}

// If the optional status field is not specified, return all foos
message FoosRequest {
    // If this field is specified only return the Foos that has isActive true or false
    FooStatus status = 1;

    enum FooStatus {
        UNKNOWN = 0;
        ACTIVE = 1;
        INACTIVE = 2;
    }
}

message FoosResponse {
    repeated Foo foos;
}

message Foo {
    string id = 1; // uuid
    string bar_id = 2 // uuid
    string baz_id = 3 // uuid
    boolean is_active = 4;
    repeated Event history = 5;
    google.protobuf.Timestamp last_updated = 6;
}

message Event {
    string id = 1; // uuid
    google.protobuf.Any data = 2;
    google.protobuf.Timestamp timestamp = 3;
    string eventType = 4;
}

Входящие события будут выглядеть примерно так:

{
  id: 'some event uuid',
  barId: 'qwe',
  bazId: 'rty',
  timestamp: 123456789,
  eventType: 'FooCreated'
}

{
  id: 'some event uuid',
  isActive: true,
  timestamp: 123456788,
  eventType: 'IsActiveUpdated'
}

Как вы можете видеть, нет uuid, чтобы сделать возможным GetFoo (uuid) в API gRP C, поэтому я сгенерирую uuidv5 с barId и bazId, которые будут объединены, и будут действительным uuid. Я делаю это в проекции / агрегате, который вы видите выше.

Также GetFoos rp c либо вернет все foos (если поле состояния оставлено неопределенным), либо, в качестве альтернативы, вернет foo, что Имеет isActive, который соответствует полю состояния (если указано).

Пока я не могу понять, как продолжить работу с обработчиком подписки при получении.

У меня есть события, хранящиеся в «EventStore» (https://eventstore.com/), используя подписку с перехватом, я построил агрегат / проекцию с массивом Foo в той форме, в которой я их хочу, но чтобы иметь возможность получить один Foo по идентификатору из моего API gRP C, я думаю, мне нужно будет сохранить всю эту совокупность / проекцию в какой-то базе данных, чтобы я мог подключаться и получать данные из API gRP C? И каждый раз, когда приходит новое событие, мне нужно добавить это событие в базу данных или как это работает?

Я думаю, что я прочитал каждый ресурс, который я могу найти в inte rnet , но все же я упускаю некоторые ключевые сведения, чтобы выяснить это.

gRP C не так важен. Полагаю, это может быть REST, но мой большой вопрос заключается в том, как сделать агрегированные / спроецированные данные доступными для службы API (возможно, потребуется больше API)? Я предполагаю, что мне нужно будет сохранить агрегированные / спроецированные данные с сгенерированными полями uuid и history в базе данных, чтобы иметь возможность извлекать их по uuid из службы API, но для какой базы данных и как выполняется этот процесс хранения, из события catchup обработчик, где я строю агрегат?

1 Ответ

0 голосов
/ 20 марта 2020

Я точно знаю, что ты чувствуешь! Это в основном то, что случилось со мной, когда я впервые попробовал сделать CQRS и ES.

Я думаю, у вас есть пара пробелов в ваших знаниях, которые, я уверен, вы быстро исправите. Вы гидратируете агрегат из потока событий, как вы делаете. То есть ваша совокупность сохранилась. Модель чтения - это нечто другое. Позвольте мне объяснить ...

Ваша модель чтения - это то, что вы используете для выполнения запросов и, например, для предоставления данных для отображения в пользовательском интерфейсе. Ваши агрегаты не (напрямую) не вовлечены в это. На самом деле они должны быть заключены в капсулу. Это означает, что вы не можете «видеть» их состояние со стороны. т.е. нет геттера и сеттера, за исключением агрегатного идентификатора, который будет иметь геттер.

Эта статья дает вам полезный обзор того, как все это сочетается: CQRS + Event Sourcing - шаг за шагом

Идея состоит в том, что когда агрегат изменяется, он может сделать это только через событие, которое оно генерирует. Вы сохраняете это событие в хранилище событий. Это событие также публикуется, так что читаемые модели могут быть обновлены.

Также, глядя на ваш агрегат, он больше похож на типичный объект модели чтения или DTO. Агрегат заинтересован в функциональности, а не в свойствах. Таким образом, вы ожидаете увидеть функции void publi c для выдачи команд агрегату. Но не публичные c свойства, такие как isActive или history.

Надеюсь, это имеет смысл.

EDIT:

Вот еще несколько практических предложений.

«Чтобы следовать CQRS, я построю вышеуказанный агрегат в модели чтения, верно?» Вы не строите агрегаты в модели чтения. Это разные вещи на разных сторонах CQRS-части уравнения. Агрегаты находятся на командной стороне. Запросы выполняются для моделей чтения, которые отличаются от агрегатов.

Агрегаты имеют публичные c пустые функции и не имеют методов получения или установки (за исключением идентификатора агрегата). Они заключены в капсулу. Они генерируют события, когда их состояние изменяется в результате выполнения команды. Эти события хранятся в хранилище событий и используются для восстановления состояния агрегата. Другими словами, именно так хранится агрегат.

События go будут опубликованы, чтобы обработчики событий и другие процессы могли реагировать на них и обновлять модель чтения и / или запускать новые каскадные команды.

"Последний вопрос. Пока я просто использую один поток" foos ", но на этом уровне я ожидаю, что новые события будут происходить довольно часто (каждые пару секунд или около того), но, насколько я понимаю, вы все еще сохраняете это и обновляете его, используя материализованные представления, верно? Я больше обеспокоен сохранением и обновлением, используя материализованные представления. Я не знаю, что вы имеете в виду под этим, но не похоже, что у вас есть правильная идея. Представления должны быть очень простыми для чтения моделей. Не нужно сложных отношений, как вы найдете в RDMS. И поэтому высоко оптимизирован для быстрого чтения.

...