Динамический выбор столбцов с параметром, используя Scala - Spark - PullRequest
0 голосов
/ 08 февраля 2020

Мне нужно динамически выбирать столбцы из одной из двух таблиц, к которым я присоединяюсь. Имя одного из столбцов, которые нужно выбрать, передается в переменную. Вот подробности.

Имена таблиц передаются в переменные. Так же как и join_id и join_type.

//Creating scala variables for each table
var table_name_a = dbutils.widgets.get("table_name_a")
var table_name_b = dbutils.widgets.get("table_name_b")

//Create scala variable for Join Id
var join_id = dbutils.widgets.get("table_name_b") + "Id"

// Define join type
var join_type = dbutils.widgets.get("join_type")

Затем я присоединяюсь к таблицам. Я хочу выбрать все столбцы из таблицы A и только два столбца из таблицы B: один столбец называется «Описание» независимо от того, какая таблица B передана в указанном параметре; второй столбец имеет то же имя таблицы B, например, если имя таблицы B - Employee, я хочу выбрать столбец с именем «Employee» из таблицы B. Приведенный ниже код выбирает все столбцы из таблицы A и столбец Description из таблицы. B (с псевдонимом). Но мне все еще нужно выбрать другой столбец из таблицы B, имя которого совпадает с именем таблицы. Я не знаю заранее, сколько столбцов в таблице B имеет ни общее количество, ни порядок столбцов, ни их имена - поскольку таблица B передается как параметр.

// Joining Tables
var df_joined_tables = df_a
                                     .join(df_b,                                               
                                               df_a(join_id)===df_b(join_id),
                                              join_type
                                          ).select($"df_a.*",$"df_b.Description".alias(table_name_b + " Description"))

Мой вопрос: как мне передать переменную table_name_b в качестве столбца, который я пытаюсь выбрать из таблицы B?

Я попробовал приведенный ниже код, который явно неверен, поскольку в "$" df_b.table_name_b "table_name_b" должен быть содержимым параметра, а не именем самих столбцов.

var df_joined_tables = df_a
                                     .join(df_b,                                               
                                               df_a(join_id)===df_b(join_id),
                                              join_type
                                          ).select($"df_a.*",$"df_b.Description".alias(table_name_b + " Description"),$"df_b.table_name_b")

Затем я попробовал приведенный ниже код, и он выдает ошибку: "значение table_name_b не является членом org. apache. spark. sql .DataFrame "

var df_joined_tables = df_a
                                     .join(df_b,                                               
                                               df_a(join_id)===df_b(join_id),
                                              join_type
                                          ).select($"df_a.*",$"df_b.Description".alias(table_name_b + " Description"),df_b.table_name_b)

Как передать переменную table_name_b в качестве столбца, который нужно выбрать из таблицы B?

1 Ответ

1 голос
/ 08 февраля 2020

Вы можете построить List[org.apache.spark.sql.Column] и использовать его в своей функции select, как показано в следующем примере:

// sample input:
val df = Seq(
  ("A", 1, 6, 7),
  ("B", 2, 7, 6),
  ("C", 3, 8, 5),
  ("D", 4, 9, 4),
  ("E", 5, 8, 3)
).toDF("name", "col1", "col2", "col3")

df.printSchema()
val columnNames = List("col1", "col2") // string column names from your params
val columnsToSelect = columnNames.map(col(_)) // convert the required column names from string to column type
df.select(columnsToSelect: _*).show() // using the list of columns

// output:
+----+----+
|col1|col2|
+----+----+
|   1|   6|
|   2|   7|
|   3|   8|
|   4|   9|
|   5|   8|
+----+----+

Аналогичным образом может применяться для join '*

Обновление

Добавление другого примера:

val aliasTableA = "tableA"
val aliasTableB = "tableB"
val joinField = "name"

val df1 = Seq(
  ("A", 1, 6, 7),
  ("B", 2, 7, 6),
  ("C", 3, 8, 5),
  ("D", 4, 9, 4),
  ("E", 5, 8, 3)
).toDF("name", "col1", "col2", "col3")

val df2 = Seq(
  ("A", 11, 61, 71),
  ("B", 21, 71, 61),
  ("C", 31, 81, 51)
).toDF("name", "col_1", "col_2", "col_3")


df1.alias(aliasTableA)
  .join(df2.alias(aliasTableB), Seq(joinField))
  .selectExpr(s"${aliasTableA}.*", s"${aliasTableB}.col_1", s"${aliasTableB}.col_2").show()

// output:
+----+----+----+----+-----+-----+
|name|col1|col2|col3|col_1|col_2|
+----+----+----+----+-----+-----+
|   A|   1|   6|   7|   11|   61|
|   B|   2|   7|   6|   21|   71|
|   C|   3|   8|   5|   31|   81|
+----+----+----+----+-----+-----+
...