Добавить параметры конфигурации - spark & ​​Kafka: acks и компрессия - PullRequest
1 голос
/ 21 июня 2019

Я хочу добавить некоторые параметры в мое приложение spark & ​​Kafka для записи Dataframe в тему kafka.

Я не нашел acks иression.codec в документации по spark-kafka

   .write
   .format("kafka")
   .option("kafka.sasl.mechanism", Config.KAFKA_SASL_MECHANISM)
   .option("kafka.security.protocol", Config.KAFKA_SECURITY_PROTOCOL)
   .option("kafka.sasl.jaas.config", KAFKA_JAAS_CONFIG)
   .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP)
   .option("fetchOffset.numRetries", 6)
   .option("acks","all")
   .option("compression.codec","lz4")
   .option("kafka.request.timeout.ms", 120000)
   .option("topic", topic)
   .save()```

Ответы [ 2 ]

0 голосов
/ 21 июня 2019

Для сериализаторов создайте класс case или иным образом столбец данных с одним-тремя столбцами, который просто содержит поля Array[Byte] для key и value (также будет работать String). Тогда topic Строковое поле. Если вам нужно только значение Кафки, то вам нужен только один столбец Dataframe

Вам необходимо отобразить текущие данные, чтобы сериализовать их все перед записью в Kafka.

Затем в документации говорится, что любое другое свойство производителя имеет префикс kafka.

.

Больше информации здесь https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka

Для свойств SASL, я думаю, вам нужно использовать spark.executor.options и передавать ключевые вкладки или файлы jaas с --files во время отправки, хотя

0 голосов
/ 21 июня 2019

Вы можете использовать это конкретное свойство для определения вашего сериализатора: default.value.serde

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...