Альтернатива для spark groupBy для нечисловой операции агрегирования - PullRequest
0 голосов
/ 04 октября 2019

У меня есть Dataframe со строковым ключом и строковым значением JSON. Идея состоит в том, чтобы применить некоторую бизнес-логику для анализа JSON на основе ключей. Для каждого ключа может быть несколько JSON. Поэтому я решил сделать groupBy(col("key")).agg(collect_list("value")), но это то, чего я хотел бы избежать, поскольку я не хочу перетасовывать данные из нескольких узлов, поскольку логика синтаксического анализа не зависит от других JSON с тем же ключом.

Я хотел бы знать, имел ли кто-либо подобный вариант использования и имел ли лучший способ сделать это.

valueStream.foreachRDD(rawRDD -> {
    if (!rawRDD.isEmpty()) {
        SparkSession spark = JavaSparkSessionSingleton.getInstance(rawRDD.context().getConf());
        JavaPairRDD<String,String> flattenedRawRDD = rawRDD.mapToPair(record -> {
            ObjectMapper om = new ObjectMapper();
            JsonNode root = om.readTree(record);
            Map<String, JsonNode> flattenedMap = new FlatJsonGenerator(root).flatten();
            JsonNode flattenedRootNode = om.convertValue(flattenedMap, JsonNode.class);
            return new Tuple2<String,String>(flattenedRootNode.get("/name").asText(),flattenedRootNode.toString());
        });

        Dataset<Row> rawFlattenedDataRDD = spark.createDataset(flattenedRawRDD.rdd(), Encoders.tuple(Encoders.STRING(),Encoders.STRING())).toDF(“key”,”value”);

        Dataset<Row> groupedDS = rawFlattenedDataRDD.groupBy(col(“key”)).agg(collect_list(“value”));

        groupedDS.explain();

    }
});

== Физический план ==

ObjectHashAggregate (keys= [ключ # 6], функции = [collect_list (значение # 7, 0, 0)]) + - разделение хэшей Exchange (ключ # 6, 200) + - ObjectHashAggregate (keys = [ключ # 6], functions = [частичный_коллект_ список ((значение # 7, 0, 0)]) + - * (1) Project [значение # 3 AS-ключ № 6, значение # 4 AS-значение № 7] + - * (1) SerializeFromObject [staticinvoke (класс org.apache.spark).unsafe.types.UTF8String, StringType, fromString, input [0, scala.Tuple2, true] ._ 1, true, false) Значение AS # 3, staticinvoke (класс org.apache.spark.unsafe.types.UTF8String, StringType,fromString, введите [0, scala.Tuple2, true] ._ 2, true, false) Значение AS # 4] + - Сканировать ExternalRDDScan [obj # 2]

...