Как взорвать несколько столбцов, разных типов и разной длины? - PullRequest
3 голосов
/ 08 июля 2019

У меня есть DF со столбцами разных временных циклов (1/6, 3/6, 6/6 и т. Д.), И я хотел бы «взорвать» все столбцы, чтобы создать новый DF, в котором каждая строкацикл 1/6.

from pyspark import Row 
from pyspark.sql import SparkSession 
from pyspark.sql.functions import explode, arrays_zip, col

spark = SparkSession.builder \
    .appName('DataFrame') \
    .master('local[*]') \
    .getOrCreate()

df = spark.createDataFrame([Row(a=1, b=[1, 2, 3, 4, 5, 6], c=[11, 22, 33], d=['foo'])])

|  a|                 b|           c|    d|
+---+------------------+------------+-----+
|  1|[1, 2, 3, 4, 5, 6]|[11, 22, 33]|[foo]|
+---+------------------+------------+-----+

Я делаю взрыв:

df2 = (df.withColumn("tmp", arrays_zip("b", "c", "d"))
       .withColumn("tmp", explode("tmp"))
       .select("a", col("tmp.b"), col("tmp.c"), "d"))

Но вывод не тот, что я хочу:

|  a|  b|   c|    d|
+---+---+----+-----+
|  1|  1|  11|[foo]|
|  1|  2|  22|[foo]|
|  1|  3|  33|[foo]|
|  1|  4|null|[foo]|
|  1|  5|null|[foo]|
|  1|  6|null|[foo]|
+---+---+----+-----+

Iхотел бы, чтобы это выглядело так:

|  a|  b|  c|  d|
+---+---+---+---+
|  1|  1| 11|foo|
|   |  2|   |   |
|   |  3| 22|   |
|   |  4|   |   |
|   |  5| 33|   |
|   |  6|   |   |
+---+---+---+---+

Я новичок в Spark, и с самого начала у меня были сложные темы!:)

Обновление 2019-07-15 : Может быть, у кого-то есть решение без использования UDF?-> ответил @ jxc

Обновление 2019-07-17 : Может быть, у кого-то есть решение, как изменить последовательности нулевых <-> значений в более сложном порядке?Как в c - Null, 11, Null, 22, Null, 33 или более сложной ситуации, как мы хотим, чтобы в столбце d первое значение было Null, затем foo, затем Null, Null, Null:

|  a|  b|  c|  d|
+---+---+---+---+
|  1|  1|   |   |
|   |  2| 11|foo|
|   |  3|   |   |
|   |  4| 22|   |
|   |  5|   |   |
|   |  6| 33|   |
+---+---+---+---+

Ответы [ 2 ]

4 голосов
/ 12 июля 2019

Вот один способ без использования udf:

ОБНОВЛЕНИЕ на 2019/07/17: скорректировал SQL-код и добавил N = 6 в качестве параметра в SQL.

ОБНОВЛЕНИЕ 2019/07/16: удален временный столбец t, заменен константой array(0,1,2,3,4,5) в функции transform .В этом случае мы можем работать со значением элементов массива напрямую, а не с их индексами.

ОБНОВЛЕНИЕ: Я удалил оригинальный метод, который использует функции String и преобразует типы данных в массивеэлементы все в строку и менее эффективны.Функции высшего порядка Spark SQL с Spark 2.4+ должны быть лучше оригинального метода.

Настройка

from pyspark.sql import functions as F, Row

df = spark.createDataFrame([ Row(a=1, b=[1, 2, 3, 4, 5, 6], c=['11', '22', '33'], d=['foo'], e=[111,222]) ])

>>> df.show()
+---+------------------+------------+-----+----------+
|  a|                 b|           c|    d|         e|
+---+------------------+------------+-----+----------+
|  1|[1, 2, 3, 4, 5, 6]|[11, 22, 33]|[foo]|[111, 222]|
+---+------------------+------------+-----+----------+

# columns you want to do array-explode
cols = df.columns

# number of array elements to set
N = 6

Использование функции высшего порядка SQL: transform

