Одинаковые ключевые темы, которые не могут присоединиться к потокам Kafka - PullRequest
1 голос
/ 26 октября 2019

Недавно я столкнулся с проблемой в потоковом приложении, с которой я ранее не сталкивался, и было довольно сложно отследить, что связано с вводом ключей / объединениями (и после обновления - разбиением).

У меня есть две темы (raw_events и processing_users), которые имеют одинаковые ключи, но когда я пытаюсь выполнить объединение по двум темам, только некоторые объединений успешны, несмотря на то, чтоТо же самое.

Рабочий процесс (для контекста)

Для краткости основные рабочие процессы приложения (-ей) следующие:

  1. Данныепоступает в тему raw_event через Producer.
  2. Ряд потоковых приложений прослушивает тему raw_event и извлекает из нее различные сущности на основе ряда бизнес-правил (например, IP-адреса, пользователи и т. д.). .)
  3. Объекты, идентифицированные из темы raw_event, помещаются в темы preprocessing_{type}, которые содержат метаданные об извлечении и соответствующую информацию, найденную с помощью i. raw_event (например, для пользователя это могут быть такие как имя учетной записи, адрес электронной почты и т. д.). Элементы в этих темах обозначаются raw_event.
  4. Еще одна серия потоковых приложений будет прослушивать различные темы preprocessing_{type} и оставит их объединенными против серии GlobalKTables, которые представляютвсе известные экземпляры данного объекта final_{type}. Для успешных объединений экземпляр из final_{type} будет обогащен новой информацией из тем raw_event/preprocessing_{type};неуспешные объединения укажут новую сущность данного типа, которая затем будет введена в ключ и помещена в тему final_{type}. Все обогащенные экземпляры preprocessing_{type} вставляются в тему processing_{type}, которая содержит обогащенный (или новый) экземпляр сущности, а также метаданные, которые ее создали. Самое главное - элементы в теме processed_{type} имеют ключевое слово raw_event still.
  5. Наконец, потоковое приложение запускается и пытается обогатить исходный экземпляр из raw_event, объединяясь с processing_{type}, который будет иметь тот же ключ, и обогатить экземпляр raw_event различной информацией из обогащенного объекта, прежде чем отправлять его в тему final_event.

Проблема

Сама проблема возникает на шаге 5 выше (обогащение событий), поскольку только некоторые соединения между темой raw_event и темой processing_users работают, как и ожидалось.

Использование подмножестваиз 24 записей, прошедших весь конвейер, только 5 из 24 пар в теме успешно присоединились . Те, которые работают, выглядят одинаково непротиворечивыми, но я не вижу в данных ничего, что указывало бы, почему один будет работать, а другой нет:

raw_event keys          processing_user keys
mawjuG0B9k3AiALz0_2S    0q0juG0B9k3AiALz8ApP 
xEEcv20B9k3AiALzEN0m    m60juG0B9k3AiALz5gU5 
zqwjuG0B9k3AiALzz_tg    ua0juG0B9k3AiALz7wqa 
v60juG0B9k3AiALz6Aal    xEEcv20B9k3AiALzEN0m 
0q0juG0B9k3AiALz8ApP    zqwjuG0B9k3AiALzz_tg 
RK0juG0B9k3AiALz5QUw    zK0juG0B9k3AiALz6Aal
0a0juG0B9k3AiALz6Aal    Ta0juG0B9k3AiALz5QUw 
8KwjuG0B9k3AiALz1v58    RKwjuG0B9k3AiALz1P7C 
c60juG0B9k3AiALz5gU4    -60juG0B9k3AiALz3gGn 
RKwjuG0B9k3AiALz1P7C    Va0juG0B9k3AiALz5QUw 
zK0juG0B9k3AiALz6Aal    560juG0B9k3AiALz3QGh 
Ta0juG0B9k3AiALz5QUw    mawjuG0B9k3AiALz0_2S 
Va0juG0B9k3AiALz5QUw    -K0juG0B9k3AiALz3QGh 
pK0juG0B9k3AiALz5gU5    zq0juG0B9k3AiALz6Aal 
Xa0juG0B9k3AiALz2QCh    RK0juG0B9k3AiALz5QUw 
560juG0B9k3AiALz3QGh    v60juG0B9k3AiALz6Aal 
-K0juG0B9k3AiALz3QGh    Xa0juG0B9k3AiALz2QCh 
-60juG0B9k3AiALz3gGn    P60juG0B9k3AiALz5QUw 
F60juG0B9k3AiALz3gKn    pK0juG0B9k3AiALz5gU5 
m60juG0B9k3AiALz5gU5    0a0juG0B9k3AiALz6Aal 
zq0juG0B9k3AiALz6Aal    3K0juG0B9k3AiALz3QGh 
ua0juG0B9k3AiALz7wqa    8KwjuG0B9k3AiALz1v58 
3K0juG0B9k3AiALz3QGh    F60juG0B9k3AiALz3gKn 
P60juG0B9k3AiALz5QUw    c60juG0B9k3AiALz5gU4 

