Spark Kafka продолжает напоминать несоответствие типов при вызове mapWithState - PullRequest
0 голосов
/ 02 октября 2018

Я попытался реализовать wordCount от Kafka и получил ошибку «несоответствие типов» при использовании функции mapWithState.

Вот мой код:

// make a connection to Kafka and read (key, value) pairs from it
    val sparkConf = new SparkConf().setAppName("DirectKafkaAvg").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val kafkaConf = Map(
        "metadata.broker.list" -> "localhost:9092",
        "zookeeper.connect" -> "localhost:2181",
        "group.id" -> "kafka-spark-streaming",
        "zookeeper.connection.timeout.ms" -> "1000")
    val topics = Set("avg")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaConf, topics)
    val value = messages.map{case (key, value) => value.split(',')}
    val pairs = value.map(record => (record(1), record(2)))

    // measure the average value for each key in a stateful manner
    def mappingFunc(key: String, value: Option[Double], state: State[Double]): Option[(String, Double)] = {
        val sum = value.getOrElse(0.0) + state.getOption.getOrElse(0.0)
        val output = Option(key, sum)
        state.update(sum)
        output
    }
    val spec = StateSpec.function(mappingFunc _)
    val stateDstream = pairs.mapWithState(spec)

    // store the result in Cassandra
    stateDstream.print()

    ssc.start()
    ssc.awaitTermination()

Вот журнал ошибок:

[error] KafkaSpark.scala:50: type mismatch;
[error]  found   : org.apache.spark.streaming.StateSpec[String,Double,Double,Option[(String, Double)]]
[error]  required: org.apache.spark.streaming.StateSpec[String,String,?,?]
[error]     val stateDstream = pairs.mapWithState(spec)
[error]                                           ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

Кто-нибудь знает, как с этим бороться?

Ответы [ 2 ]

0 голосов
/ 02 октября 2018

Поток pair в вашем коде - это пара строк , но ваш mappingFunc предполагает, что 2-е значение пары имеет тип Double.Попробуйте изменить строку

val pairs = value.map(record => (record(1), record(2)))

на

val pairs = value.map(record => (record(1), record(2).toDouble))
0 голосов
/ 02 октября 2018

Вы должны добавить параметры типа, чтобы

val spec = StateSpec.function[String,Double,Double,Option[(String, Double)]](mappingFunc _)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...