Скала Спарк потоковая кафка - PullRequest
0 голосов
/ 14 сентября 2018

Я создал образец темы в kafka и пытаюсь использовать содержимое в spark, используя приведенный ниже скрипт:

import org.apache.spark._
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.kafka._
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import 
org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe


 class Kafkaconsumer {
  val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "host1:port,host2:port2,host3:port3",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
  )
  val sparkConf = new SparkConf().setMaster("yarn")
 .setAppName("kafka example")
  val streamingContext = new StreamingContext(sparkConf, Seconds(10))
  val topics = Array("topicname")
  val topicsSet = topics.split(",").toSet
  val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](kafkaParams,topicsSet)
  )
  stream.print()
  stream.map(record => (record.key, record.value))
  streamingContext.start()
  streamingContext.awaitTermination()

Я также включил необходимые библиотеки для выполнения кода.

У меня есть ошибка ниже, пожалуйста, дайте мне знать, как решить эту проблему.

Error:
 Error:(23, 27) wrong number of type parameters for overloaded method value createDirectStream with alternatives:
  [K, V, KD <: kafka.serializer.Decoder[K], VD <: kafka.serializer.Decoder[V]](jssc: org.apache.spark.streaming.api.java.JavaStreamingContext, keyClass: Class[K], valueClass: Class[V], keyDecoderClass: Class[KD], valueDecoderClass: Class[VD], kafkaParams: java.util.Map[String,String], topics: java.util.Set[String])org.apache.spark.streaming.api.java.JavaPairInputDStream[K,V] <and>
  [K, V, KD <: kafka.serializer.Decoder[K], VD <: kafka.serializer.Decoder[V], R](jssc: org.apache.spark.streaming.api.java.JavaStreamingContext, keyClass: Class[K], valueClass: Class[V], keyDecoderClass: Class[KD], valueDecoderClass: Class[VD], recordClass: Class[R], kafkaParams: java.util.Map[String,String], fromOffsets: java.util.Map[kafka.common.TopicAndPartition,Long], messageHandler: org.apache.spark.api.java.function.Function[kafka.message.MessageAndMetadata[K,V],R])org.apache.spark.streaming.api.java.JavaInputDStream[R] <and>
  [K, V, KD <: kafka.serializer.Decoder[K], VD <: kafka.serializer.Decoder[V]](ssc: org.apache.spark.streaming.StreamingContext, kafkaParams: Map[String,String], topics: Set[String])(implicit evidence$19: scala.reflect.ClassTag[K], implicit evidence$20: scala.reflect.ClassTag[V], implicit evidence$21: scala.reflect.ClassTag[KD], implicit evidence$22: scala.reflect.ClassTag[VD])org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and>
  [K, V, KD <: kafka.serializer.Decoder[K], VD <: kafka.serializer.Decoder[V], R](ssc: org.apache.spark.streaming.StreamingContext, kafkaParams: Map[String,String], fromOffsets: Map[kafka.common.TopicAndPartition,Long], messageHandler: kafka.message.MessageAndMetadata[K,V] => R)(implicit evidence$14: scala.reflect.ClassTag[K], implicit evidence$15: scala.reflect.ClassTag[V], implicit evidence$16: scala.reflect.ClassTag[KD], implicit evidence$17: scala.reflect.ClassTag[VD], implicit evidence$18: scala.reflect.ClassTag[R])org.apache.spark.streaming.dstream.InputDStream[R]val stream = KafkaUtils.createDirectStream[String, String](

1 Ответ

0 голосов
/ 16 сентября 2018

Добавьте параметры типа для требуемого типа декодирования ключа и значения, например:

Измените:

KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](kafkaParams,topicsSet)
)

на

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](kafkaParams,topicsSet)
)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...