Я использую следующую топологию API процессора:
Source: STopics (topics: [A, B, C])
--> P1
Processor: P1 (stores: [P1_Store])
--> CSink
<-- STopics
Sink: CSink (topic: Result)
<-- P1
Темы имеют целочисленные ключи, скажем, ID .Каждая тема A , B и C имеет одинаковое количество разделов, скажем N .У меня есть сценарий, в котором записи с одинаковым ключом могут появляться в любой из исходных тем.Предположим, у нас есть N экземпляров, созданных из P1 процессора (или N потоковых задач), каждый из которых имеет локальную копию хранилища состояний P1_Store .
Есть ли способ определить количество экземпляров P1 во время выполнения, чтобы я мог отправлять записи с одним и тем же ключом на один и тот же экземпляр процессора, например, используя ID% N , чтобы использовать предыдущее значение в локальном хранилище состояний для ключа ID ?
Обновление. Я перезаписываю все записи с пустыми ключами, приходящими в тему A, и этот новый идентификатор ключа может появляться в любых темах, здесь A, B или C.
Дополнительный вопрос: Если у меня есть два родительских процессора P00 и P01 одного процессора P1.P00 работает на экземпляре X, изменяет ключ на 123 и вперед, в то время как P01 работает на экземпляре Y и пересылает ключ 123. Как гарантировать, что ключ 123 от P00-X и P01-Y всегда идет к одному и тому же экземпляру P1 (скажем, P1-Z, так что местное государственное хранилище для 123 всегда доступно в Z)?Я не хочу, чтобы P00 и P01 сначала писали в промежуточную тему, а затем P1 читает из этой промежуточной темы.Есть ли альтернатива для разработки единой топологии?