Как проверить, существует ли ключ в теме Кафки? - PullRequest
0 голосов
/ 21 мая 2018

Мне нужна функция, скажем, checkKey (), которая должна работать следующим образом:

def checkKey(key):
    if(key in topic-name):
        return True
    return False

Я не смог найти это в документации Кафки.Мне известно, что для дедупликации данных Kafka может обновить ключ.Однако я не хочу обновление, я хочу знать, существует ли оно уже или нет.И если он существует, я хочу обновить его значение следующим образом:

def updateValue(key):
    if(checkKey(key)):
        value of key in topic-name += 1

Это мне нужно выполнить с помощью Python, поэтому пример кода того же будет очень полезен.

Ответы [ 2 ]

0 голосов
/ 22 мая 2018

Для этого вы можете использовать Потоки Кафки .Просто определите KTable для вашей темы, используя StreamBuilder. table , предоставив имя для хранилища состояний, используя Materialized.as("store-name"), а затем вы можете запросить его, используя Interactive Queries , см. Эту страницу для других примеров.но это так же просто, как streams.store("store-name", QueryableStoreTypes.keyValueStore()).get(key).

0 голосов
/ 22 мая 2018

Кафка это не таблица, это очередь.Чтобы увидеть, существует ли ключ в теме, вам нужно прочитать всю тему или, если это вообще возможно, сохранить локальную копию темы.Возможно, вам удастся ограничить поиск определенным разделом, если вы знаете логику разделения.

При этом Confluent имеет потоковый SQL-движок под названием KSQL, который может помочь вам.Вы можете посмотреть это здесь

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...