Apache Kafka потребляет события из разных тем в определенном порядке - PullRequest
0 голосов
/ 23 сентября 2019

Допустим, у меня есть topicA, topicB и topicC, обе темы разделены по отдельным типам событий на основе сущностей домена.topicA работает только с eventA, topicB сохраняет eventB, topicC работает только с eventC.Все события связаны друг с другом бизнес-доменом, но производятся отдельными микросервисами и должны обрабатываться в определенном порядке.

Вопрос в том, как с помощью Apache Kafka представить события-потребители в определенном порядке, eventA, а затем ожидать получения событияBзатем, когда полученное событие C потребляет их все.

Цените любые отзывы, приветствуются любые вопросы.

Некоторые примечания: Kafka Streams - это хороший подход, но он ограничен политикой компании.

Кроме того, я просмотрел Шаблон соединения , но не нашел надежных подходов для реализации.

Ответы [ 3 ]

2 голосов
/ 24 сентября 2019

Если события связаны друг с другом, то они должны перейти к одной теме.Таким образом, микросервис-1 должен выдавать eventA с (ключ, значение) и меткой (eventA).Таким же образом microservice-2 и microservice-3 должны подтолкнуть данные к общей теме.

Это поможет вам на стороне потребителя.

1 голос
/ 25 сентября 2019

Вероятно, есть много подходов для решения проблемы.Вот пара, которую я могу предложить:

  • Введите идентификатор корреляции , который будет связывать события из тем A, B и C. Затемиспользуйте идентификатор корреляции следующим образом:

    1. Службы A, B и C генерируют события для соответствующих тем, но связанные события имеют такой же идентификатор корреляции

    2. Служба D использует события из отдельных тем.Каждый раз, когда он получает событие из любой темы, служба D либо вставляет данные события в базу данных по идентификатору корреляции, либо выполняет какое-либо действие, если все данные получены.

    Например, когда служба D получает событие C, она сначала выдает запрос, чтобы проверить, есть ли запись в базе данных с идентификатором корреляции из события C:

    • , еслизапись отсутствует, затем сохраняется входящее событие C,
    • , если какая-либо запись уже существует, тогда служба D проверяет, является ли событие C последним, необходимым для использования всех данных, и выполняет либо последнее действие, либо вставляет событие Cв базу данных.

    И так далее для каждого потребляемого события.

  • Цепные сервисы, которые генерируют события (A, Bи в).Например, цепочка может быть сформирована следующим образом:

    1. Служба A создает событие для темы A

    2. Служба B использует событие из темы A,и создает событие для темы B (возможно, агрегирует события A и B)

    3. Служба C использует событие из темы B и создает событие для темы C (возможно, агрегирует события A, B иC)

    4. Наконец, служба D получает событие из темы C (возможно, объединено с A, B и C) и выполняет требуемое действие.

    Вариация этого подхода (без агрегирования событий на каждом промежуточном этапе) заключалась бы в цепочке сервисов и прослушивании последнего события в цепочке.Когда используется последнее событие, выполните команду Kafka pull для соответствующих тем, чтобы получить события, созданные другими службами.

1 голос
/ 24 сентября 2019

Поскольку вы спрашиваете об упорядочении потребления сообщений между различными темами, тогда первым вариантом было бы, чтобы один потребитель выдал сообщение, накормив следующего потребителя (эти потребители могут или не могут быть частью одного и того же процесса).):

потребитель обрабатывает сообщение -> потребитель А помещает новое сообщение в другую тему -> потребитель Б получает это новое сообщение и обрабатывает -> потребитель Б помещает новое сообщение во вторую тему -> и т. Д ... и т. Д.

Я не удивлюсь, если потоки по сути делают это или похожий процесс под капотом.Вместо этого можно использовать любой другой тип интерфейса для межпроцессного взаимодействия: RDP, отображенные в память файлы, мьютекс, каналы;сделайте ваш выбор.

Если не в крайнем случае, я бы постарался не ставить разные события на одну и ту же тему.Когда вы помещаете несколько событий в одну очередь / тему, вы ограничиваете своих потребителей несколькими способами:

  1. Ваши контракты теперь тесно связаны для обоих событий.Чтобы изменить форму только одного из событий в этой отдельной теме, ваши потребители должны динамически десериализовать эти события на основе метаданных (магическое число, ключ-значение и т. Д.)
  2. Ваши шаблоны потребления могут быть меньшеэффективный.Что, если я просто заинтересован в одном из этих событий?Я должен прочитать событие, а затем выбросить его, если оно не то, что я ищу.

Реальный пример этого - парки развлечений.Допустим, у вас есть два типа посетителей парка развлечений: Fast-Pass и стандартные клиенты.Ваши бизнес-правила гласят, что клиенты Fast-Pass могут опередить обычных клиентов.

Если вы объедините их в одну очередь / тему, как вы это сделаете?Ответ - приоритетная очередь;Вы спрашиваете всех, кто присоединяется к очереди, если они быстро проходят, что склонно к ошибкам и неэффективно (это очередность с приоритетом; это может сработать, но это может быть не лучшим решением).Большинство парков развлечений решили эту проблему, создав две отдельные очереди (по одной для каждого типа клиента [событие / сообщение]).Теперь они могут распределять клиентов по двум отдельным операторам (один FastPass, один стандартный), или они могут иметь одного участника, выполняющего обе очереди, сначала освобождая очередь быстрого прохода.

В конце дня это зависит отВаши ограничения: это 10 сообщений в день или 1 миллиард, требуется ли вам немедленная согласованность или возможная согласованность, на устройстве IoT?

...