Добавление пользовательского столбца в фрейм данных pyspark с использованием udf передавая столбцы в качестве аргумента - PullRequest
0 голосов
/ 30 ноября 2018

У меня есть искровой фрейм данных с двумя столбцами, и я пытаюсь добавить новый столбец, ссылаясь на новое значение для этих столбцов.Я беру эти значения из словаря, который содержит правильное значение для столбца

+--------------+--------------------+
|       country|                 zip|
+--------------+--------------------+
|        Brazil|                7541|
|United Kingdom|                5678|
|         Japan|                1234|
|       Denmark|                2345|
|        Canada|                4567|
|         Italy|                6031|
|        Sweden|                4205|
|        France|                6111|
|         Spain|                8555|
|         India|                2552|
+--------------+--------------------+

Правильное значение для страны должно быть Индия, а zip должно быть 1234, и это хранится в словаре

column_dict = {'country' : 'India', zip: 1234}

Я пытаюсь сделать новое значение столбца как «Бразилия: Индия, Zip: 1234», где значение столбца отличается от этих значений.

Я пробовал его следующим образомно он возвращает пустой столбец, но функция возвращает желаемое значение

     cols = list(df.columns)
     col_list = list(column_dict.keys())

def update(df, cols = cols , col_list = col_list):
   z = []
   for col1, col2 in zip(cols,col_list):
      if col1 == col2:
         if df.col1 != column_dict[col2]: 
            z.append("{'col':" + col2  + ", 'reco': " + str(column_dict[col2]) + "}")   
         else:
            z.append("{'col':" + col2  + ", 'reco': }")

my_udf = udf(lambda x: update(x, cols, col_list))
z = y.withColumn("NewValue", lit(my_udf(y, cols,col_list)))

Если я экспортирую тот же выходной информационный кадр в значение csv, то в этом случае части добавляются с '\'.Как получить точное значение функции в столбце?

1 Ответ

0 голосов
/ 01 декабря 2018

Простой способ - создать фрейм данных из ваших dictionary и union() в ваш основной фрейм данных, а затем groupby и получить значение last.здесь вы можете сделать это:

sc = SparkContext.getOrCreate()

newDf = sc.parallelize([
    {'country' : 'India', 'zip': 1234}
]).toDF()

newDF.show()

newDF:

+-------+----+
|country| zip|
+-------+----+
|  India|1234|
+-------+----+

и finalDF:

unionDF = df.union(newDF)

unionDF.show()
+--------------+--------------------+
|       country|                 zip|
+--------------+--------------------+
|        Brazil|                7541|
|United Kingdom|                5678|
|         Japan|                1234|
|       Denmark|                2345|
|        Canada|                4567|
|         Italy|                6031|
|        Sweden|                4205|
|        France|                6111|
|         Spain|                8555|
|         India|                2552|
|         India|                1234|
+--------------+--------------------+

и в конце сделайте groupby и last:

import pyspark.sql.functions as f

finalDF = unionDF.groupbby('country').agg(f.last('zip'))

finalDF.show()

+--------------+--------------------+
|       country|                 zip|
+--------------+--------------------+
|        Brazil|                7541|
|United Kingdom|                5678|
|         Japan|                1234|
|       Denmark|                2345|
|        Canada|                4567|
|         Italy|                6031|
|        Sweden|                4205|
|        France|                6111|
|         Spain|                8555|
|         India|                1234|
+--------------+--------------------+
...