Как сжать два столбца массива в Spark SQL - PullRequest
0 голосов
/ 21 января 2019

У меня есть датафрейм Pandas. Я попытался сначала объединить два столбца, содержащих строковые значения, в список, а затем, используя zip, соединял каждый элемент списка с помощью _. Мой набор данных, как показано ниже:

df['column_1']: 'abc, def, ghi'
df['column_2']: '1.0, 2.0, 3.0'

Я хотел объединить эти два столбца в третьем столбце, как показано ниже для каждой строки моего информационного кадра.

df['column_3']: [abc_1.0, def_2.0, ghi_3.0]

Я успешно сделал это в python, используя приведенный ниже код, но фрейм данных довольно большой, и его запуск на весь фрейм данных занимает очень много времени. Я хочу сделать то же самое в PySpark для эффективности. Я успешно прочитал данные в spark dataframe, но мне трудно определить, как реплицировать функции Pandas с помощью эквивалентных функций PySpark. Как я могу получить желаемый результат в PySpark?

df['column_3'] = df['column_2']
for index, row in df.iterrows():
  while index < 3:
    if isinstance(row['column_1'], str):      
      row['column_1'] = list(row['column_1'].split(','))
      row['column_2'] = list(row['column_2'].split(','))
      row['column_3'] = ['_'.join(map(str, i)) for i in zip(list(row['column_1']), list(row['column_2']))]

Я преобразовал два столбца в массивы в PySpark, используя приведенный ниже код

from pyspark.sql.types import ArrayType, IntegerType, StringType
from pyspark.sql.functions import col, split

crash.withColumn("column_1",
    split(col("column_1"), ",\s*").cast(ArrayType(StringType())).alias("column_1")
)
crash.withColumn("column_2",
    split(col("column_2"), ",\s*").cast(ArrayType(StringType())).alias("column_2")
)

Теперь все, что мне нужно, - это сжать каждый элемент массива в двух столбцах, используя '_'. Как я могу использовать zip с этим? Любая помощь приветствуется.

Ответы [ 2 ]

0 голосов
/ 21 января 2019

Spark-SQL эквивалент Python будет pyspark.sql.functions.arrays_zip:

pyspark.sql.functions.arrays_zip(*cols)

Функция сбора: возвращает объединенный массив структур, в которомN-ая структура содержит все N-ые значения входных массивов.

Так что, если у вас уже есть два массива:

from pyspark.sql.functions import split

df = (spark
    .createDataFrame([('abc, def, ghi', '1.0, 2.0, 3.0')])
    .toDF("column_1", "column_2")
    .withColumn("column_1", split("column_1", "\s*,\s*"))
    .withColumn("column_2", split("column_2", "\s*,\s*")))

Вы можете просто применить его к результату

from pyspark.sql.functions import arrays_zip

df_zipped = df.withColumn(
  "zipped", arrays_zip("column_1", "column_2")
)

df_zipped.select("zipped").show(truncate=False)
+------------------------------------+
|zipped                              |
+------------------------------------+
|[[abc, 1.0], [def, 2.0], [ghi, 3.0]]|
+------------------------------------+

Теперь для объединения результатов вы можете transform ( Как использовать функцию преобразования старшего порядка? , TypeError: Столбец не повторяется - Какперебрать ArrayType ()? ):

df_zipped_concat = df_zipped.withColumn(
    "zipped_concat",
     expr("transform(zipped, x -> concat_ws('_', x.column_1, x.column_2))")
) 

df_zipped_concat.select("zipped_concat").show(truncate=False)
+---------------------------+
|zipped_concat              |
+---------------------------+
|[abc_1.0, def_2.0, ghi_3.0]|
+---------------------------+

Примечание :

Введены функции высшего порядка transform и arrays_zipв Apache Spark 2.4.

0 голосов
/ 21 января 2019

Вы также можете использовать UDF для архивирования столбцов разделенного массива,

df = spark.createDataFrame([('abc,def,ghi','1.0,2.0,3.0')], ['col1','col2']) 
+-----------+-----------+
|col1       |col2       |
+-----------+-----------+
|abc,def,ghi|1.0,2.0,3.0|
+-----------+-----------+ ## Hope this is how your dataframe is

from pyspark.sql import functions as F
from pyspark.sql.types import *

def concat_udf(*args):
    return ['_'.join(x) for x in zip(*args)]

udf1 = F.udf(concat_udf,ArrayType(StringType()))
df = df.withColumn('col3',udf1(F.split(df.col1,','),F.split(df.col2,',')))
df.show(1,False)
+-----------+-----------+---------------------------+
|col1       |col2       |col3                       |
+-----------+-----------+---------------------------+
|abc,def,ghi|1.0,2.0,3.0|[abc_1.0, def_2.0, ghi_3.0]|
+-----------+-----------+---------------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...