перегруженное значение метода createDirectStream с альтернативами - PullRequest
0 голосов
/ 19 февраля 2020

Моя версия spark - 1.6.2, а версия My kafka - 0.10.1.0. И я хочу отправить пользовательский объект в качестве типа значения kafka, и я пытаюсь поместить этот sh этот пользовательский объект в kafka topi c. И используйте потоковую передачу для чтения данных. И я использую прямой подход. Ниже приведен мой код:

import com.xxxxx.kafka.{KafkaJsonDeserializer, KafkaObjectDecoder, pharmacyData}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object sparkReadKafka {
  val sparkConf = new SparkConf().setAppName("SparkReadKafka")
  val sc = new SparkContext(sparkConf)
  val ssc = new StreamingContext(sc, Seconds(1))

  def main(args: Array[String]): Unit = {
    val kafkaParams = Map[String, Object] (
      "bootstrap.servers" -> "kafka.kafka-cluster-shared.non-prod-5-az-scus.prod.us.xxxxx.net:9092",
      //"key.deserializer" -> classOf[StringDeserializer],
      //"value.deserializer" -> classOf[KafkaJsonDeserializer],
      "group.id" -> "consumer-group-2",
      "auto.offset.reset" -> "earliest",
      "auto.commit.interval.ms" -> "1000",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "session.timeout.ms" -> "30000"
    )

    val topic = "hw_insights"

    val stream = KafkaUtils.createDirectStream[String, pharmacyData, StringDecoder, KafkaObjectDecoder](ssc, kafkaParams, Set(topic))
  }
}

И ошибка, которую я получил, похожа на эту (я должен удалить какую-то часть в целях безопасности):

Ошибка: (29, 47) перегруженное значение метода createDirectStream с альтернативами: (jss c: org. apache .spark.streaming.api. java .JavaStreamingContext, keyClass: Class [String], valueClass: Class [com.xxxxxxx.kafka. pharmacyData], keyDecoderClass: класс [kafka.serializer.StringDecoder], valueDecoderClass: класс [com.xxxxxxx.kafka.KafkaObjectDecoder], kafkaParams: java .util.Map [строка, строка], темы: * 1018 .u. Установите [String]) org. apache .spark.streaming.api. java .JavaPairInputDStream [String, com.xxxxxxx.kafka.pharmacyData] (ss c: org. apache .spark.streaming.StreamingContext , kafkaParams: scala .collection.immutable.Map [String, String], темы: scala .collection.immutable.Set [String]) (неявное доказательство $ 19: scala .reflect.ClassTag [String], неявное доказательства $ 20: scala .reflect.ClassTag [com.xxxxxxx.kafka.pharmacyData], неявные доказательства $ 21: scala .reflec t.ClassTag [kafka.serializer.StringDecoder], неявное доказательство $ 22: scala .reflect.ClassTag [com.xxxxxxx.kafka.KafkaObjectDecoder]) org. apache .spark.streaming.dstream.InputDStream [(String, com) .xxxxxxx.kafka.pharmacyData)] нельзя применить к (org. apache .spark.streaming.StreamingContext, scala .collection.immutable.Map [String, Object], scala .collection.immutable.Set [ String]) val stream = KafkaUtils.createDirectStream [String, pharmacyData, StringDecoder, KafkaObjectDecoder] (ss c, kafkaParams, Set (topi c)) И ниже мой класс декодера клиента:

import kafka.serializer.Decoder
import org.codehaus.jackson.map.ObjectMapper

class KafkaObjectDecoder extends Decoder[pharmacyData] {
  override def fromBytes(bytes: Array[Byte]): pharmacyData = {
    val mapper = new ObjectMapper()
    val pdata = mapper.readValue(bytes, classOf[pharmacyData])
    pdata
  }
}

Может кто-нибудь помочь мне с вопросами? Спасибо!

1 Ответ

0 голосов
/ 19 февраля 2020

Ошибка говорит, что ваши параметры неверны

не может быть применен к (org.apache.spark.streaming.StreamingContext, scala.collection.immutable.Map[String,Object], scala.collection.immutable.Set[String])

ближайший метод , который он считает нужным

(jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,keyClass: Class[String],valueClass: Class[com.xxxxxxx.kafka.pharmacyData],keyDecoderClass: Class[kafka.serializer.StringDecoder],valueDecoderClass: Class[com.xxxxxxx.kafka.KafkaObjectDecoder],kafkaParams: java.util.Map[String,String],topics: java.util.Set[String])

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...