У меня есть искорное задание, которое делает правильное объединение на основе двух таблиц, чтение и объединение происходит довольно быстро, но при попытке вставить результаты объединения в cassandra db это происходит очень медленно.Для вставки 1000 строк требуется более 30 минут, для записи 9 строк - 3 минуты.Пожалуйста, смотрите мою конфигурацию ниже.у нас есть 3 кассандры и искровой узел, и искра установлена для всех узлов.Я довольно новичок в Spark и не могу понять, что не так.я могу вставить данные того же размера с помощью драйвера dse менее чем за 1 секунду (более 2000 строк).Я ценю ваше время и помощь !!
Отправка Spark:
"dse -u " + username + " -p " + password + " spark-submit --class com.SparkJoin --executor-memory=20G " +
"SparkJoinJob-1.0-SNAPSHOT.jar " + filterMap.toString() + "
Версия Spark Core: 2.7.2
spark-cassandra-connector_2.11: 2.3.1
spark-sql_2.11: 2.3.1
Spark Conf
SparkConf conf = new SparkConf(true).setAppName("Appname");
conf.set("spark.cassandra.connection.host", host);
conf.set("spark.cassandra.auth.username", username);
conf.set("spark.cassandra.auth.password", password);
conf.set("spark.network.timeout", "600s");
conf.set("spark.cassandra.connection.keep_alive_ms", "25000");
conf.set("spark.cassandra.connection.timeout_ms", "5000000");
conf.set("spark.sql.broadcastTimeout", "5000000");
SparkContext sc = new SparkContext(conf);
SparkSession sparkSession = SparkSession.builder().sparkContext(sc).getOrCreate();
SQLContext sqlContext = sparkSession.sqlContext();
sqlContext.setConf("spark.cassandra.connection.host", host);
sqlContext.setConf("spark.cassandra.auth.username", username);
sqlContext.setConf("spark.cassandra.auth.password", password);
sqlContext.setConf("spark.network.timeout", "600s");
sqlContext.setConf("spark.cassandra.connection.keep_alive_ms", "2500000");
sqlContext.setConf("spark.cassandra.connection.timeout_ms", "5000000");
sqlContext.setConf("spark.sql.broadcastTimeout", "5000000");
sqlContext.setConf("spark.executor.heartbeatInterval", "5000000");
sqlContext.setConf("spark.sql.crossJoin.enabled", "true");
Выбор таблицы слева и справа;
Dataset<Row> resultsFrame = sqlContext.sql("select * from table where conditions");
return resultsFrame.map((MapFunction<Row, JavaObject>) row -> {
// some operations here
return obj;
}, Encoders.bean(JavaObject.class)
);
Регистрация
Dataset<Row> result = RigtTableJavaRDD.join(LeftTableJavaRDD,
(LeftTableJavaRDD.col("col1").minus(RigtTableJavaRDD.col("col2"))).
between(new BigDecimal("0").subtract(twoHundredMilliseconds), new BigDecimal("0").add(twoHundredMilliseconds))
.and(LeftTableJavaRDD.col("col5").equalTo(RigtTableJavaRDD.col("col6")))
, "right");
Вставить результат
CassandraJavaUtil.javaFunctions(resultRDD.javaRDD()).
writerBuilder("keyspace", "table", CassandraJavaUtil.mapToRow(JavaObject.class)).
saveToCassandra();