Невозможно установить потребительские конфиги kafka spark - PullRequest
0 голосов
/ 05 февраля 2020

Я использую версию spark- sql -2.4.x с клиентом kafka.

Даже после установки параметра конфигурации потребителя, например, max.partition.fetch.bytes & max.poll.records

он не устанавливается должным образом и показывает значения по умолчанию, как показано ниже

Dataset<Row> df = sparkSession
                      .readStream()
                      .format("kafka")
                      .option("kafka.bootstrap.servers", server1)
                      .option("subscribe", TOPIC1) 
                      .option("includeTimestamp", true)
                      .option("startingOffsets", "latest")
                      .option("max.partition.fetch.bytes", "2097152") // default 1000,000
                      .option("max.poll.records", 6000)  // default 500
                      .option("metadata.max.age.ms", 450000) // default 300000
                      .option("failOnDataLoss", false)
                      .load();

Он все еще отображается в журналах, как показано ниже, при запуске потребителя:

[Executor task launch worker for task 21] INFO  org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = none
        check.crcs = true
        client.id =
        connections.max.idle.ms = 540000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Как правильно это установить?

1 Ответ

2 голосов
/ 05 февраля 2020

Из документации :

Собственные конфигурации Кафки можно установить через DataStreamReader.option с kafka. префикс, например, stream.option ("kafka. bootstrap .servers", "host: port"). Для возможных параметров kafka см. Конфигурация потребителя Kafka документы для параметров, связанных с чтением данных, и Конфигурация производителя Kafka документы для параметров, связанных с записью данных.

Я считаю, что вам нужно добавить «Кафка». на ваш выбор, например:

.option("kafka.max.poll.records", 6000) 
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...