Pyspark SQL: как создать новое значение из значения столбца плюс имя другого столбца? - PullRequest
0 голосов
/ 16 октября 2019

Я пытаюсь получить новое значение, которое приходит из значения столбца плюс имя другого столбца.

Например, учитывая это:

+----+---+----+----+
|base|  1|   2|   3|
+----+---+----+----+
|  10| AA|  aa|  Aa|
|  20| BB|  bb|  Bb|
|  30| CC|  cc|  Cc|
+----+---+----+----+

Я бы хотелполучить это:


          +---------+----+
          | new_base|   v| 
          +---------+----+
10 + 1 -> |       11|  AA|
10 + 2 -> |       12|  aa|
10 + 3 -> |       13|  Aa|
20 + 1 -> |       21|  BB| 
20 + 2 -> |       22|  bb|
20 + 3 -> |       23|  Bb| 
30 + 1 -> |       31|  CC|
30 + 2 -> |       32|  cc|
30 + 3 -> |       33|  Cc|
          +---------+----+

ПРИМЕЧАНИЕ: я пишу в Spark 2.4

Ответы [ 2 ]

0 голосов
/ 16 октября 2019

Еще один типичный случай использования уменьшение функция:

from functools import reduce
from pyspark.sql.functions import col

cols = df.columns[1:]

df_new = reduce(lambda d1,d2: d1.union(d2),
    [ df.select((col('base') + int(c)).astype('int').alias('new_base'), col(c).alias('v')) for c in cols ]
)

df_new.show()
+--------+---+
|new_base|  v|
+--------+---+
|      11| AA|
|      21| BB|
|      31| CC|
|      12| aa|
|      22| bb|
|      32| cc|
|      13| Aa|
|      23| Bb|
|      33| Cc|
+--------+---+
0 голосов
/ 16 октября 2019

Мы можем использовать функцию explode для ее решения.

# Importing requisite functions.
from pyspark.sql.functions import array, col, explode, struct, lit

# Creating the DataFrame
df = sqlContext.createDataFrame([(10,'AA','aa','Aa'),(20,'BB','bb','Bb'),(30,'CC','cc','Cc')],['base','1','2','3'])
df.show()
+----+---+---+---+
|base|  1|  2|  3|
+----+---+---+---+
|  10| AA| aa| Aa|
|  20| BB| bb| Bb|
|  30| CC| cc| Cc|
+----+---+---+---+

Написание функции для взрыва DataFrame.

def to_explode(df, by):

    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
      struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

Применениефункция ниже. Поскольку созданный столбец new_base имеет decimal, так как по умолчанию он имеет тип double, поэтому мы явно преобразуем его в integer, чтобы избежать суффикса к каждому номеру .0

df = to_explode(df, ['base'])
df = df.withColumn('new_base',col('base')+col('key'))\
       .select(col('new_base').cast(IntegerType()),'val')
df.show()
+--------+---+
|new_base|val|
+--------+---+
|      11| AA|
|      12| aa|
|      13| Aa|
|      21| BB|
|      22| bb|
|      23| Bb|
|      31| CC|
|      32| cc|
|      33| Cc|
+--------+---+
df.printSchema()
root
 |-- new_base: integer (nullable = true)
 |-- val: string (nullable = true)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...