Как мы можем включить сжатие журналов, используя весенний облачный поток? - PullRequest
0 голосов
/ 04 июня 2018

Как настроить log.cleanup.policy: сжатие с использованием конфигурации пружины

Ответы [ 3 ]

0 голосов
/ 04 июня 2018

Это свойство является Конфигурацией брокера: http://kafka.apache.org/documentation/#brokerconfigs. Поэтому его необходимо настроить на стороне брокера.С точки зрения Spring Cloud Stream Kafka Binder делать нечего.Это просто клиент для существующего брокера Apache Kafka.

Если вы говорите о KafkaEmbedded с точки зрения Spring Kafka, есть следующие варианты:

/**
 * Specify the properties to configure Kafka Broker before start, e.g.
 * {@code auto.create.topics.enable}, {@code transaction.state.log.replication.factor} etc.
 * @param brokerProperties the properties to use for configuring Kafka Broker(s).
 * @return this for chaining configuration
 * @see KafkaConfig
 */
public KafkaEmbedded brokerProperties(Map<String, String> brokerProperties) {
    this.brokerProperties.putAll(brokerProperties);
    return this;
}

/**
 * Specify a broker property.
 * @param property the property name.
 * @param value the value.
 * @return the {@link KafkaEmbedded}.
 * @since 2.1.4
 */
public KafkaEmbedded brokerProperty(String property, Object value) {
    this.brokerProperties.put(property, value);
    return this;
}
0 голосов
/ 04 июня 2018

log.cleanup.policy - это конфигурация посредника (в server.properties), а не свойство клиента.

Чтобы изменить политику для отдельной темы

kafka-topics --zookeeper localhost:2181 --alter --topic myTopic --config cleanup.policy=compact

или

kafka-configs --zookeeper localhost:2181 --entity-type=topics --entity-name=mytopic --alter --add-config cleanup.policy=compact

(поскольку первая устарела)

WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
     Going forward, please use kafka-configs.sh for this functionality
0 голосов
/ 04 июня 2018

Обновление этого ответа на основе ответов Гари и Артема ниже, чтобы избежать путаницы.

Вы можете передать любую произвольную конфигурацию клиента kafka, используя ключ spring.cloud.stream.kafka.binder.configurarion....Однако, поскольку log.cleanup.policy является свойством уровня брокера, его нельзя использовать таким образом из подшивки.Вам нужно установить его на брокера.Пожалуйста, смотрите ответы ниже для получения дополнительной информации.

...