Как получить доступ к параметрам источника данных (например, kafka)? - PullRequest
1 голос
/ 26 октября 2019

Я устанавливаю параметры пакетного процесса Spark для использования из Kafka, но когда я пытаюсь получить свойства конфигурации, он отображается как None. почему так?

val df = sparkSession
        .read
        .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
        .option("kafka.bootstrap.servers", "kafka.brokers".getConfigValue) 
        .option("subscribe", "kafka.devicelocationdatatopic".getConfigValue) 
        .option("startingOffsets", "kafka.startingOffsets".getConfigValue)
        .option("endingOffsets", "kafka.endingOffsets".getConfigValue)
        .option("failOnDataLoss", "false") // any failure regarding data loss in topic or else, not supposed to fail, it has to continue...
        .option("maxOffsetsPerTrigger", "3")
        .load()

println("maxOffsetsPerTrigger "  + df.sparkSession.conf.getOption("maxOffsetsPerTrigger"))

Токовый выход

None

Желаемый выход

maxOffsetsPerTrigger 3

1 Ответ

0 голосов
/ 26 октября 2019

Когда я пытаюсь получить свойства конфигурации, он отображается как None. почему так?

Они доступны только для базового источника данных. Spark SQL пытается скрыть тонкости работы с различными источниками данных, и это одна из многих деталей реализации.

df.sparkSession.conf.getOption ("maxOffsetsPerTrigger")

Это отличается от параметров, которые вы указываете при описании источника данных (например, kafka).

В приведенном выше примере вы хотите получить доступ к свойству Spark maxOffsetsPerTrigger, а часть Option относится к возвращаемому типу Scala. не общее значение слова "option".

Вы можете указать свойство Spark, используя --conf в командной строке. Обратите внимание, что допускаются только свойства с префиксом spark..

$ spark-shell \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.4 \
    --conf spark.maxOffsetsPerTrigger=3

scala> spark.conf.getOption("spark.maxOffsetsPerTrigger")
res0: Option[String] = Some(3)

Желаемый вывод

Поскольку он не доступен из коробки, вы должны его обойти иОбойти некоторые "частные" заборы.

Следующий код должен сделать свое дело. Используйте на свой страх и риск.

// spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.4

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "demo:9092")
  .option("subscribe", "demo")
  .option("maxOffsetsPerTrigger", "3")
  .load

val plan = df.queryExecution.logical

scala> println(plan.numberedTreeString)
00 Relation[key#0,value#1,topic#2,partition#3,offset#4L,timestamp#5,timestampType#6] KafkaRelation(strategy=Subscribe[demo], start=EarliestOffsetRangeLimit, end=LatestOffsetRangeLimit)

// :paste -raw
// BEGIN
package org.apache.spark.sql.kafka010
object Util {
  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  def bypassPrivateKafka010(plan: LogicalPlan) = {
    import org.apache.spark.sql.execution.datasources.LogicalRelation
    import org.apache.spark.sql.kafka010.KafkaRelation
    plan.collect { case LogicalRelation(r: KafkaRelation, _, _, _) => r }
  }
}
// END

import org.apache.spark.sql.kafka010.Util.bypassPrivateKafka010
import org.apache.spark.sql.kafka010.KafkaRelation
val kafkaRelation = bypassPrivateKafka010(plan).head

// sourceOptions is a private field of KafkaRelation
// :paste -raw
// BEGIN
package org.apache.spark.sql.kafka010
object Util2 {
  import org.apache.spark.sql.kafka010.KafkaRelation
  def bypassPrivate(r: KafkaRelation): Map[String, String] = {
    val clazz = classOf[KafkaRelation]
    val sourceOptions = clazz.getDeclaredField("sourceOptions")
    sourceOptions.setAccessible(true)
    sourceOptions.get(r).asInstanceOf[Map[String, String]]
  }
}
// END

import org.apache.spark.sql.kafka010.Util2.bypassPrivate
val options = bypassPrivate(kafkaRelation)
scala> options.foreach(println)
(maxoffsetspertrigger,3)
(subscribe,demo)
(kafka.bootstrap.servers,demo:9092)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...