Spark Streaming Kafka CreateDirectStream не разрешается - PullRequest
0 голосов
/ 16 мая 2018

Нужна помощь, пожалуйста.

Я использую IntelliJ с SBT для сборки своих приложений.

Я работаю над приложением, чтобы прочитать тему Кафки в Spark Streaming, чтобысделать некоторые ETL работу над этим.К сожалению, я не могу читать с Kafka.

KafkaUtils.createDirectStream не разрешается и продолжает давать мне ошибки (НЕВОЗМОЖНО РАЗРЕШИТЬ СИМВОЛ).Я провел свое исследование, и, похоже, у меня правильные зависимости.

Вот мой build.sbt:

name := "ASUIStreaming"
version := "0.1"
scalacOptions += "-target:jvm-1.8"
scalaVersion := "2.11.11"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.0"
libraryDependencies += "org.apache.kafka" %% "kafka-clients" % "0.8.2.1"
libraryDependencies += "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4"

Есть предложения?Я должен также упомянуть, что у меня нет доступа администратора на ноутбуке, так как это рабочий компьютер, и я использую портативную установку JDK и IntelliJ.Тем не менее, мои коллеги по работе находятся в такой же ситуации, и у них это нормально работает.

Заранее спасибо!

Ответы [ 2 ]

0 голосов
/ 17 мая 2018

Мне удалось решить проблему.После повторного создания проекта и повторного добавления всех зависимостей я обнаружил, что в Intellij определенный код должен находиться в одной строке, в противном случае он не будет компилироваться.

В этом случае добавим val kafkaParams код в той же строке (а не в блоке кода) решил проблему!

0 голосов
/ 16 мая 2018

Вот основной фрагмент кода Spark Streaming, который я использую. Примечание : я замаскировал некоторые конфиденциальные рабочие данные, такие как IP-адрес, название темы и т. Д.

import org.apache.kafka.clients.consumer.ConsumerRecord
import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.serialization.StringDeserializer
import scala.util.parsing.json._
import org.apache.spark.streaming.kafka._



object ASUISpeedKafka extends App

{
  // Create a new Spark Context
  val conf = new SparkConf().setAppName("ASUISpeedKafka").setMaster("local[*]")
  val sc = new SparkContext(conf)
  val ssc = new StreamingContext(sc, Seconds(2))

  //Identify the Kafka Topic and provide the parameters and Topic details
  val kafkaTopic = "TOPIC1"
    val topicsSet = kafkaTopic.split(",").toSet
    val kafkaParams = Map[String, String]
  (

    "metadata.broker.list" -> "IP1:PORT, IP2:PORT2",
    "auto.offset.reset" -> "smallest"
  )

  val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]
  (
  ssc, kafkaParams, topicsSet
  )
}
...