Источник событий - почему выделенный магазин событий? - PullRequest
0 голосов
/ 24 июня 2019

Я пытаюсь реализовать источник событий / CQRS / DDD впервые, в основном для целей обучения, где есть идея хранилища событий и очереди сообщений, таких как Apache Kafka, и у вас есть события, поступающие из хранилища событий=> Kafka Connect JDBC / Debezium CDC => Kafka.

Мне интересно, почему необходимо отдельное хранилище событий, когда оно звучит так, как будто его назначение может быть выполнено самой Kafka с его основными функциями и сжатием журнала илинастройка хранения журнала для постоянного хранения.Должен ли я хранить свои события в специальном магазине, таком как СУБД, для подачи в Кафку, или я должен направлять их прямо в Кафку?

enter image description here

Ответы [ 3 ]

2 голосов
/ 24 июня 2019

Большая часть литературы по и исходит от сообщества [доменного дизайна];в своей самой ранней форме CQRS назывался DDDD ... распределенный проект, управляемый доменом.

Одним из распространенных шаблонов в проектировании, управляемом доменом, является наличие домена модель обеспечивающая целостность данных в вашем долговременном хранилище, то есть гарантирующая отсутствие внутренних противоречий ...

Мне интересно, почему должно быть отдельное событиехранить, когда это звучит так, как будто его назначение может быть выполнено самой Kafka с его основными функциями и сжатием журнала или настройкой хранения журнала для постоянного хранения.

Так что, если нам нужен поток событий без внутренних противоречий, какмы достигаем этого?Один из способов - убедиться, что только один процесс имеет разрешение на изменение потока.К сожалению, это приводит к единственной точке отказа - процесс умирает, и все заканчивается.

С другой стороны, если у вас есть несколько процессов, обновляющих один и тот же поток, вы рискуетеодновременные записи, гонки данных и противоречия, возникающие из-за того, что один автор еще не видел, что сделал другой.

С помощью СУБД или хранилища событий мы можем решить эту проблему с помощью транзакций или сравнить и поменять местами семантика;и попытка расширить поток новыми событиями отклоняется, если произошла одновременная модификация.

Кроме того, из-за его наследства DDD, долговечное хранилище обычно делится на множество очень мелкозернистыхперегородки (он же "агрегаты"). Одна корзина покупок может иметь разумно выделенные четыре потока.

Если у Кафки нет таких возможностей, то это будет паршивая замена хранилищу событий, KAFKA-2260 был открыт уже более четырех лет, поэтому нам, похоже, не хватает первого.Из того, что я смог различить в литературе о Kakfa, он также не доволен мелкозернистыми потоками (хотя с тех пор, как я проверил, прошло немало времени, возможно, все изменилось).

См. Также: Джеспер Хаммарбек пишет об этом 18 месяцев назад и приходит к выводам, аналогичным изложенным здесь.

0 голосов
/ 17 июля 2019

существует идея хранилища событий и очереди сообщений, таких как Apache Kafka, и у вас есть события, поступающие из хранилища событий => Kafka Connect JDBC / Debezium CDC => Kafka

В сущности источников событий со вкусом DDD нет места для очередей сообщений как таковых.Одним из тактических шаблонов DDD является совокупный шаблон, который служит транзакционной границей.DDD не волнует, как сохраняется совокупное состояние, и обычно люди используют постоянство на основе состояния с реляционными базами данных или базами данных документов.При применении основанного на событиях постоянства нам нужно сохранять новые события как одну транзакцию в хранилище событий таким образом, чтобы мы могли получить эти события позже, чтобы восстановить агрегатное состояние.Таким образом, для поддержки поиска событий в стиле DDD хранилище должно иметь возможность индексировать события по совокупному идентификатору, и мы обычно ссылаемся на концепцию потока событий, где такой поток однозначно идентифицируется по совокупному идентификатору и где всесобытия хранятся по порядку, поэтому поток представляет собой единый агрегат.

Поскольку мы редко можем жить с базой данных, которая позволяет нам извлекать только одну сущность по ее идентификатору, нам нужно место, где мы можемспроектировать эти события в, чтобы мы могли иметь запрашиваемый магазинЭто то, что показано на диаграмме справа, в виде материализованных представлений.Чаще это называется read read , а модели там называются read-models .В таком магазине не нужно хранить снимки агрегатов.Напротив, модели чтения служат для представления состояния системы способом, который может напрямую использоваться UI / API, и часто он не совпадает с моделью предметной области как таковой.

