API процессора Kafka Streams: пересылка записи в конкретную потоковую задачу на основе ключа - PullRequest
0 голосов
/ 24 октября 2018

Я использую следующую топологию API процессора:

Source: STopics (topics: [A, B, C])
  --> P1
Processor: P1 (stores: [P1_Store])
  --> CSink
  <-- STopics
Sink: CSink (topic: Result)
  <-- P1

Темы имеют целочисленные ключи, скажем, ID .Каждая тема A , B и C имеет одинаковое количество разделов, скажем N .У меня есть сценарий, в котором записи с одинаковым ключом могут появляться в любой из исходных тем.Предположим, у нас есть N экземпляров, созданных из P1 процессора (или N потоковых задач), каждый из которых имеет локальную копию хранилища состояний P1_Store .

Есть ли способ определить количество экземпляров P1 во время выполнения, чтобы я мог отправлять записи с одним и тем же ключом на один и тот же экземпляр процессора, например, используя ID% N , чтобы использовать предыдущее значение в локальном хранилище состояний для ключа ID ?

Обновление. Я перезаписываю все записи с пустыми ключами, приходящими в тему A, и этот новый идентификатор ключа может появляться в любых темах, здесь A, B или C.

Дополнительный вопрос: Если у меня есть два родительских процессора P00 и P01 одного процессора P1.P00 работает на экземпляре X, изменяет ключ на 123 и вперед, в то время как P01 работает на экземпляре Y и пересылает ключ 123. Как гарантировать, что ключ 123 от P00-X и P01-Y всегда идет к одному и тому же экземпляру P1 (скажем, P1-Z, так что местное государственное хранилище для 123 всегда доступно в Z)?Я не хочу, чтобы P00 и P01 сначала писали в промежуточную тему, а затем P1 читает из этой промежуточной темы.Есть ли альтернатива для разработки единой топологии?

1 Ответ

0 голосов
/ 25 октября 2018

Если ваши входные темы уже разбиты по ключам (что является значением по умолчанию, если идентификатор установлен как ключ сообщения), вам не нужно ничего делать.Kafka Streams назначит разделы задаче так, чтобы разделение было сохранено.

В частности, если у вас есть N разделов на тему, будет N задач, и задача 0 получит разделы A-0, B-0, C-0 и т. Д. (Т. Е. Разделы с одинаковым номером из разных тем объединяются автоматически).Кроме того, количество экземпляров вашего процессора совпадает с количеством задач.Задача X формы процессора будет обрабатывать все записи из разделов AX, BX и CX.

Если идентификатор не является ключевым в ваших темах ввода, вам потребуется перераспределить данные после установки идентификатора в качестве ключачерез дополнительную тему:

// using the DSL
stream.selectKey(...)
      .groupByKey()
      .aggregate(...)

// using Processor API
topology.addSource(...); // read input topics
topology.addProcessor(...); // set ID as key
topology.addSink(...); write to new topic for repartitioning
topology.addSource(...); // read from repartition topic
topology.addProcessor(...); // your processor updating the state
...