Используйте список и замените колонку pyspark - PullRequest
0 голосов
/ 14 мая 2019

Предположим, у меня есть список new_id_acc = [6,8,1,2,4], и у меня есть PySpark DataFrame, например

id_acc  |  name  | 
  10    |  ABC   |
  20    |  XYZ   |
  21    |  KBC   |
  34    |  RAH   |
  19    |  SPD   |

Я хочу заменить столбец pyspark id_acc значением new_id_acc, как я могудостичь и сделать это.Я попытался и обнаружил, что lit () можно использовать, но для постоянного значения, но не нашел ничего, как это сделать для списка.

После замены я хочу, чтобы мой PySpark Dataframe выглядел следующим образом

id_acc  |  name  | 
   6    |  ABC   |
   8    |  XYZ   |
   1    |  KBC   |
   2    |  RAH   |
   4    |  SPD   |

Ответы [ 2 ]

1 голос
/ 14 мая 2019

Вероятно, длинный ответ, но это работает.

df = spark.sparkContext.parallelize([(10,'ABC'),(20,'XYZ'),(21,'KBC'),(34,'ABC'),(19,'SPD')]).toDF(('id_acc', 'name'))
df.show()
+------+----+
|id_acc|name|
+------+----+
|    10| ABC|
|    20| XYZ|
|    21| KBC|
|    34| ABC|
|    19| SPD|
+------+----+
new_id_acc = [6,8,1,2,4]
indx = ['ABC','XYZ','KBC','ABC','SPD']
from pyspark.sql.types import *
myschema= StructType([ StructField("indx", StringType(), True),StructField("new_id_ac", IntegerType(), True)])
df1=spark.createDataFrame(zip(indx,new_id_acc),schema = myschema)
df1.show()
+----+---------+
|indx|new_id_ac|
+----+---------+
| ABC|        6|
| XYZ|        8|
| KBC|        1|
| ABC|        2|
| SPD|        4|
+----+---------+
dfnew = df.join(df1, df.name == df1.indx,how='left').drop(df1.indx).select('new_id_ac','name').sort('name').dropDuplicates(['new_id_ac'])
dfnew.show()
+---------+----+
|new_id_ac|name|
+---------+----+
|        1| KBC|
|        6| ABC|
|        4| SPD|
|        8| XYZ|
|        2| ABC|
+---------+----+
0 голосов
/ 14 мая 2019

Идея состоит в том, чтобы создать столбец consecutive serial/row numbers, а затем использовать их для получения соответствующих значений из списка.

# Creating the requisite DataFrame
from pyspark.sql.functions import row_number,lit, udf
from pyspark.sql.window import Window
valuesCol = [(10,'ABC'),(20,'XYZ'),(21,'KBC'),(34,'RAH'),(19,'SPD')]
df = spark.createDataFrame(valuesCol,['id_acc','name'])
df.show()
+------+----+ 
|id_acc|name| 
+------+----+ 
|    10| ABC| 
|    20| XYZ| 
|    21| KBC| 
|    34| RAH| 
|    19| SPD| 
+------+----+ 

Вы можете создать ряд / серийные номера, как сделано здесь .

Обратите внимание, что A ниже это просто значение dummy, так как нам не нужно упорядочивать значения. Мы просто хотим row number.

w = Window().orderBy(lit('A'))
df = df.withColumn('serial_number', row_number().over(w))
df.show()
+------+----+-------------+ 
|id_acc|name|serial_number| 
+------+----+-------------+ 
|    10| ABC|            1| 
|    20| XYZ|            2| 
|    21| KBC|            3| 
|    34| RAH|            4| 
|    19| SPD|            5| 
+------+----+-------------+

В качестве последнего шага мы получим доступ к элементам из списка, предоставленного OP, используя row number. Для этого мы используем udf.

new_id_acc = [6,8,1,2,4]
mapping = udf(lambda x: new_id_acc[x-1])
df = df.withColumn('id_acc', mapping(df.serial_number)).drop('serial_number')
df.show()
+------+----+ 
|id_acc|name| 
+------+----+ 
|     6| ABC| 
|     8| XYZ| 
|     1| KBC| 
|     2| RAH| 
|     4| SPD| 
+------+----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...