Использование Apache Spark в плохих системах с кассандрой и Java - PullRequest
0 голосов
/ 02 ноября 2018

Я хочу использовать Apache Spark на моем кластере, который состоит из 5 плохих систем. Сначала я реализовал cassandra 3.11.3 на своих узлах, и все мои узлы в порядке.

После этого я вставил 100k записей в мои узлы с API Java, не используя Spark, и все тоже в порядке.

Теперь я хочу выполнить простой запрос, подобный следующему:

select * from myKeySpace.myTbl where field1='someValue';

Поскольку мои узлы слабые в аппаратном обеспечении, я хочу получить от myTbl всего несколько записей, например:

select * from myKeySpace.myTbl where field1='someValue' limit 20;

Я проверил это (A), но это очень медленно (и я не знаю причину):

Dataset<Row> df1 = sparkSession.sql("select * from myKeySpace.myTbl where field1='someValue' limit 20");

а также (B) я думаю, что Spark извлекает все данные и затем использует функцию предела, которая не является моей целью:

Dataset<Row> df1 = sparkSession.sql("select * from myKeySpace.myTbl where field1='someValue'").limit(20);

Думаю, я тоже могу использовать Spark core (C). Также я знаю, что метод под названием perPartitionLimit реализован в Кассандре 3.6 и выше (D).

Как вы знаете, поскольку у меня слабые узлы, я не хочу получать все записи из таблицы Кассандры, а затем использовать функцию предела или что-то в этом роде. Я хочу получить небольшое количество записей из моей таблицы, чтобы мои узлы могли с этим справиться.

Так что же является лучшим решением?

Обновление:

Я сделал предложение, данное @AKSW в комментарии:

SparkConf conf = new SparkConf()
                .setAppName("SparkTest")
                .set("spark.cassandra.connection.host","192.168.107.100");
long limit=20;
JavaSparkContext jsc = new JavaSparkContext(conf);

CassandraJavaRDD<CassandraRow> rdd1 = javaFunctions(jsc)
                    .cassandraTable("myKeySpace", "myTbl")
                    .select("id").perPartitionLimit(limit);

System.out.println("Count: " + rdd1.count()); //output is "Count: 100000" which is wrong!
jsc.stop();

но perPartitionLimit(limit), что limit=20 не работает и все записи извлекаются!

...