Неверное количество параметров типа для функции перегрузки createDirectStream - PullRequest
0 голосов
/ 30 апреля 2018

Я новичок в работе со scala, и при попытке запустить этот простой код, который пытается прочитать из темы kafka, я застрял в ошибке при создании прямого потока, что я указал неверное количество параметров типа для функции перегрузки createDirectStream , Ниже строка, где я получаю ошибку

val messages = KafkaUtils.createDirectStream [String, String, StringDecoder, StringDecoder]
(streamingContext, kafkaParams, topicsSet)

А ниже приведен полный код.

package com.test.spark

import java.util.Properties

import org.apache.spark
import kafka.serializer.StringDecoder

import org.apache.spark.streaming.kafka010._

import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object KafkaAirDRsProcess {

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("AirDR Kafka to Spark")

    val sc = new SparkContext(sparkConf)
    val streamingContext = new StreamingContext(sc, Seconds(10))

    // Create direct kafka stream with brokers and topics
    val brokers = "10.21.165.145:6667 "
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    val topics="AIRMAIN , dummy"
    val topicsSet = topics.split(",").toSet
    //val topicsSet=topics.map(_.toString).toSet

    val messages = KafkaUtils.createDirectStream [String, String, StringDecoder, StringDecoder]
    (streamingContext, kafkaParams, topicsSet)
    val LinesDStream = messages.map(_._2)
    val AirDRStream= LinesDStream.map(AirDRFilter.parseAirDR)

    AirDRStream.foreachRDD(foreachFunc = rdd => {
      System.out.println("--- New RDD with " + rdd.count() + " records");

      if (rdd.count() > 0) {
        rdd.toDF().registerTempTable("AirDRTemp")
        val FilteredCDR = sqlContext.sql("select * from AirDRTemp"  )

        println("======================print result =================")
        FilteredCDR.show()
      }

    });

    //streamingContext.checkpoint("/tmp/mytest/ckpt/")
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

Ниже приведен снимок intellij error

enter image description here

1 Ответ

0 голосов
/ 30 апреля 2018

Поскольку вы используете kafka-0-10, вы можете создать InputDStream, как показано ниже

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe



  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "10.21.165.145:6667:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> (true: java.lang.Boolean)
  )

  val topics = ???

  val stream = KafkaUtils.createDirectStream[String, String](
    streamingContext,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )

Надеюсь, это поможет!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...