Я пытаюсь записать данные потоковой передачи в Cassandra Nosql с помощью класса CassandraJavaUtil.Но некоторые сообщения об ошибках выбрасываются.Сначала ниже приведены коды потокового зажигания.
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("SimpleExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
Collection<String> topics = Arrays.asList("outputTopic");
Map<String, Object> kafkaParams = new HashMap<String, Object>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "spark-demo");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", true);
JavaStreamingContext ssc = new JavaStreamingContext(jsc, new Duration(1000));
JavaInputDStream<ConsumerRecord<String, String>> inputStream =
KafkaUtils.createDirectStream(ssc,
LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
JavaPairDStream<String, String> processedJavaPairStream =
inputStream.mapToPair(record-> new Tuple2<>(record.key(),
record.value())).mapValues(message -> message.replace('#', '='));
JavaDStream<Tuple2<String, String>> cassandraStream =
processedJavaPairStream.map(tuple2 -> tuple2);
cassandraStream.print(); // (1) It consumes the kafka messages perfectly
cassandraStream.foreachRDD(tuple2 ->
CassandraJavaUtil.javaFunctions(jsc).writerBuilder("my_keyspace",
"my_table", CassandraJavaUtil.mapToRow(Tuple2.class)).saveToCassandra()); // (2)
ssc.start();
ssc.awaitTermination();
ssc.close();
(1) строка печатает сообщения кафки правильно.Но (2) линия не работает.Он выдает следующие сообщения:
The method writerBuilder(String, String, CassandraJavaUtil.mapToRow(Tuple2.class)) is undefined for the type SparkContextJavaFunctions
Я изменил метод CassandraJavaUtil.mapToRow
на CassandraJavaUtil.mapTupleToRow
и выполнил, но он также выбрасывает исключения.Я новичок из Кассандры Ява Апи.Я застрял с этим пунктом.Нужен твой ответ.Спасибо.