У меня есть Dataframe
со строковым ключом и строковым значением JSON. Идея состоит в том, чтобы применить некоторую бизнес-логику для анализа JSON на основе ключей. Для каждого ключа может быть несколько JSON. Поэтому я решил сделать groupBy(col("key")).agg(collect_list("value"))
, но это то, чего я хотел бы избежать, поскольку я не хочу перетасовывать данные из нескольких узлов, поскольку логика синтаксического анализа не зависит от других JSON с тем же ключом.
Я хотел бы знать, имел ли кто-либо подобный вариант использования и имел ли лучший способ сделать это.
valueStream.foreachRDD(rawRDD -> {
if (!rawRDD.isEmpty()) {
SparkSession spark = JavaSparkSessionSingleton.getInstance(rawRDD.context().getConf());
JavaPairRDD<String,String> flattenedRawRDD = rawRDD.mapToPair(record -> {
ObjectMapper om = new ObjectMapper();
JsonNode root = om.readTree(record);
Map<String, JsonNode> flattenedMap = new FlatJsonGenerator(root).flatten();
JsonNode flattenedRootNode = om.convertValue(flattenedMap, JsonNode.class);
return new Tuple2<String,String>(flattenedRootNode.get("/name").asText(),flattenedRootNode.toString());
});
Dataset<Row> rawFlattenedDataRDD = spark.createDataset(flattenedRawRDD.rdd(), Encoders.tuple(Encoders.STRING(),Encoders.STRING())).toDF(“key”,”value”);
Dataset<Row> groupedDS = rawFlattenedDataRDD.groupBy(col(“key”)).agg(collect_list(“value”));
groupedDS.explain();
}
});
== Физический план ==
ObjectHashAggregate (keys= [ключ # 6], функции = [collect_list (значение # 7, 0, 0)]) + - разделение хэшей Exchange (ключ # 6, 200) + - ObjectHashAggregate (keys = [ключ # 6], functions = [частичный_коллект_ список ((значение # 7, 0, 0)]) + - * (1) Project [значение # 3 AS-ключ № 6, значение # 4 AS-значение № 7] + - * (1) SerializeFromObject [staticinvoke (класс org.apache.spark).unsafe.types.UTF8String, StringType, fromString, input [0, scala.Tuple2, true] ._ 1, true, false) Значение AS # 3, staticinvoke (класс org.apache.spark.unsafe.types.UTF8String, StringType,fromString, введите [0, scala.Tuple2, true] ._ 2, true, false) Значение AS # 4] + - Сканировать ExternalRDDScan [obj # 2]