SPARK Суммирование значений и подсчет различных значений Java - PullRequest
0 голосов
/ 08 мая 2020

Мой набор данных выглядит следующим образом:

a,b,c,d
---------
1,2005,A,2
1,2005,A,3
1,2005,B,4
2,2005,A,4

Вывод должен быть сгруппирован по полям a, b, а затем суммировать d значения и подсчитывать различные c значения. Таким образом, результат должен быть:

1,2005,2,9
2,2005,1,4

EDIT

Мой код выглядит так:

    JavaRDD<String> csv = spark.read().texfile("path.csv").javaRDD();
    JavaRDD<String[]> rdd = csv.map(s -> s.split(","))   
    JavaPairRDD<String , Tuple2<Long, String>> tuple = rdd.mapToPair(x -> new Tuple2<>(x[0]+","+ x[1], new Tuple2<>(x[2], x[3])));
    JavaPairRDD<String , Tuple2<Long, String>> tuple2 = tuple.reduceByKey((x,y) -> x._2()+y._2());

Но я не знаю, как подсчитать различные c значений.

1 Ответ

0 голосов
/ 08 мая 2020

Задача ниже загружает CSV, аналогичный введенному вами

a,b,c,d
1,2005,A,2
1,2005,A,3
1,2005,B,4
2,2005,A,4

, и выполняет необходимые операции сопоставления и сокращения,

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import scala.Tuple4;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SimpleDataframe {

    public static void main(String[] args) {
        SparkSession spark = Constant.getSparkSess();


        JavaRDD<Row> rdd = spark.read().option("header", "true").csv("src/main/resources/simple.csv").rdd().toJavaRDD();

        List<Tuple4<String, String, Integer, Integer>> output =
                rdd.mapToPair(   // Map input to key(String,String) and value (Collection,Num)
                (PairFunction<Row, Tuple2<String, String>, Tuple2<Set<String>, Integer>>) row -> {
                    Tuple2<String, String> tup1 = new Tuple2<>(row.getString(0), row.getString(1));
                    Set<String> set = new HashSet<>();
                    set.add(row.getString(2));
                    Tuple2<Set<String>, Integer> tup2 = new Tuple2<>(set, Integer.parseInt(row.getString(3)));
                    return new Tuple2<>(tup1, tup2);
                }
        ).reduceByKey((Function2<Tuple2<Set<String>, Integer>,   // Combine out by key to single tuple per unique tuple1
                        Tuple2<Set<String>, Integer>, Tuple2<Set<String>, Integer>>) (v1, v2) -> {
            Set<String> set = new HashSet<>();
            set.addAll(v1._1);
            set.addAll(v2._1);
            int num = v1._2 + v2._2;
            return new Tuple2<>(set, num);
        }) //// Simplest operation Mapping the combined result to required output
                        .map(tuple -> new Tuple4<>(tuple._1._1, tuple._1._2, tuple._2._1.size(), tuple._2._2))
                .collect();

        System.out.println(output);

    }
}


...