Я использовал Kafka Avro Producer для записи avro-данных в тему kafka, и теперь я хочу прочитать эти данные, используя потоковую передачу. Я использовал API DirectKafkaStream для чтения данных avro, но он завершился ошибкой.
Это мой код для потокового воспроизведения, который считывает данные avro:
public static void main(String[] args) throws InterruptedException {
SparkConf sparkConf = new SparkConf().setAppName("Kafka Streaming").setMaster("local[*]");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
Set<String> topicsSet = new HashSet(Arrays.asList("test_topic"));
Map<String, String> kafkaParams = new HashMap();
//kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class.getName());
kafkaParams.put("value.deserializer", KafkaAvroDeserializer.class.getName());
kafkaParams.put("schema.registry.url", "http://localhost:7788");
kafkaParams.put("specific.avro.reader", "true");
// Create direct kafka stream with brokers and topics
JavaPairInputDStream<String, Customer> directKafkaStream = KafkaUtils.createDirectStream(
jssc,
String.class,
Customer.class,
KafkaAvroDecoder.class,
KafkaAvroDecoder.class,
kafkaParams,
topicsSet
);
directKafkaStream.print();
jssc.start();
jssc.awaitTermination();}
Здесь Customer - это мой класс схемы, который я использовал при записи avro-данных в test_topic с помощью Kafka Avro Producer.
Когда я запускаю этот код, происходит сбой с ошибкой:
Error:(38, 78) java: no suitable method found for createDirectStream(org.apache.spark.streaming.api.java.JavaStreamingContext,java.lang.Class<java.lang.String>,java.lang.Class<com.example.Customer>,java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDecoder>,java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDecoder>,java.util.Map<java.lang.String,java.lang.String>,java.util.Set<java.lang.String>)
method org.apache.spark.streaming.kafka.KafkaUtils.<K,V,KD,VD>createDirectStream(org.apache.spark.streaming.api.java.JavaStreamingContext,java.lang.Class<K>,java.lang.Class<V>,java.lang.Class<KD>,java.lang.Class<VD>,java.util.Map<java.lang.String,java.lang.String>,java.util.Set<java.lang.String>) is not applicable
(inferred type does not conform to equality constraint(s)
inferred: java.lang.Object
equality constraints(s): java.lang.Object,java.lang.String)
method org.apache.spark.streaming.kafka.KafkaUtils.<K,V,KD,VD,R>createDirectStream(org.apache.spark.streaming.api.java.JavaStreamingContext,java.lang.Class<K>,java.lang.Class<V>,java.lang.Class<KD>,java.lang.Class<VD>,java.lang.Class<R>,java.util.Map<java.lang.String,java.lang.String>,java.util.Map<kafka.common.TopicAndPartition,java.lang.Long>,org.apache.spark.api.java.function.Function<kafka.message.MessageAndMetadata<K,V>,R>) is not applicable
(cannot infer type-variable(s) K,V,KD,VD,R
(actual and formal argument lists differ in length))
method org.apache.spark.streaming.kafka.KafkaUtils.<K,V,KD,VD>createDirectStream(org.apache.spark.streaming.StreamingContext,scala.collection.immutable.Map<java.lang.String,java.lang.String>,scala.collection.immutable.Set<java.lang.String>,scala.reflect.ClassTag<K>,scala.reflect.ClassTag<V>,scala.reflect.ClassTag<KD>,scala.reflect.ClassTag<VD>) is not applicable
(cannot infer type-variable(s) K,V,KD,VD
(argument mismatch; org.apache.spark.streaming.api.java.JavaStreamingContext cannot be converted to org.apache.spark.streaming.StreamingContext))
method org.apache.spark.streaming.kafka.KafkaUtils.<K,V,KD,VD,R>createDirectStream(org.apache.spark.streaming.StreamingContext,scala.collection.immutable.Map<java.lang.String,java.lang.String>,scala.collection.immutable.Map<kafka.common.TopicAndPartition,java.lang.Object>,scala.Function1<kafka.message.MessageAndMetadata<K,V>,R>,scala.reflect.ClassTag<K>,scala.reflect.ClassTag<V>,scala.reflect.ClassTag<KD>,scala.reflect.ClassTag<VD>,scala.reflect.ClassTag<R>) is not applicable
(cannot infer type-variable(s) K,V,KD,VD,R
(actual and formal argument lists differ in length))
Тот же код работает, когда я использую StringDecoder, а не KafkaAvroDecoder, но полученные данные представлены в нечитаемом формате (некоторые сумасшедшие символы). Но я хочу, чтобы данные были декодированы при получении, чтобы я мог понять их. Пожалуйста, предложите правильный способ сделать это.