Я пробовал комбинацииприсоединение к темам как KStreams, так и KTables (и каждой комбинации, которую я могу придумать), однако из 24 сообщений в этом небольшом подмножестве только ~ 5 объединений являются успешными.

Текущий пример (и небольшое упрощение) текущего кода:

val events = streams.createKTable<RawEvent>("raw_events)
val users = streams.createKStream<ProcessingUser>("processing_users)

val finalEvents = events
    .join(users, eventsProcessor::enrichWithUsers)
    .to("final_events")

Учитывая, что есть соответствующие пары (1: 1) из тем raw_events и processing_users, есть ли объяснение, почему некоторые из объединенийбудет успешным, а другие потерпят неудачу? Только 5 из пар будут последовательно доходить до темы final_events (всегда одни и те же пары).

Приветствуются любые дополнительные советы!

Конфигурация

Дляради подробностей, вот несколько вещей, которые стоит отметить в отношении настройки:

  • Использование Kafka Streams 2.3.0
  • Кэширование и ведение журнала включены / отключены соответственно для всех применимыхМатериализованные вызовы
  • Оптимизация топологии включена
  • Буферизация кэша установлена ​​на 0

Обновление

Проблема в двух словах после нескольких часовобдумывая и копаясь в данных, похоже, связано с разделением.

Пять объединений, которые последовательно выполнялись, похоже, делают это только потому, что ключи расположены в одних и тех же разделах для каждой темы:

successful events       raw_events partition  processing_users partition
RK0juG0B9k3AiALz5QUw    3                     3
m60juG0B9k3AiALz5gU5    7                     7
ua0juG0B9k3AiALz7wqa    7                     7
8KwjuG0B9k3AiALz1v58    8                     8
RKwjuG0B9k3AiALz1P7C    9                     9

Несмотря на все ключиПрисутствующие в обеих темах, они, кажется, не разбиты на разделы, используя одну и ту же стратегию (т.е. обе темы содержат все сообщения с одинаковыми ключами, но некоторые могут появляться в одном разделе в raw_events, нодругой раздел в processing_users), как показано в этом представлении раздела / счетчика ниже:

enter image description here

Стоит подчеркнуть, что сообщения, которые появляются внутри raw_events тема создается вне потока операций приложений потоков, который был описан выше, что наводит меня на мысль, что на эти вопросы нужно будет ответить:

  • Возможно разрешить бремястратегия разбиения, чтобы попасть исключительно в точку входа в рабочий поток потоков, предполагая, что это приводит к нормализованному распределению по разделам? (например, если данный ключ находится в разделе 7 raw_events и вы отправляете запись с тем же ключом в preprocessing_users, он попадет в раздел 7?
  • Если это так,Это разумная стратегия? Или есть способ реализовать это поведение без написания пользовательского разделителя, который используется всеми производителями и потоковыми приложениями?
  • Если нет, можно ли принятьсуществующая тема (в данном случае raw_event и, в основном, перераспределение всей темы, чтобы использовалась стратегия разбиения по умолчанию?

1 Ответ

0 голосов
/ 28 октября 2019

Как подробно описано в обновлениях к исходному сообщению, сама проблема была вызвана несоответствием стратегии разделения приложению .NET Producer, которое по умолчанию использует стратегию разделения consistent_random, а не приложения потоков Java по умолчанию. которые используют стратегию murmur2random.

Существует несколько вариантов решения этой проблемы, но в данном конкретном случае самым простым подходом было настроить производителя на использование соответствующей стратегии:

// Set the default partitioning strategy 
ProducerConfig.Partitioner = Partitioner.Murmur2Random;

Другой подход может состоять в том, чтобы написать класс CustomPartitioner, который бы реализовывал вашу предпочтительную стратегию секционирования для имитации ваших производителей.

...