Как динамически заполнить предложение select из dataframe? давая AnalysisException - PullRequest
0 голосов
/ 09 января 2020

Я использую spark- sql 2.4.1 и java 8.

 val country_df = Seq(
    ("us",2001),
    ("fr",2002),
    ("jp",2002),
    ("in",2001),
    ("fr",2003),
    ("jp",2002),
    ("in",2003)
    ).toDF("country","data_yr")

> val col_df = country_df.select("country").where($"data_yr" === 2001)

    val data_df = Seq(
    ("us_state_1","fr_state_1" ,"in_state_1","jp_state_1"),
    ("us_state_2","fr_state_2" ,"in_state_2","jp_state_1"),
    ("us_state_3","fr_state_3" ,"in_state_3","jp_state_1")
    ).toDF("us","fr","in","jp")

> data_df.select("us","in").show()

как динамически заполнить это предложение select (of data_df) из country_df для данного года?

т.е. с первого кадра данных я получу значения столбца, это столбцы, которые мне нужно выбрать из второго источника данных. Как это можно сделать?

Попробовал это:

List<String> aa = col_df.select(functions.lower(col("data_item_code"))).map(row -> row.mkString(" ",", "," "), Encoders.STRING()).collectAsList();
 data_df.select(aa.stream().map(s -> new Column(s)).toArray(Column[]::new));

Ошибка:

.AnalysisException: cannot resolve '` un `' given input columns: [abc,.....all columns ...]

Так что же такое что тут не так, и как это исправить?

Ответы [ 3 ]

1 голос
/ 10 января 2020
scala> val colname = col_df.rdd.collect.toList.map(x => x(0).toString).toSeq

scala> data_df.select(colname.head, colname.tail: _*).show()
+----------+----------+
|        us|        in|
+----------+----------+
|us_state_1|in_state_1|
|us_state_2|in_state_2|
|us_state_3|in_state_3|
+----------+----------+
1 голос
/ 12 января 2020

Используя pivot, вы можете получить значения в виде имен столбцов, например:

val selectCols = col_df.groupBy().pivot($"country").agg(lit(null)).columns
data_df.select(selectCols.head, selectCols.tail: _*)
1 голос
/ 09 января 2020

Вы можете попробовать использовать приведенный ниже код.

Выберите имя столбца из первого набора данных.

List<String> columns = country_df.select("country").where($"data_yr" === 2001).as(Encoders.STRING()).collectAsList();

Используйте имена столбцов в selectexpr во втором наборе данных.

public static Seq<String> convertListToSeq(List<String> inputList) {
        return JavaConverters.asScalaIteratorConverter(inputList.iterator()).asScala().toSeq();
}


//using selectExpr
data_df.selectExpr(convertListToSeq(columns)).show(true);
...