I tested Spark 2.3 with Kafka 1.1.0, using the following dependency:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
It works fine.
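The dependency relies on a ${spark.version} Maven property that is not defined in this answer. A minimal sketch of how it could be declared in the same pom.xml, assuming the 2.3.0 release (the test above only says Spark 2.3, so the exact patch version is an assumption):

```xml
<properties>
    <!-- Assumed patch version; set this to the Spark release you actually run. -->
    <spark.version>2.3.0</spark.version>
</properties>
```

Keeping the version in one property lets the other Spark artifacts in the build stay on the same release as the Kafka integration module.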
Sample code:
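import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

SparkConf conf = new SparkConf().setAppName("stream test").setMaster("local[*]");
JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(2));
// Kafka consumer settings; offsets are not committed automatically.
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "master:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("enable.auto.commit", false);
List<String> topics = Arrays.asList("A29");
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
// Print up to 30 record values from each 2-second batch.
JavaDStream<String> lines = stream.map(ConsumerRecord::value);
lines.print(30);
streamingContext.start();
// awaitTermination() throws InterruptedException; declare or handle it in the enclosing method.
streamingContext.awaitTermination();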