Написать в GlobalStateStore на Кафки Потоки - PullRequest
0 голосов
/ 21 мая 2019

Я пытаюсь использовать addGlobalStore в DSL Kafka, где необходимо хранить несколько значений, которые мне понадобятся для общего доступа ко всем моим потокам / экземплярам.

Моя проблема заключается в том, что мне нужно периодически обновлять эти значения в моей топологии и информировать все работающие потоки о новых значениях.

Я инициализировал глобальное хранилище через builder.addGlobalStore и использовал функцию init() процессора, который использовался в качестве последнего аргумента этой функции, но я не могу найти способ обновить значения внутри глобального хранилища.

Следующим шагом в моей топологии является Transformer, где я могу подключиться через `` `init ()` `в глобальном хранилище и прочитать сохраненные значения, но, к сожалению, я не могу обновить их глобально. Я имею в виду, я могу обновить локальная копия для запущенного потока, но другие потоки / экземпляры не видят изменения.

Я где-то читал, что это невозможно сделать на Transformer, но даже я использую процессор вместо этого, проблема остается

Итак, есть ли способ обновить globalStateStore в топологии Kafka DSL, и если да, то как это возможно? Или для использования глобального хранилища мне нужно использовать низкоуровневый API процессора?

Ответы [ 2 ]

1 голос
/ 22 мая 2019

Я инициализировал глобальное хранилище через builder.addGlobalStore и использовал функцию init () процессора, который использовался в качестве последнего аргумента этой функции, но я не могу найти способ обновить значения в глобальном хранилище.

Вы не можете напрямую обновить глобальное хранилище.Вместо этого вы должны обновить (= написать сообщение) базовую тему этого глобального хранилища.

0 голосов
/ 21 мая 2019

В случае, если это соответствует вашим потребностям, вы, вероятно, могли бы использовать GlobalKTable вместо GlobalStore

...