Как запустить несколько искр Cassandra Query - PullRequest
1 голос
/ 03 июля 2019

Мне нужно выполнить такую ​​задачу ниже.Почему-то мне не хватает точки.Я знаю, что не могу использовать javasparkcontext, как это, и передавать javafunctions, так как существует проблема с сериализацией.

Мне нужно выполнить несколько запросов cassandra с размером cartesian.size ().Есть какой-нибудь совет?

JavaSparkContext jsc = new JavaSparkContext(conf);
    JavaRDD<DateTime> dateTimeJavaRDD = jsc.parallelize(dateTimes); //List<DateTime>
    JavaRDD<Integer> virtualPartitionJavaRDD = jsc.parallelize(virtualPartitions); //List<Integer>
    JavaPairRDD<DateTime, Integer> cartesian = dateTimeJavaRDD.cartesian(virtualPartitionJavaRDD);

    long c = cartesian.map(new Function<Tuple2<DateTime, Integer>, Long>() {
        @Override
        public Long call(Tuple2<DateTime, Integer> tuple2) throws Exception {
            return javaFunctions(jsc).cassandraTable("keyspace", "table").where("p1 = ? and  p2 = ?", tuple2._1(), tuple2._2()).count();
        }
    }).reduce((a,b) -> a + b);


    System.out.println("TOTAL ROW COUNT IS: " + c);

1 Ответ

1 голос
/ 03 июля 2019

Правильное решение должно заключаться в выполнении соединения между вашими данными и таблицей Casasndra. Существует функция joinWithCassandraTable , которая делает то, что вам нужно - вы просто генерируете RDD Tuple2, который содержит значения для p1 & p2, а затем вызываете таблицу joinWithCassandra, что-то вроде этого (не проверено, взято из моего примера здесь ):

JavaRDD<Tuple2<Integer, Integer>> trdd = cartesian.map(new Function<Tuple2<DateTime, Integer>, Tuple2<Integer, Integer>>() {
        @Override
        public Tuple2<Integer, Integer> call(Tuple2<DateTime, Integer> tuple2) throws Exception {
            return new Tuple2<Integer, Integer>(tuple2._1(), tuple2._2());
        }
    });
CassandraJavaPairRDD<Tuple2<Integer, Integer>, Tuple2<Integer, String>> joinedRDD =
     trdd.joinWithCassandraTable("test", "jtest",
     someColumns("p1", "p2"), someColumns("p1", "p2"),
     mapRowToTuple(Integer.class, String.class), mapTupleToRow(Integer.class));
// perform counting here...

...