Невозможно десериализовать avro-данные, используя прямой поток kafka в потоковой передаче искры. - PullRequest
0 голосов
/ 31 октября 2018

Я использовал Kafka Avro Producer для записи avro-данных в тему kafka, и теперь я хочу прочитать эти данные, используя потоковую передачу. Я использовал API DirectKafkaStream для чтения данных avro, но он завершился ошибкой.

Это мой код для потокового воспроизведения, который считывает данные avro:

    public static void main(String[] args) throws InterruptedException {

    SparkConf sparkConf = new SparkConf().setAppName("Kafka Streaming").setMaster("local[*]");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    Set<String> topicsSet = new HashSet(Arrays.asList("test_topic"));
    Map<String, String> kafkaParams = new HashMap();
    //kafkaParams.put("metadata.broker.list", "localhost:9092");

    kafkaParams.put("bootstrap.servers", "localhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class.getName());
    kafkaParams.put("value.deserializer", KafkaAvroDeserializer.class.getName());
    kafkaParams.put("schema.registry.url", "http://localhost:7788");
    kafkaParams.put("specific.avro.reader", "true");

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, Customer> directKafkaStream = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        Customer.class,
        KafkaAvroDecoder.class,
        KafkaAvroDecoder.class,
        kafkaParams,
        topicsSet
    );

    directKafkaStream.print();
    jssc.start();
    jssc.awaitTermination();}

Здесь Customer - это мой класс схемы, который я использовал при записи avro-данных в test_topic с помощью Kafka Avro Producer.

Когда я запускаю этот код, происходит сбой с ошибкой:

Error:(38, 78) java: no suitable method found for createDirectStream(org.apache.spark.streaming.api.java.JavaStreamingContext,java.lang.Class<java.lang.String>,java.lang.Class<com.example.Customer>,java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDecoder>,java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDecoder>,java.util.Map<java.lang.String,java.lang.String>,java.util.Set<java.lang.String>)
method org.apache.spark.streaming.kafka.KafkaUtils.<K,V,KD,VD>createDirectStream(org.apache.spark.streaming.api.java.JavaStreamingContext,java.lang.Class<K>,java.lang.Class<V>,java.lang.Class<KD>,java.lang.Class<VD>,java.util.Map<java.lang.String,java.lang.String>,java.util.Set<java.lang.String>) is not applicable
  (inferred type does not conform to equality constraint(s)
    inferred: java.lang.Object
    equality constraints(s): java.lang.Object,java.lang.String)
method org.apache.spark.streaming.kafka.KafkaUtils.<K,V,KD,VD,R>createDirectStream(org.apache.spark.streaming.api.java.JavaStreamingContext,java.lang.Class<K>,java.lang.Class<V>,java.lang.Class<KD>,java.lang.Class<VD>,java.lang.Class<R>,java.util.Map<java.lang.String,java.lang.String>,java.util.Map<kafka.common.TopicAndPartition,java.lang.Long>,org.apache.spark.api.java.function.Function<kafka.message.MessageAndMetadata<K,V>,R>) is not applicable
  (cannot infer type-variable(s) K,V,KD,VD,R
    (actual and formal argument lists differ in length))
method org.apache.spark.streaming.kafka.KafkaUtils.<K,V,KD,VD>createDirectStream(org.apache.spark.streaming.StreamingContext,scala.collection.immutable.Map<java.lang.String,java.lang.String>,scala.collection.immutable.Set<java.lang.String>,scala.reflect.ClassTag<K>,scala.reflect.ClassTag<V>,scala.reflect.ClassTag<KD>,scala.reflect.ClassTag<VD>) is not applicable
  (cannot infer type-variable(s) K,V,KD,VD
    (argument mismatch; org.apache.spark.streaming.api.java.JavaStreamingContext cannot be converted to org.apache.spark.streaming.StreamingContext))
method org.apache.spark.streaming.kafka.KafkaUtils.<K,V,KD,VD,R>createDirectStream(org.apache.spark.streaming.StreamingContext,scala.collection.immutable.Map<java.lang.String,java.lang.String>,scala.collection.immutable.Map<kafka.common.TopicAndPartition,java.lang.Object>,scala.Function1<kafka.message.MessageAndMetadata<K,V>,R>,scala.reflect.ClassTag<K>,scala.reflect.ClassTag<V>,scala.reflect.ClassTag<KD>,scala.reflect.ClassTag<VD>,scala.reflect.ClassTag<R>) is not applicable
  (cannot infer type-variable(s) K,V,KD,VD,R
    (actual and formal argument lists differ in length))

Тот же код работает, когда я использую StringDecoder, а не KafkaAvroDecoder, но полученные данные представлены в нечитаемом формате (некоторые сумасшедшие символы). Но я хочу, чтобы данные были декодированы при получении, чтобы я мог понять их. Пожалуйста, предложите правильный способ сделать это.

...