Недавно я столкнулся с проблемой в потоковом приложении, с которой я ранее не сталкивался, и было довольно сложно отследить, что связано с вводом ключей / объединениями (и после обновления - разбиением).
У меня есть две темы (raw_events и processing_users), которые имеют одинаковые ключи, но когда я пытаюсь выполнить объединение по двум темам, только некоторые объединений успешны, несмотря на то, чтоТо же самое.
Рабочий процесс (для контекста)
Для краткости основные рабочие процессы приложения (-ей) следующие:
- Данныепоступает в тему
raw_event
через Producer. - Ряд потоковых приложений прослушивает тему
raw_event
и извлекает из нее различные сущности на основе ряда бизнес-правил (например, IP-адреса, пользователи и т. д.). .) - Объекты, идентифицированные из темы
raw_event
, помещаются в темы preprocessing_{type}
, которые содержат метаданные об извлечении и соответствующую информацию, найденную с помощью i. raw_event
(например, для пользователя это могут быть такие как имя учетной записи, адрес электронной почты и т. д.). Элементы в этих темах обозначаются raw_event
. - Еще одна серия потоковых приложений будет прослушивать различные темы
preprocessing_{type}
и оставит их объединенными против серии GlobalKTables, которые представляютвсе известные экземпляры данного объекта final_{type}
. Для успешных объединений экземпляр из final_{type}
будет обогащен новой информацией из тем raw_event/preprocessing_{type}
;неуспешные объединения укажут новую сущность данного типа, которая затем будет введена в ключ и помещена в тему final_{type}
. Все обогащенные экземпляры preprocessing_{type}
вставляются в тему processing_{type}
, которая содержит обогащенный (или новый) экземпляр сущности, а также метаданные, которые ее создали. Самое главное - элементы в теме processed_{type}
имеют ключевое слово raw_event
still. - Наконец, потоковое приложение запускается и пытается обогатить исходный экземпляр из
raw_event
, объединяясь с processing_{type}
, который будет иметь тот же ключ, и обогатить экземпляр raw_event
различной информацией из обогащенного объекта, прежде чем отправлять его в тему final_event
.
Проблема
Сама проблема возникает на шаге 5 выше (обогащение событий), поскольку только некоторые соединения между темой raw_event
и темой processing_users
работают, как и ожидалось.
Использование подмножестваиз 24 записей, прошедших весь конвейер, только 5 из 24 пар в теме успешно присоединились . Те, которые работают, выглядят одинаково непротиворечивыми, но я не вижу в данных ничего, что указывало бы, почему один будет работать, а другой нет:
raw_event keys processing_user keys
mawjuG0B9k3AiALz0_2S 0q0juG0B9k3AiALz8ApP
xEEcv20B9k3AiALzEN0m m60juG0B9k3AiALz5gU5
zqwjuG0B9k3AiALzz_tg ua0juG0B9k3AiALz7wqa
v60juG0B9k3AiALz6Aal xEEcv20B9k3AiALzEN0m
0q0juG0B9k3AiALz8ApP zqwjuG0B9k3AiALzz_tg
RK0juG0B9k3AiALz5QUw zK0juG0B9k3AiALz6Aal
0a0juG0B9k3AiALz6Aal Ta0juG0B9k3AiALz5QUw
8KwjuG0B9k3AiALz1v58 RKwjuG0B9k3AiALz1P7C
c60juG0B9k3AiALz5gU4 -60juG0B9k3AiALz3gGn
RKwjuG0B9k3AiALz1P7C Va0juG0B9k3AiALz5QUw
zK0juG0B9k3AiALz6Aal 560juG0B9k3AiALz3QGh
Ta0juG0B9k3AiALz5QUw mawjuG0B9k3AiALz0_2S
Va0juG0B9k3AiALz5QUw -K0juG0B9k3AiALz3QGh
pK0juG0B9k3AiALz5gU5 zq0juG0B9k3AiALz6Aal
Xa0juG0B9k3AiALz2QCh RK0juG0B9k3AiALz5QUw
560juG0B9k3AiALz3QGh v60juG0B9k3AiALz6Aal
-K0juG0B9k3AiALz3QGh Xa0juG0B9k3AiALz2QCh
-60juG0B9k3AiALz3gGn P60juG0B9k3AiALz5QUw
F60juG0B9k3AiALz3gKn pK0juG0B9k3AiALz5gU5
m60juG0B9k3AiALz5gU5 0a0juG0B9k3AiALz6Aal
zq0juG0B9k3AiALz6Aal 3K0juG0B9k3AiALz3QGh
ua0juG0B9k3AiALz7wqa 8KwjuG0B9k3AiALz1v58
3K0juG0B9k3AiALz3QGh F60juG0B9k3AiALz3gKn
P60juG0B9k3AiALz5QUw c60juG0B9k3AiALz5gU4
Я пробовал комбинацииприсоединение к темам как KStreams, так и KTables (и каждой комбинации, которую я могу придумать), однако из 24 сообщений в этом небольшом подмножестве только ~ 5 объединений являются успешными.
Текущий пример (и небольшое упрощение) текущего кода:
val events = streams.createKTable<RawEvent>("raw_events)
val users = streams.createKStream<ProcessingUser>("processing_users)
val finalEvents = events
.join(users, eventsProcessor::enrichWithUsers)
.to("final_events")
Учитывая, что есть соответствующие пары (1: 1) из тем raw_events
и processing_users
, есть ли объяснение, почему некоторые из объединенийбудет успешным, а другие потерпят неудачу? Только 5 из пар будут последовательно доходить до темы final_events
(всегда одни и те же пары).
Приветствуются любые дополнительные советы!
Конфигурация
Дляради подробностей, вот несколько вещей, которые стоит отметить в отношении настройки:
- Использование Kafka Streams 2.3.0
- Кэширование и ведение журнала включены / отключены соответственно для всех применимыхМатериализованные вызовы
- Оптимизация топологии включена
- Буферизация кэша установлена на 0
Обновление
Проблема в двух словах после нескольких часовобдумывая и копаясь в данных, похоже, связано с разделением.
Пять объединений, которые последовательно выполнялись, похоже, делают это только потому, что ключи расположены в одних и тех же разделах для каждой темы:
successful events raw_events partition processing_users partition
RK0juG0B9k3AiALz5QUw 3 3
m60juG0B9k3AiALz5gU5 7 7
ua0juG0B9k3AiALz7wqa 7 7
8KwjuG0B9k3AiALz1v58 8 8
RKwjuG0B9k3AiALz1P7C 9 9
Несмотря на все ключиПрисутствующие в обеих темах, они, кажется, не разбиты на разделы, используя одну и ту же стратегию (т.е. обе темы содержат все сообщения с одинаковыми ключами, но некоторые могут появляться в одном разделе в raw_events
, нодругой раздел в processing_users
), как показано в этом представлении раздела / счетчика ниже:
Стоит подчеркнуть, что сообщения, которые появляются внутри raw_events
тема создается вне потока операций приложений потоков, который был описан выше, что наводит меня на мысль, что на эти вопросы нужно будет ответить:
- Возможно разрешить бремястратегия разбиения, чтобы попасть исключительно в точку входа в рабочий поток потоков, предполагая, что это приводит к нормализованному распределению по разделам? (например, если данный ключ находится в разделе 7
raw_events
и вы отправляете запись с тем же ключом в preprocessing_users
, он попадет в раздел 7? - Если это так,Это разумная стратегия? Или есть способ реализовать это поведение без написания пользовательского разделителя, который используется всеми производителями и потоковыми приложениями?
- Если нет, можно ли принятьсуществующая тема (в данном случае
raw_event
и, в основном, перераспределение всей темы, чтобы использовалась стратегия разбиения по умолчанию?