Функция CassandraJavaUtil.javaFunctions.writerBuilder не работает - PullRequest
0 голосов
/ 26 сентября 2019

Я пытаюсь записать данные потоковой передачи в Cassandra Nosql с помощью класса CassandraJavaUtil.Но некоторые сообщения об ошибках выбрасываются.Сначала ниже приведены коды потокового зажигания.

SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("SimpleExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

Collection<String> topics = Arrays.asList("outputTopic");

Map<String, Object> kafkaParams = new HashMap<String, Object>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "spark-demo");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", true);

JavaStreamingContext ssc = new JavaStreamingContext(jsc, new Duration(1000));

JavaInputDStream<ConsumerRecord<String, String>> inputStream = 
  KafkaUtils.createDirectStream(ssc, 
  LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

JavaPairDStream<String, String> processedJavaPairStream = 
  inputStream.mapToPair(record-> new Tuple2<>(record.key(), 
  record.value())).mapValues(message -> message.replace('#', '='));

JavaDStream<Tuple2<String, String>> cassandraStream = 
  processedJavaPairStream.map(tuple2 -> tuple2);

cassandraStream.print(); // (1) It consumes the kafka messages perfectly

cassandraStream.foreachRDD(tuple2 -> 
  CassandraJavaUtil.javaFunctions(jsc).writerBuilder("my_keyspace", 
  "my_table", CassandraJavaUtil.mapToRow(Tuple2.class)).saveToCassandra()); // (2)

ssc.start();
ssc.awaitTermination();
ssc.close();

(1) строка печатает сообщения кафки правильно.Но (2) линия не работает.Он выдает следующие сообщения:

The method writerBuilder(String, String, CassandraJavaUtil.mapToRow(Tuple2.class)) is undefined for the type SparkContextJavaFunctions

Я изменил метод CassandraJavaUtil.mapToRow на CassandraJavaUtil.mapTupleToRow и выполнил, но он также выбрасывает исключения.Я новичок из Кассандры Ява Апи.Я застрял с этим пунктом.Нужен твой ответ.Спасибо.

...