Я пытаюсь запустить потоковое приложение искры с Kafka, используя пряжу.Я получаю следующую ошибку трассировки стека -
Причина: org.apache.kafka.common.config.ConfigException: отсутствует требуемая конфигурация "partition.assignment.strategy", которая не имеет значения по умолчанию.в org.apache.kafka.common.config.ConfigDef.parse (ConfigDef.java:124) в org.apache.kafka.common.config.AbstractConfig. (AbstractConfig.java:48) в org.apache.kafka.clients.consumer.ConsumerConfig. (ConsumerConfig.java:194) в org.apache.kafka.clients.consumer.KafkaConsumer. (KafkaConsumer.java:380) в org.apache.kafka.clients.consumer.KafkaConsumer: KafkaConsumer.) в org.apache.kafka.clients.consumer.KafkaConsumer. (KafkaConsumer.java:350) в org.apache.spark.streaming.kafka010.CachedKafkaConsumer. (CachedKafkaConsumer.scala: 45) в org.streamingsp.kafka010.CachedKafkaConsumer $ .get (CachedKafkaConsumer.scala: 194) в org.apache.spark.streaming.kafka010.KafkaRDDIterator. (KafkaRDD.scala: 252) в org.apache.spark.streaming.kaDkaDkaDkaDkaDka0kaFka0scala: 212) в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 324) в org.apache.spark.rdd.RDD.iterator (RDD.scala: 288) в org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 49) в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 324) в org.apache.spark.rdd.RDD.iterator (RDD.scala: 288) в org.apache.spark.scheduler.ResultTask.runTask (ResultTask.sc:87) в org.apache.spark.scheduler.Task.run (Task.scala: 109) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 345)
Вот фрагмент моего кода того, как я создаю свой KafkaStream с помощью spark stream-
val ssc = new StreamingContext(sc, Seconds(60))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "*boorstrap_url:port*",
"security.protocol" -> "SASL_PLAINTEXT",
"sasl.kerberos.service.name" -> "kafka",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "annotation-test",
//Tried commenting and uncommenting this property
//"partition.assignment.strategy"->"org.apache.kafka.clients.consumer.RangeAssignor",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean))
val topics = Array("*topic-name*")
val kafkaStream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams))
val valueKafka = kafkaStream.map(record => record.value())
Я прошел следующий пост -
- https://issues.apache.org/jira/browse/KAFKA-4547
- Pyspark Structured Streaming Ошибка конфигурации Kafka
В соответствии с этим я обновил мою утилиту kafka util jar в своей толстой банке до 0.10.2.0 версии от 0.10.1.0 упакован по умолчанию из spark-stream-kafka-jar как временная зависимость.Также моя работа работает нормально, когда я запускаю ее на одном узле, установив master как локальный.Я использую версию 2.3.1.