Процессоры Kafka Streams - хранилище состояний и разделы входных тем - PullRequest
0 голосов
/ 10 октября 2018

Я хотел бы полностью понять правила, которым должны следовать процессоры kafka-streams в отношении разделения входных данных процессора и его состояния (состояний).В частности, я хотел бы понять:

  1. Возможно ли это и каковы возможные последствия использования ключа для хранилища состояний, который не совпадает с ключом вводаtopic
  2. Совместно ли используются ключи хранилища состояний между разделами, т.е. получу ли я одно и то же значение, если попытаюсь получить доступ к одному и тому же ключу в процессоре, пока он обрабатывает записи, принадлежащие двум разным разделам

Я провел некоторое исследование по этому вопросу, и ответы, которые я нашел, кажутся не очень ясными и иногда противоречивыми: например, этот , кажется, предполагает, что магазины абсолютно независимы иВы можете использовать любую клавишу, в то время как этот говорит, что вы никогда не должны использовать магазин с ключом, отличным от того, который указан в теме ввода.

Спасибо за любые разъяснения.

1 Ответ

0 голосов
/ 10 октября 2018

Вы должны различать входные разделы и хранить разделы / разделы журнала изменений для полной картины.Кроме того, это зависит от того, используете ли вы DSL или Processor API, потому что DSL выполняет автоматическое перераспределение, а Processor API - нет.Поскольку DSL компилируется до API процессора, я начну с этого.

Если у вас есть тема с, скажем, 4 разделами, и вы создаете процессор с состоянием, который использует эту тему, вы получите 4 задачи,каждая задача работает с экземпляром процессора, который поддерживает один осколок хранилища.Обратите внимание, что общее состояние делится на 4 сегмента, и каждый фрагмент в основном изолирован от других сегментов.

С точки зрения среды выполнения API процессора, разделы входной темы и сегменты хранилища состояний (включая их соответствующиеразделы журнала изменений) - это единица параллелизма.Следовательно, темой журнала изменений для хранилища является создание с 4 разделами, а changelog-topic-partition-X сопоставляется с input-topic-partition-X.Обратите внимание, что Kafka Streams не использует разбиение на основе хеша при записи в тему журнала изменений, но явно указывает номер раздела, чтобы гарантировать, что «экземпляр процессора X», который обрабатывает input-topic-partition-X, только чтение / запись из / в changelog-topic-partition-X.

Таким образом, среда выполнения не зависит от ключей, если вы хотите.

Если входная тема не разделена ключами,Сообщения с одним и тем же ключом будут обрабатываться другой задачей.В зависимости от программы это может быть нормально (например, фильтрация) или нет (например, количество на ключ).

Аналогично состоянию: вы можете поместить любой ключ в хранилище состояний, но этот ключ "местный "до соответствующего осколка.Другие задачи, никогда не увидим этот ключ.Таким образом, если вы используете один и тот же ключ в хранилище для разных задач, они будут полностью независимы друг от друга (как если бы они были двумя ключами).

Используя Processor API, вы несете ответственность за разбиение входных данныхправильно и правильно использовать хранилища, в зависимости от необходимой вам семантики оператора.

На уровне DSL Kafka Streams обеспечит правильное разбиение данных для обеспечения правильной семантики оператора.Во-первых, предполагается, что входные темы разбиты по ключам.Если ключ модифицируется, например, с помощью selectKey(), а нижестоящий оператор является агрегатором, Kafka Streams сначала перераспределяет данные, чтобы гарантировать, что записи с одним и тем же ключом находятся в одном и том же тематическом разделе.Это гарантирует, что каждый ключ будет использоваться в одном осколке магазина.Таким образом, DSL всегда будет разделять данные таким образом, чтобы один ключ никогда не обрабатывался на разных шардах.

...