Динамический цикл набора данных для всех имен столбцов - PullRequest
0 голосов
/ 05 апреля 2019

Я работаю над проектом, в котором у меня около 500 имен столбцов, но мне нужно применить функцию coalesce к каждому имени таблицы.

df1 схема

-id
-col1
...
-col500

df2 схема

-id
-col1
...
-col500
Dataset<Row> newDS=  df1.join(df2, "id")
.select(
                df1.col("id"),
                functions.coalesce(df1.col("col1"),df2.col("col1")).as("col1"), 
                functions.coalesce(df1.col("col2"),df2.col("col2")).as("col2"),
...
functions.coalesce(df1.col("col500"),df2.col("col500")).as("col500"),
                )

        .show();

Что я пробовал

 Dataset<Row> j1 =  df1.join(df2, "id");
Dataset<Row> gh1 = spark.emptyDataFrame();


    String[] f =  df1.columns();
     for(String h : f)
     {
         if(h == "id")
             gh1 = j1.select(df1.col("id"));
        else{
            gh1 = j1.select(functions.coalesce(df1.col(h),df2.col(h)).as(h));

        }


     }

     gh1.show();

Ответы [ 3 ]

0 голосов
/ 05 апреля 2019

В Java вы можете передавать массив значений в методы, ожидающие переменного числа аргументов, поэтому вы можете переписать свой код следующим образом:

Column[] coalescedColumns = Stream.of(df1.columns())
             .map(name -> functions.coalesce(df1.col(name),df2.col(name)).as(name))
             .toArray(Column[]::new);

Dataset<Row> newDS = df1.join(df2, "id").select(coalescedColumns)

Я не исключал столбец id, так какcoalesce также будет работать, как и ожидалось, в этом столбце

0 голосов
/ 08 апреля 2019

df1.columns вернет String Array, поэтому не может вызывать потоки на нем, относится .

Column[] coalescedColumns = 
                Stream.of(df1.columns())
               .map(name -> functions.coalesce(df1.col(name),df2.col(name)).as(name))
                 .toArray(Column[]::new);

        Dataset<Row> newDS = df1.as("a").join(df2.as("b")).where("a.id == b.id").select(coalescedColumns);

0 голосов
/ 05 апреля 2019

Если я правильно понимаю, у вас есть два кадра данных с одной и той же схемой, и вы хотите объединить их 500 столбцов 2 на 2 без необходимости писать все.

Этого можно легко достичь, предоставив последовательность столбцовдо select.Кроме того, поскольку select не принимает последовательности столбцов, а, скорее, переменное число аргументов столбцов, вам необходимо добавить : _*, чтобы сообщить scala, что ему нужно обрабатывать все элементы последовательности как отдельные аргументы.

val cols = df1.columns.filter(_ != "id")
df1
    .join(df2, "id")
    .select(col("id") +: cols.map(n => coalesce(df1.col(n), df2.col(n)) as n) : _* )
...