Как уже упоминалосьв одном из ответов здесь типичный поток обработчика команд:

  1. Загрузка одного состояния агрегата по идентификатору путем чтения всех событий для этого агрегата.Уже требуется, чтобы хранилище событий поддерживало такую ​​нагрузку, которую Кафка не может сделать.
  2. Вызовите модель домена (метод совокупного корня), чтобы выполнить какое-либо действие.
  3. Сохраните новые события вагрегатный поток, все или ничего.

Если вы сейчас начнете записывать события в хранилище и публиковать их где-то еще, вы получите проблему двухфазного принятия, которую трудно решить.Поэтому мы обычно предпочитаем использовать такие продукты, как EventStore , в котором есть возможность создать подписку на догонялки для всех записанных событий.Кафка это тоже поддерживает.Также полезно иметь возможность создавать новые индексы событий в магазине, связывая их с существующими событиями, особенно если у вас несколько систем, использующих один магазин.В EventStore это можно сделать с помощью внутренних проекций, вы также можете сделать это с потоками Кафки.

Я бы сказал, что вам действительно не нужна система обмена сообщениями между сторонами записи и чтения.Сторона записи должна позволять вам подписываться на канал событий, начиная с любой позиции в журнале событий, чтобы вы могли создавать свои модели чтения.

Однако Kafka работает только в системах, которые не используютагрегированный шаблон, потому что важно иметь возможность использовать события, а не снимок, как источник правды, хотя это, конечно, обсуждается.Я хотел бы взглянуть на возможность изменить способ, которым события изменяют состояние объекта (например, исправление ошибки), и когда вы используете события для восстановления состояния объекта, все будет в порядке, снимки останутся прежними, и выМне понадобится применить события исправления, чтобы исправить все снимки.

Лично я предпочитаю не быть тесно связанным с какой-либо инфраструктурой в моей модели домена.На самом деле, мои доменные модели не имеют никакой зависимости от инфраструктуры.Принося логику моментальных снимков в конструктор потоков Kafka, я был бы немедленно связан, и, с моей точки зрения, это не лучшее решение.

0 голосов
/ 26 июня 2019

Kafka можно использовать в качестве хранилища событий DDD, но при этом возникают некоторые сложности из-за отсутствующих функций.

Две ключевые функции, которые люди используют при получении событий от агрегатов:

  1. Загрузка агрегата путем чтения событий для просто , которые агрегируют
  2. При одновременной записи новых событий для агрегата убедитесь, что только один пишущий преуспевает, чтобы не повредить агрегат и не нарушить его инварианты.

Кафка в настоящее время не может выполнить ни одно из этих действий, поскольку 1 завершается ошибкой, поскольку обычно требуется один поток на агрегат тип (он не масштабируется до одного потока на агрегати это не обязательно было бы желательно в любом случае), поэтому нет способа загрузить только события для одного агрегата, и 2 завершается ошибкой, поскольку https://issues.apache.org/jira/browse/KAFKA-2260 не был реализован.

Так что вы должнынапишите систему так, чтобы возможности 1 и 2 не были нужны.Это можно сделать следующим образом:

  1. Вместо непосредственного вызова обработчиков команд, запишите их в потоки.Иметь поток команд для каждого типа агрегата, защищенный идентификатором агрегата (они не требуют постоянного хранения).Это гарантирует, что вы когда-либо обрабатываете только одну команду для определенного агрегата за один раз.
  2. Напишите код моментального снимка для всех типов агрегатов
  3. При обработке командного сообщения выполните следующие действия:
    1. Загрузка совокупного снимка
    2. Проверка команды по нему
    3. Запись новых событий (или возврат ошибки)
    4. Применение событий к совокупности
    5. Сохранение нового совокупного снимка, включая текущее смещение потока для потока событий
    6. Возвращение успеха клиенту (возможно, с помощью ответного сообщения)

Единственная другая проблема - обработка сбоев (например, сбой моментального снимка).Это может быть обработано во время запуска определенного раздела обработки команд - ему просто нужно воспроизвести любые события с момента успешного выполнения последнего снимка и обновить соответствующие снимки перед возобновлением обработки команды.

Кажется, что Kafka Streams обладает возможностями длясделать это очень просто - у вас есть KStream команд, которые вы преобразуете в KTable (содержащее снимки, снабженные агрегированным идентификатором) и KStream событий (и, возможно, другой поток, содержащий ответы).Kafka позволяет всем этим работать транзакционно, поэтому нет риска сбоя при обновлении снимка.Он также будет обрабатывать перенос разделов на новые серверы и т. Д. (Автоматическая загрузка снимка KTable в локальную RocksDB, когда это произойдет).

...