SS не может получить все значения ключей в Statestore
Это ожидаемое поведение. Данные в «логическом» хранилище состояний в Kafka Streams фактически распределяются (обрабатываются) между фактическими экземплярами хранилища состояний между запущенными экземплярами вашего распределенного приложения Kafka Streams (даже если вы запускаете только 1 экземпляр приложения, например 1 контейнер Docker). для вашего приложения). Позвольте мне объяснить ниже.
Упрощенный пример, иллюстрирующий природу разделенных хранилищ состояний: если ваше приложение читает из входной темы с 5 разделами, то при обработке топологии этого приложения будет использоваться 5 потоковых задач, и каждая потоковая задача получит один раздел из «логическое» хранилище состояний (см. Архитектура Kafka Streams ). Если вы запускаете только 1 экземпляр приложения (например, 1 контейнер Docker) для своего приложения, то этот единственный экземпляр будет выполнять все 5 потоковых задач, но эти потоковые задачи являются настройкой без совместного использования ресурсов - и это означает, что данные по-прежнему разделены , Это также относится к KTable
s в потоках Кафки, которые также разделены таким образом.
См. Также: Является ли Kafka Stream StateStore глобальным для всех экземпляров или только локальным?
Ваш приведенный выше пример будет работать только в особом случае, когда входная тема имеет только 1 раздел, потому что тогда есть только 1 потоковая задача и, следовательно, только 1 хранилище состояний (которое будет иметь доступ ко всем доступным ключам во входных данных ).
Попытка получить доступ ко всем значениям ключа в определенном хранилище состояний [...]
Теперь, если вы хотите иметь доступ ко всем доступным ключам во входных данных, у вас есть два варианта (если вы не хотите идти по маршруту особого случая входной темы только с 1 разделом):
- Вариант 1: Использовать глобальные хранилища состояний (или
GlobalKTable
) вместо обычных, многораздельных хранилищ состояний. Глобальные хранилища состояний могут быть определены / созданы с помощью StreamsBuilder#addGlobalStore(...)
, но IIRC вам не нужно явно добавлять («прикреплять») глобальные хранилища к процессорам, что вы должны сделать для обычных хранилищ состояний. Вместо этого любые процессоры могут автоматически получать доступ к глобальным хранилищам.
- Вариант 2: Использование функции интерактивных запросов (состояние запроса) в потоках Kafka.
Обратите внимание, что в обоих вариантах вы можете получить доступ к данным в хранилищах состояний только для чтения . Вы не можете писать напрямую в государственные магазины в этих двух ситуациях. Если вам нужно изменить данные, то вы должны обновить их косвенно через разделы ввода, которые используются для заполнения хранилищ.