Как исправить ошибку потоковой передачи Kafka-Spark при использовании JavaInputDStream для Direct Stream? - PullRequest
0 голосов
/ 21 мая 2019

Я выполнял простую потоковую передачу Kafka-Spark с использованием Direct Stream, как это было сделано в https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html, и я компилирую в виде единого Java-файла (без Maven) и позаботился обо всех зависимостях отдельно.При компиляции так:

javac -cp "/opt/spark-2.4.3-bin-hadoop2.7/jars/*:/opt/kafka_2.11-2.2.0/libs/*" SparkStreamConsumer.java,

появляется ошибка:

SparkStreamConsumer.java:33: error: incompatible types: no instance(s) of type variable(s) K,V exist so that InputDStream<ConsumerRecord<K,V>> conforms to JavaInputDStream<ConsumerRecord<String,String>>
                KafkaUtils.createDirectStream(
                                             ^
  where K,V are type-variables:
    K extends Object declared in method <K,V>createDirectStream(StreamingContext,LocationStrategy,ConsumerStrategy<K,V>)
    V extends Object declared in method <K,V>createDirectStream(StreamingContext,LocationStrategy,ConsumerStrategy<K,V>)
1 error

Пожалуйста, помогите!

Более того, кроме файлов, присутствующихв /opt/spark-2.4.3-bin-hadoop2.7/jars/ Я также добавил: spark-streaming-kafka-0-10_2.11-2.4.3.jar скачано с https://search.maven.org/remotecontent?filepath=org/apache/spark/spark-streaming-kafka-0-10_2.11/2.4.3/spark-streaming-kafka-0-10_2.11-2.4.3.jar

Что я сделал:

-> Настройка Zookeeper (3.4.14)

-> Настройка Kafka (kafka_2.11-2.2.0)

-> Настройка Spark (2.4.3)

-> Сделана тема кафки "mytopic"

->Протестировано с использованием производителя и потребителя консоли, работает нормально.

Теперь я хотел заставить Spark потреблять, но ошибка не позволяет мне!

Код:

//SparkStreamConsumer.java

import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;
import scala.Tuple2;

public class SparkStreamConsumer{
    public static void main(String[] args){

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiverInJava");
        StreamingContext ssc = new StreamingContext(conf, Durations.seconds(1));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:2181");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "spark-streaming-consumer-group");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", true);

        Collection<String> topics = Arrays.asList("mytopic");


        //The problematic line:
        JavaInputDStream<ConsumerRecord<String, String>> kafkaStream =
                KafkaUtils.createDirectStream(
                        ssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );

        kafkaStream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));

        kafkaStream.print();
        ssc.start();
        ssc.awaitTermination();
    }
}

Спасибо!

...