В настоящее время я работаю над сценарием использования, в котором отслеживается взаимодействие пользователя с платформой, что создает поток событий, который сохраняется в kafka и впоследствии обрабатывается в Kafka Streams / K SQL.
Но я столкнулся с проблемой, связанной с политиками хранения хранилища и списка изменений topi c. Сеансы пользователей могут происходить независимо друг от друга во времени, поэтому я должен гарантировать, что состояние будет сохраняться в течение этого периода и восстанавливаться в случае сбоев узла и всего кластера. Во время наших поисков мы обнаружили следующую информацию:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Internal+Data+Management
Kafka Streams позволяет обрабатывать потоки с сохранением состояния, то есть операторы, которые имеют внутреннее состояние. (...). Реализация по умолчанию, используемая DSL Kafka Streams, представляет собой отказоустойчивое хранилище состояний, использующее 1. внутренне созданный и сжатый список изменений topi c (для отказоустойчивости) и 2. один (или несколько) экземпляров RocksDB (для кэшированного значения ключа поиски). Таким образом, в случае запуска / остановки приложений и перемотки / повторной обработки эти внутренние данные должны корректно обрабатываться.
(...) Таким образом, требование к памяти в RocksDB не увеличивается бесконечно (в отличие от toplog * changelog) 1065 *). (KAFKA-4015 была исправлена в выпуске 0.10.1, и оконные разделы журнала изменений не становятся неограниченными, поскольку к ним применяется дополнительный параметр времени хранения).
Время хранения в kafka локальное хранилище состояний / журнал изменений
"Для оконных таблиц KTables существует локальное время хранения и время хранения журнала изменений. Вы можете установить время хранения локального хранилища с помощью Materialized.withRetentionTime (.. .) - значение по умолчанию: 24 ч.
Если создается новое приложение, разделы журнала изменений создаются с тем же временем хранения, что и у локального хранилища. "
https://docs.confluent.io/current/streams/developer-guide/config-streams.html
Состояния параметра windowstore.changelog.additional.retention.ms:
Добавлен в windows keepMs, чтобы гарантировать отсутствие данных удален из журнала преждевременно. Допускается смещение тактовой частоты.
Казалось бы, Kafka Streams 'поддерживает как (реплицированное) локальное хранилище состояний, так и список изменений topi c для отказоустойчивости, причем оба имеют конечное значение, настраиваемый срок хранения, и, очевидно, удалит записи по истечении времени хранения. Это может привести к неприемлемой потере данных на нашей платформе, что поднимает следующие вопросы:
Чистит ли Kafka Streams со временем хранилище состояний по умолчанию или я что-то неправильно понял? Существует ли реальный риск потери данных?
В этом случае целесообразно или вообще возможно установить политику хранения с неограниченным сроком хранения в хранилище состояний? Или, может быть, существует другой способ убедиться, что состояние будет сохраняться, например, использовать более традиционную базу данных в качестве хранилища состояний, если это имеет смысл?
Применяется ли политика хранения к резервные реплики?
Если невозможно постоянно сохранить состояние, может ли существовать другая среда потоковой обработки, которая лучше подходит для нашего варианта использования?
Любые разъяснения будут оценены.