Kafka-streams: установка политики очистки внутренних тем для удаления не работает - PullRequest
0 голосов
/ 02 сентября 2018

Я использую функцию уменьшения потоков kafka, и она создает некоторую внутреннюю тему журнала изменений хранилища состояний (например, app-KSTREAM-REDUCE-STATE-STORE-0000000002-changelog).

Я хотел установить байты хранения и изменить политику очистки на удалить , чтобы предотвратить переполнение хранилища. Поэтому я установил следующие конфиги в коде потоков kafka:

Properties props = new Properties();
props.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.RETENTION_BYTES_CONFIG, Constants.INTERNAL_TOPICS_RETENTION_BYTES);
props.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
KafkaStreams streams = new KafkaStreams(builder.build(), props);

Однако при создании новой темы только конфигурация хранения применяется к вновь созданной внутренней теме, и политика очистки остается compact .

Есть ли пропущенный шаг для этого? (или нельзя ли установить политику очистки внутренних тем для удаления?)

Я использую версию kafka 1.0.0 и версию kafka-streams 1.0.0

1 Ответ

0 голосов
/ 02 сентября 2018

Спасибо Гожану за ответ в kafka рассылке :

Описанная вами проблема выглядит, как старая ошибка, решаемая с 1.1.0 (как часть исправления в https://jira.apache.org/jira/browse/KAFKA-6150).

... Вам не нужно обновлять брокер, чтобы использовать более новую библиотеку Streams версии.

Обновление версии kafka-streams до 1.1.0 решило проблему.

...