Мне нужно скопировать данные с Кассандры в Elastic search.В таблице Кассандры есть почти 5 ТБ данных.Я использую соединитель datastax "spark-cassandra-connector_2.11" для подключения к Cassandra.И пытается использовать Elastic Search Spark API.Но из-за огромных данных индексирование не начинается.Пожалуйста, исправьте меня, если ошибаетесь.
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
CassandraJavaRDD<CassandraRow> cassandraJavaRDD = CassandraJavaUtil.javaFunctions(sparkContext).cassandraTable(keyspace,table);
JavaRDD<String> dataRDD = cassandraJavaRDD.select("value").map(new Function<CassandraRow, String>() {
@Override
public String call(CassandraRow cassandraRow) throws Exception {
System.out.println("cassandraRow");
return cassandraRow.getString("value");
}
});
List<String> dataList = dataRDD.collect();
JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkContext, Seconds.apply(1));
Queue<JavaRDD<String>> microbatches = new LinkedList<JavaRDD<String>>();
microbatches.add(dataRDD);
JavaDStream<String> javaDStream = javaStreamingContext.queueStream(microbatches);
JavaEsSparkStreaming.saveJsonToEs(javaDStream, "index_spark/productDoc");
javaStreamingContext.start();
Есть ли другой способ сделать параллельный поток чтения и записи?