Я пытаюсь сохранить данные из kafka в hdfs, используя потоковую передачу в java.Это мой кусок кода.
JavaInputDStream<ConsumerRecord<String, String>> directKafkaStream =
KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
directKafkaStream.foreachRDD(rdd -> {
rdd.saveAsTextFile("hdfs://.../sampleTest.txt");
rdd.foreach(record -> {
System.out.println("Got the record : ");
});
});
ssc.start();
ssc.awaitTermination();
Это зависимости библиотеки sbt, которые я использую:
"org.apache.kafka" % "kafka-clients" % "0.8.2.0",
"org.apache.spark" %% "spark-streaming" % "2.2.0",
"org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.0",
В потребительских стратегиях я подписываюсь на список тем и конфигурации kafka.Но когда я отправляю данные с использованием kafka, в hdfs не создается файл.Также, когда я запускаю jar-файл, он показывает, что sparkstreamingcontext запущен, но после этого не выводится подтверждающее сообщение.Я что-то здесь упускаю или это проблема несовпадения зависимостей Кафки?