Как скопировать данные из Кассандры в эластичный поиск 6.7 - PullRequest
0 голосов
/ 16 мая 2019

Мне нужно скопировать данные с Кассандры в Elastic search.В таблице Кассандры есть почти 5 ТБ данных.Я использую соединитель datastax "spark-cassandra-connector_2.11" для подключения к Cassandra.И пытается использовать Elastic Search Spark API.Но из-за огромных данных индексирование не начинается.Пожалуйста, исправьте меня, если ошибаетесь.

JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
CassandraJavaRDD<CassandraRow> cassandraJavaRDD = CassandraJavaUtil.javaFunctions(sparkContext).cassandraTable(keyspace,table);
JavaRDD<String> dataRDD = cassandraJavaRDD.select("value").map(new Function<CassandraRow, String>() {
    @Override
    public String call(CassandraRow cassandraRow) throws Exception {
        System.out.println("cassandraRow");
        return cassandraRow.getString("value");
    }
});
List<String> dataList = dataRDD.collect();

JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkContext, Seconds.apply(1));
Queue<JavaRDD<String>> microbatches = new LinkedList<JavaRDD<String>>();
microbatches.add(dataRDD);
JavaDStream<String> javaDStream = javaStreamingContext.queueStream(microbatches);
JavaEsSparkStreaming.saveJsonToEs(javaDStream, "index_spark/productDoc");
javaStreamingContext.start();

Есть ли другой способ сделать параллельный поток чтения и записи?

...