Укажите префикс для всех столбцов при выборе с помощью имени_структуры. * - PullRequest
1 голос
/ 29 января 2020

Ниже приводится таблица данных с именем temp_table: 'table_name'.
Как бы вы использовали spark. sql (), чтобы дать префикс всем столбцам?

root
 |-- MAIN_COL: struct (nullable = true)
 |    |-- a: string (nullable = true)
 |    |-- b: string (nullable = true)
 |    |-- c: string (nullable = true)
 |    |-- d: string (nullable = true)
 |    |-- f: long (nullable = true)
 |    |-- g: long (nullable = true)
 |    |-- h: long (nullable = true)
 |    |-- j: long (nullable = true)

Приведенный ниже запрос

spark.sql("select MAIN_COL.* from table_name")

возвращает столбцы с именами a, b, c ..., но как заставить их все выглядеть, например, pre_a, pre_b, pre_ c?
Хотите избежать выбирая и давая им псевдоним один за другим. Что если у меня 30 столбцов?

Я надеюсь, что пользовательский UDF сможет решить эту проблему, что используется в SQL, но на самом деле не уверен, как с этим справиться.

 # Generate a pandas DataFrame
import pandas as pd
a_dict={
    'a':[1,2,3,4,5],
    'b':[1,2,3,4,5],
    'c':[1,2,3,4,5],
    'e':list('abcde'),
    'f':list('abcde'),
    'g':list('abcde')
}
pandas_df=pd.DataFrame(a_dict)
# Create a Spark DataFrame from a pandas DataFrame using Arrow
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df = spark.createDataFrame(pandas_df)

#struct
from pyspark.sql.functions import struct
main=df.select(struct(df.columns).alias("MAIN_COL"))

Ответы [ 3 ]

2 голосов
/ 29 января 2020

Вот один из способов go через поля и динамическое изменение их имен. Сначала используйте main.schema.fields[0].dataType.fields для доступа к целевым полям. Затем используйте python map, чтобы добавить pre_ к каждому полю:

from pyspark.sql.types import *
from pyspark.sql.functions import col

inner_fields = main.schema.fields[0].dataType.fields

# [StructField(a,LongType,true),
#  StructField(b,LongType,true),
#  StructField(c,LongType,true),
#  StructField(e,StringType,true),
#  StructField(f,StringType,true),
#  StructField(g,StringType,true)]

pre_cols = list(map(lambda sf: StructField(f"pre_{sf.name}", sf.dataType, sf.nullable), inner_fields))

new_schema = StructType(pre_cols)

main.select(col("MAIN_COL").cast(new_schema)).printSchema()

# root
#  |-- MAIN_COL: struct (nullable = false)
#  |    |-- pre_a: long (nullable = true)
#  |    |-- pre_b: long (nullable = true)
#  |    |-- pre_c: long (nullable = true)
#  |    |-- pre_e: string (nullable = true)
#  |    |-- pre_f: string (nullable = true)
#  |    |-- pre_g: string (nullable = true)

Наконец, вы можете использовать cast с новой схемой, как @ Mahe sh, уже упомянутой.

1 голос
/ 29 января 2020

Красота Spark, вы можете программно манипулировать метаданными

Это пример, который продолжает оригинальный фрагмент кода:

main.createOrReplaceTempView("table_name")

new_cols_select = ", ".join(["MAIN_COL." + col + " as pre_" + col for col in spark.sql("select MAIN_COL.* from table_name").columns])

new_df = spark.sql(f"select {new_cols_select} from table_name")

Из-за лени Spark и потому, что все манипуляции являются только метаданными этот код практически не снижает производительность и будет работать одинаково для 10 или 500 столбцов (на самом деле мы делаем что-то похожее на 1 тыс. столбцов).

Также возможно получить исходные имена столбцов в более элегантный способ с df.schema объектом

1 голос
/ 29 января 2020

вы можете попробовать это: добавьте все столбцы в соответствии с требованиями к schema2

val schema2 = new StructType()
    .add("pre_a",StringType)
    .add("pre_b",StringType)
    .add("pre_c",StringType) 

Теперь выберите столбец, используя:

df.select(col("MAIN_COL").cast(schema2)).show()

, он даст вам все обновленные имена столбцов .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...