Я выполнял простую потоковую передачу Kafka-Spark с использованием Direct Stream, как это было сделано в https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html, и я компилирую в виде единого Java-файла (без Maven) и позаботился обо всех зависимостях отдельно.При компиляции так:
javac -cp "/opt/spark-2.4.3-bin-hadoop2.7/jars/*:/opt/kafka_2.11-2.2.0/libs/*" SparkStreamConsumer.java
,
появляется ошибка:
SparkStreamConsumer.java:33: error: incompatible types: no instance(s) of type variable(s) K,V exist so that InputDStream<ConsumerRecord<K,V>> conforms to JavaInputDStream<ConsumerRecord<String,String>>
KafkaUtils.createDirectStream(
^
where K,V are type-variables:
K extends Object declared in method <K,V>createDirectStream(StreamingContext,LocationStrategy,ConsumerStrategy<K,V>)
V extends Object declared in method <K,V>createDirectStream(StreamingContext,LocationStrategy,ConsumerStrategy<K,V>)
1 error
Пожалуйста, помогите!
Более того, кроме файлов, присутствующихв /opt/spark-2.4.3-bin-hadoop2.7/jars/
Я также добавил: spark-streaming-kafka-0-10_2.11-2.4.3.jar
скачано с https://search.maven.org/remotecontent?filepath=org/apache/spark/spark-streaming-kafka-0-10_2.11/2.4.3/spark-streaming-kafka-0-10_2.11-2.4.3.jar
Что я сделал:
-> Настройка Zookeeper (3.4.14)
-> Настройка Kafka (kafka_2.11-2.2.0)
-> Настройка Spark (2.4.3)
-> Сделана тема кафки "mytopic"
->Протестировано с использованием производителя и потребителя консоли, работает нормально.
Теперь я хотел заставить Spark потреблять, но ошибка не позволяет мне!
Код:
//SparkStreamConsumer.java
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
import scala.Tuple2;
public class SparkStreamConsumer{
public static void main(String[] args){
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiverInJava");
StreamingContext ssc = new StreamingContext(conf, Durations.seconds(1));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:2181");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "spark-streaming-consumer-group");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", true);
Collection<String> topics = Arrays.asList("mytopic");
//The problematic line:
JavaInputDStream<ConsumerRecord<String, String>> kafkaStream =
KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
kafkaStream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
kafkaStream.print();
ssc.start();
ssc.awaitTermination();
}
}
Спасибо!