Я пытаюсь получить все документы из индекса в Elasticsearch с помощью SparkSQL и отправить их в другой индекс с помощью библиотеки es-hadoop. В будущем он будет использоваться для промежуточной обработки, поэтому мне нужен SparkSQL для обработки данных.
Я разработал простой класс в Java, который должен выполнять эту операцию (если я правильно понимаю)
Logger.getLogger("org.apache").setLevel(Level.WARN);
System.setProperty("hadoop.home.dir", "/opt/test");
SparkConf conf = new SparkConf()
.setAppName("es-test")
.setMaster("local[10]");
SparkSession sparkSession = SparkSession.builder()
.config(conf)
.getOrCreate();
Map<String, String> sourceTaskConfig = new HashMap<>();
sourceTaskConfig.put("es.nodes", "localhost");
sourceTaskConfig.put("es.port.size", "9200");
sourceTaskConfig.put("es.resource", "index1");
Dataset<Row> dataframe = JavaEsSparkSQL.esDF(new SQLContext(sparkSession), sourceTaskConfig);
//Here will be data processing
Map<String, String> destinationTaskConfig = new HashMap<>();
destinationTaskConfig.put("es.nodes", "localhost");
destinationTaskConfig.put("es.port.size", "9200");
JavaEsSparkSQL.saveToEs(dataframe, "index2", destinationTaskConfig);
Я ожидаю, что все мои данные из index1 будут перемещены в index2, но на самом деле я получаю только 10000 документов и задание прекращаю. Я уже пытаюсь играть с ограничениями размера в конфигурации. Я буду более чем счастлив, если кто-то может объяснить мне, что я делаю неправильно. Большое спасибо