Я использую функцию уменьшения потоков kafka, и она создает некоторую внутреннюю тему журнала изменений хранилища состояний (например, app-KSTREAM-REDUCE-STATE-STORE-0000000002-changelog).
Я хотел установить байты хранения и изменить политику очистки на удалить , чтобы предотвратить переполнение хранилища. Поэтому я установил следующие конфиги в коде потоков kafka:
Properties props = new Properties();
props.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.RETENTION_BYTES_CONFIG, Constants.INTERNAL_TOPICS_RETENTION_BYTES);
props.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
KafkaStreams streams = new KafkaStreams(builder.build(), props);
Однако при создании новой темы только конфигурация хранения применяется к вновь созданной внутренней теме, и политика очистки остается compact .
Есть ли пропущенный шаг для этого? (или нельзя ли установить политику очистки внутренних тем для удаления?)
Я использую версию kafka 1.0.0 и версию kafka-streams 1.0.0