Я пытаюсь использовать addGlobalStore в DSL Kafka, где необходимо хранить несколько значений, которые мне понадобятся для общего доступа ко всем моим потокам / экземплярам.
Моя проблема заключается в том, что мне нужно периодически обновлять эти значения в моей топологии и информировать все работающие потоки о новых значениях.
Я инициализировал глобальное хранилище через builder.addGlobalStore
и использовал функцию init()
процессора, который использовался в качестве последнего аргумента этой функции, но я не могу найти способ обновить значения внутри глобального хранилища.
Следующим шагом в моей топологии является Transformer, где я могу подключиться через `` `init ()` `в глобальном хранилище и прочитать сохраненные значения, но, к сожалению, я не могу обновить их глобально. Я имею в виду, я могу обновить
локальная копия для запущенного потока, но другие потоки / экземпляры не видят изменения.
Я где-то читал, что это невозможно сделать на Transformer, но даже я использую процессор вместо этого, проблема остается
Итак, есть ли способ обновить globalStateStore в топологии Kafka DSL,
и если да, то как это возможно? Или для использования глобального хранилища мне нужно использовать низкоуровневый API процессора?