Как я могу взять фрейм данных, содержащий списки строк, и создать еще один фрейм данных из этих списков в Pyspark? - PullRequest
0 голосов
/ 30 сентября 2019

Предположим, у меня есть фрейм данных, который выглядит следующим образом

+--------------------+
|        ColA        |
+--------------------+
| [val1, val2, val3] |
+--------------------+
| [val4, val5, val6] |
+--------------------+
| [val7, val8, val9] |
+--------------------+

Как я могу создать новый фрейм данных, который будет выглядеть следующим образом?

+------+------+------+
| Col1 | Col2 | Col3 |
+------+------+------+
| val1 | val2 | val3 |
+------+------+------+
| val4 | val5 | val6 |
+------+------+------+
| val7 | val8 | val9 |
+------+------+------+

Ответы [ 2 ]

1 голос
/ 30 сентября 2019

Этот код достаточно надежен, чтобы принимать любое количество элементов в массивах. Хотя OP имеет 3 элемента в каждом массиве. Мы начинаем с создания указанного DataFrame.

# Loading requisite packages.
from pyspark.sql.functions import col, explode, first, udf
df = sqlContext.createDataFrame([(['val1', 'val2', 'val3'],),
                                 (['val4', 'val5', 'val6'],),
                                 (['val7', 'val8', 'val9'],)],['ColA',])
df.show()
+------------------+
|              ColA|
+------------------+
|[val1, val2, val3]|
|[val4, val5, val6]|
|[val7, val8, val9]|
+------------------+

Поскольку мы хотим, чтобы каждый элемент отдельного массива был помечен как соответствующий столбец, поэтому в качестве первого шага мы попытаемся сопоставить имя столбца. и значение. Для этого мы создаем user defined function - (UDF) .

def func(c):
    return [['Col'+str(i+1),c[i]] for i in range(len(c))]
func_udf = udf(func,ArrayType(StructType([
      StructField('a', StringType()),
      StructField('b', StringType())
  ])))
df = df.withColumn('ColA_new',func_udf(col('ColA')))
df.show(truncate=False)
+------------------+---------------------------------------+
|ColA              |ColA_new                               |
+------------------+---------------------------------------+
|[val1, val2, val3]|[[Col1,val1], [Col2,val2], [Col3,val3]]|
|[val4, val5, val6]|[[Col1,val4], [Col2,val5], [Col3,val6]]|
|[val7, val8, val9]|[[Col1,val7], [Col2,val8], [Col3,val9]]|
+------------------+---------------------------------------+

Как только это будет сделано, мы explode DataFrame.

# Step 1: Explode the DataFrame
df=df.withColumn('vals', explode('ColA_new')).drop('ColA_new')
df.show()
+------------------+-----------+
|              ColA|       vals|
+------------------+-----------+
|[val1, val2, val3]|[Col1,val1]|
|[val1, val2, val3]|[Col2,val2]|
|[val1, val2, val3]|[Col3,val3]|
|[val4, val5, val6]|[Col1,val4]|
|[val4, val5, val6]|[Col2,val5]|
|[val4, val5, val6]|[Col3,val6]|
|[val7, val8, val9]|[Col1,val7]|
|[val7, val8, val9]|[Col2,val8]|
|[val7, val8, val9]|[Col3,val9]|
+------------------+-----------+

После взрыва мы извлекаем первый и второй элемент, которые были названы a и b соответственно в UDF.

df=df.withColumn('column_name', col('vals').getItem('a'))
df=df.withColumn('value', col('vals').getItem('b')).drop('vals')
df.show()
+------------------+-----------+-----+
|              ColA|column_name|value|
+------------------+-----------+-----+
|[val1, val2, val3]|       Col1| val1|
|[val1, val2, val3]|       Col2| val2|
|[val1, val2, val3]|       Col3| val3|
|[val4, val5, val6]|       Col1| val4|
|[val4, val5, val6]|       Col2| val5|
|[val4, val5, val6]|       Col3| val6|
|[val7, val8, val9]|       Col1| val7|
|[val7, val8, val9]|       Col2| val8|
|[val7, val8, val9]|       Col3| val9|
+------------------+-----------+-----+

В качестве последнего шагаМы pivot возвращаем DataFrame, чтобы получить окончательный DataFrame. Так как при повороте мы делаем aggregation, поэтому мы агрегируем на основе first(), который принимает первый элемент группы.

# Step 2: Pivot it back.
df = df.groupby('ColA').pivot('column_name').agg(first('value')).drop('ColA')
df.show()
+----+----+----+
|Col1|Col2|Col3|
+----+----+----+
|val1|val2|val3|
|val4|val5|val6|
|val7|val8|val9|
+----+----+----+
0 голосов
/ 30 сентября 2019

Вот еще три параметра, использующие либо map через RDD API, либо одно выражение выбора.

Сначала давайте создадим несколько примеров данных и извлечем имена столбцов из произвольной строки в наборе данных. Предварительным условием здесь является то, что все элементы в наборе данных должны иметь одинаковую длину:

from pyspark.sql import Row

df = spark.createDataFrame(
[[["val1", "val2", "val3"]],
[["val4", "val5", "val6"]],
[["val7", "val8", "val9"]]], ["ColA"])

# get the len of the 1st item, the length should be the same for all the items in the dataset
ar_len = len(df.first()["ColA"])

# generate col names
col_names = ["col" + str(i + 1) for i in range(0, ar_len)]

col_names
# ['col1', 'col2', 'col3']

Option1: Map + Row

import pyspark.sql.functions as f

cols = [f.col('ColA').getItem(i).alias(c) for i,c in enumerate(col_names)]

def to_row(l):
  # set the columns of the Row
  r = Row(*cols)

  # set the values of the row that we defined above
  r = r(*l[0])
  return r

df.rdd.map(to_row).toDF().show()

Сначала необходимо объявить список cols, который должен быть того же размера, что и элемент массива. Затем с помощью Row(*cols) создайте нужную схему Row. Наконец, с помощью r(*l[0]) мы устанавливаем значения ранее созданного элемента Row.

Option2: Карта + кортежи

df.rdd.map(lambda l: (*l[0],)).toDF(col_names).show()

Здесь мы просто распаковываем все элементы списка в новый кортеж.

Option3: выбрать оператор

import pyspark.sql.functions as f

cols = [f.col('ColA').getItem(i).alias(c) for i,c in enumerate(col_names)]

df.select(*cols).show()

Выход:

+----+----+----+
|col1|col2|col3|
+----+----+----+
|val1|val2|val3|
|val4|val5|val6|
|val7|val8|val9|
+----+----+----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...