В стандартном примере Spark Streaming отсутствует ошибка требуемой конфигурации «bootstrap.servers» - PullRequest
0 голосов
/ 26 сентября 2018

Я немного новичок в Scala и Spark, поэтому не стесняйтесь судить меня, но не слишком сложно.

Я пытаюсь запустить стандартный пример DirectKafkaWordCount (поставляется с установкой Spark2), чтобы проверить, как Spark Streamingработает с Kafka.

Это код примера (также можно найти здесь ):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// scalastyle:off println
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: DirectKafkaWordCount <brokers> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 *    $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \
 *    topic1,topic2
 */
object DirectKafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println

При попытке запустить его пришлось поставитьspark-streaming-kafka-0-10_2.11-2.3.1.jar и kafka-clients-0.10.0.1.jar в каталог /usr/hdp/3.0.0.0-1634/spark2/jars/ (что меня несколько удивило,так как я предполагал, что все стандартные примеры, предоставляемые с установкой, должны были работать из коробки, но пример WordCount требовался для этих пакетов).После добавления этих jar-файлов я попытался прочитать записи из темы test и выполнить подсчет слов с помощью команды

/ usr / hdp / 3.0.0.0-1634 / spark2 / bin /run-example streaming.DirectKafkaWordCount localhost: 9092 test

Однако происходит сбой приложения, и полученная ошибка выглядит следующим образом:

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value.
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:421)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:55)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
        at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:376)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
        at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:84)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:70)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:240)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
        at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
        at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
        at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
        at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
        at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
        at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
        at org.apache.spark.examples.streaming.DirectKafkaWordCount$.main(DirectKafkaWordCount.scala:70)
        at org.apache.spark.examples.streaming.DirectKafkaWordCount.main(DirectKafkaWordCount.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Это смущает меня, так как у меня естьпредоставил загрузочный сервер (localhost: 9092) в команде запуска.Есть идеи, куда копать отсюда?

Моя конфигурация:

Spark - 2.3.1

Кафка - 2.11-1.0.1

Ответы [ 3 ]

0 голосов
/ 27 сентября 2018

Этот пример не обновлялся более года, но, похоже, вам необходимо переименовать metadata.broker.list в bootstrap.servers, то есть имя свойства, которое используют все другие клиенты Kafka.

Яне уверен, что сценарий run-example правильно передает аргументы, но вы захотите указать внешний IP или имя хоста брокера (ов) Kafka, а не localhost.

Кроме того, рекомендуется использовать структурированную потоковую передачу и API Dataframe в Spark2 + поверх DStream и RDD

0 голосов
/ 27 апреля 2019

В случае, если вы работаете с весенней загрузкой с kafka, и если вы сталкиваетесь с этой ошибкой

org.apache.kafka.common.config.ConfigException: отсутствует требуемая конфигурация "bootstrap.servers", который не имеет значения по умолчанию.

Убедитесь, что у вас есть эти вещи:

  1. spring.kafka.bootstrap-servers задают это свойство в файле poperrty или yml.
  2. Сервер Zookeeper и kafka запущен.
  3. Потребитель работает по этой команде "kafka-console-consumer.bat / sh" (в зависимости от ОС).
  4. spring.kafka.consumer.group-id должен быть установлен.
  5. spring.kafka.consumer.auto-offset-reset = ранее

Это поможет кому-то.

Спасибо,

Атул

0 голосов
/ 26 сентября 2018

Вам необходимо добавить bootstrap.servers в параметрах kafka, поскольку потребителю требуются серверы начальной загрузки для приема сообщений из любой темы по spark-streaming-kafka-0-10_2.11-2.3.1.jar.

val kafkaParams = Map[String, Object]("bootstrap.servers" -> "alpha-kafka-1.com:9092,alpha-kafka-2.com:9092,alpha-kafka-3.com:9092")

Ресурс: https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#creating-a-direct-stream

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...