Используйте функцию высшего порядка Spark SQL: transform (), выполните следующие действия:

  1. создайте следующий код Spark SQL, где {0}будет заменен на column_name, {1} будет заменен на N:

    stmt = '''
       CASE
          WHEN '{0}' in ('d') THEN
            transform(sequence(0,{1}-1), x -> IF(x == 1, `{0}`[0], NULL))
          WHEN size(`{0}`) <= {1}/2 AND size(`{0}`) > 1 THEN
            transform(sequence(0,{1}-1), x -> IF(((x+1)*size(`{0}`))%{1} == 0, `{0}`[int((x-1)*size(`{0}`)/{1})], NULL))
          ELSE `{0}`
        END AS `{0}`
    '''
    

    Примечание: только преобразование массиваопределяется, когда массив содержит более одного (если не указано в отдельном предложении WHEN) и <= N/2 элементов (в данном примере 1 < size <= 3).массивы с другим размером будут сохранены как есть.

  2. Запустите приведенный выше SQL с помощью selectExpr () для всех необходимых столбцов

    df1 = df.withColumn('a', F.array('a')) \
            .selectExpr(*[ stmt.format(c,N) for c in cols ])
    
    >>> df1.show()
    +---+------------------+----------------+-----------+---------------+
    |  a|                 b|               c|          d|              e|
    +---+------------------+----------------+-----------+---------------+
    |[1]|[1, 2, 3, 4, 5, 6]|[, 11,, 22,, 33]|[, foo,,,,]|[,, 111,,, 222]|
    +---+------------------+----------------+-----------+---------------+
    
  3. run arrays_zip и взорваться :

    df_new = df1.withColumn('vals', F.explode(F.arrays_zip(*cols))) \
                .select('vals.*') \
                .fillna('', subset=cols)
    
    >>> df_new.show()
    +----+---+---+---+----+
    |   a|  b|  c|  d|   e|
    +----+---+---+---+----+
    |   1|  1|   |   |null|
    |null|  2| 11|foo|null|
    |null|  3|   |   | 111|
    |null|  4| 22|   |null|
    |null|  5|   |   |null|
    |null|  6| 33|   | 222|
    +----+---+---+---+----+
    

    Примечание : fillna('', subset=cols) только измененные столбцы, содержащие строки

Inодна цепочка методов:

df_new = df.withColumn('a', F.array('a')) \
           .selectExpr(*[ stmt.format(c,N) for c in cols ]) \
           .withColumn('vals', F.explode(F.arrays_zip(*cols))) \
           .select('vals.*') \
           .fillna('', subset=cols)

Объяснение с помощью функции преобразования:

Функция преобразования (список ниже, отражает старый пересмотр требований)

transform(sequence(0,5), x -> IF((x*size({0}))%6 == 0, {0}[int(x*size({0})/6)], NULL))

Asупомянутое в сообщении, {0} будет заменено именем столбца.Здесь мы используем столбец - c, который содержит 3 элемента в качестве примера:

  • В функции преобразования sequence(0,5) создает постоянный массив array(0,1,2,3,4,5) с 6 элементами, а остальные задают лямбдуфункция с одним аргументом x, имеющая значение элементов.
  • IF (условие, true_value, false_value) : стандартная функция SQL
  • условиемы применили: (x*size(c))%6 == 0, где size(c)=3, если это условие истинно, оно вернет c [int (x * size (c) / 6)] , в противном случае,return NULL .поэтому для x от 0 до 5 у нас будет:

    ((0*3)%6)==0) true   -->  c[int(0*3/6)] = c[0]
    ((1*3)%6)==0) false  -->  NULL
    ((2*3)%6)==0) true   -->  c[int(2*3/6)] = c[1]
    ((3*3)%6)==0) false  -->  NULL
    ((4*3)%6)==0) true   -->  c[int(4*3/6)] = c[2]
    ((5*3)%6)==0) false  -->  NULL
    

Аналогично столбцу-e, который содержит массив из 2 элементов.

1 голос
/ 09 июля 2019

Чтобы получить вывод, вам нужно заменить col a на массив и вставить пустые значения в массив c.

from pyspark.sql.types import ArrayType, IntegerType
from pyspark.sql.functions import explode, arrays_zip, col, array

def fillArrayVals(a):
  for i in [1,3,5]:
    a.insert(i,None)
  return a

fillArrayValsUdf = udf(fillArrayVals, ArrayType(IntegerType(), True))    

df = spark.createDataFrame([Row(a=1, b=[1, 2, 3, 4, 5, 6], c=[11, 22, 33], d=['foo'])])
df = df.withColumn("a", array(col("a"))).withColumn("c", updateArrayUdf("c"))
df = df.withColumn("tmp", arrays_zip("a","b", "c", "d"))\
   .withColumn("tmp", explode("tmp"))\
   .select(col("tmp.a"), col("tmp.b"), col("tmp.c"), col("tmp.d"))

Приведенный выше код приводит к тому, что вы можете привести к строке, чтобы показывать пустые значения вместо нуля

+----+---+----+----+
|   a|  b|   c|   d|
+----+---+----+----+
|   1|  1|  11| foo|
|null|  2|null|null|
|null|  3|  22|null|
|null|  4|null|null|
|null|  5|  33|null|
|null|  6|null|null|
+----+---+----+----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...