Как получить доступ ко всем данным хранилища состояний в методе Kstream DSL .transform () - PullRequest
2 голосов
/ 01 апреля 2019

Пытаясь получить доступ ко всем значениям ключа в определенном хранилище состояний, но в методе .transform () я могу получить доступ только с одним ключом (который является исходным ключом)

KeyValueStore<String, String> SS=context.getStateStore("macs");

SS не можетполучить все значения ключей в Statestore

SS.get("key1");
SS.get("key2");
SS.get("key3");
SS.get("key4");

только 1 из 4 возвращает значения rest all возвращает null

1 Ответ

1 голос
/ 02 апреля 2019

SS не может получить все значения ключей в Statestore

Это ожидаемое поведение. Данные в «логическом» хранилище состояний в Kafka Streams фактически распределяются (обрабатываются) между фактическими экземплярами хранилища состояний между запущенными экземплярами вашего распределенного приложения Kafka Streams (даже если вы запускаете только 1 экземпляр приложения, например 1 контейнер Docker). для вашего приложения). Позвольте мне объяснить ниже.

Упрощенный пример, иллюстрирующий природу разделенных хранилищ состояний: если ваше приложение читает из входной темы с 5 разделами, то при обработке топологии этого приложения будет использоваться 5 потоковых задач, и каждая потоковая задача получит один раздел из «логическое» хранилище состояний (см. Архитектура Kafka Streams ). Если вы запускаете только 1 экземпляр приложения (например, 1 контейнер Docker) для своего приложения, то этот единственный экземпляр будет выполнять все 5 потоковых задач, но эти потоковые задачи являются настройкой без совместного использования ресурсов - и это означает, что данные по-прежнему разделены , Это также относится к KTable s в потоках Кафки, которые также разделены таким образом.

См. Также: Является ли Kafka Stream StateStore глобальным для всех экземпляров или только локальным?

Ваш приведенный выше пример будет работать только в особом случае, когда входная тема имеет только 1 раздел, потому что тогда есть только 1 потоковая задача и, следовательно, только 1 хранилище состояний (которое будет иметь доступ ко всем доступным ключам во входных данных ).

Попытка получить доступ ко всем значениям ключа в определенном хранилище состояний [...]

Теперь, если вы хотите иметь доступ ко всем доступным ключам во входных данных, у вас есть два варианта (если вы не хотите идти по маршруту особого случая входной темы только с 1 разделом):

  • Вариант 1: Использовать глобальные хранилища состояний (или GlobalKTable) вместо обычных, многораздельных хранилищ состояний. Глобальные хранилища состояний могут быть определены / созданы с помощью StreamsBuilder#addGlobalStore(...), но IIRC вам не нужно явно добавлять («прикреплять») глобальные хранилища к процессорам, что вы должны сделать для обычных хранилищ состояний. Вместо этого любые процессоры могут автоматически получать доступ к глобальным хранилищам.
  • Вариант 2: Использование функции интерактивных запросов (состояние запроса) в потоках Kafka.

Обратите внимание, что в обоих вариантах вы можете получить доступ к данным в хранилищах состояний только для чтения . Вы не можете писать напрямую в государственные магазины в этих двух ситуациях. Если вам нужно изменить данные, то вы должны обновить их косвенно через разделы ввода, которые используются для заполнения хранилищ.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...