Присоединение Dataframes с тем же именем в pyspark - PullRequest
0 голосов
/ 03 октября 2018

У меня есть два кадра данных, которые были извлечены из двух файлов CSV.

+---+----------+-----------------+
| ID|  NUMBER  |  RECHARGE_AMOUNT|
+---+----------+-----------------+
|  1|9090909092|               30|
|  2|9090909093|               30|
|  3|9090909090|               30|
|  4|9090909094|               30|
+---+----------+-----------------+

и

+---+----------+-----------------+
| ID|  NUMBER  |  RECHARGE_AMOUNT|
+---+----------+-----------------+
|  1|9090909092|               40|
|  2|9090909093|               50|
|  3|9090909090|               60|
|  4|9090909094|               70|
+---+----------+-----------------+

Я пытаюсь объединить эти два данных с помощью NUMBER coumn с помощью pysparkкод dfFinal = dfFinal.join(df2, on=['NUMBER'], how='inner') и новый фрейм данных генерируется следующим образом.

+----------+---+-----------------+---+-----------------+
|  NUMBER  | ID|  RECHARGE_AMOUNT| ID|  RECHARGE_AMOUNT|
+----------+---+-----------------+---+-----------------+
|9090909092|  1|               30|  1|               40|
|9090909093|  2|               30|  2|               50|
|9090909090|  3|               30|  3|               60|
|9090909094|  4|               30|  4|               70|
+----------+---+-----------------+---+-----------------+

Но я не могу записать этот фрейм данных в файл, поскольку фрейм данных после объединения имеет дублирующийся столбец.Я использую следующий код.dfFinal.coalesce(1).write.format('com.databricks.spark.csv').save('/home/user/output',header = 'true') Есть ли способ избежать дублирования столбца после объединения в искру.Ниже приведен мой код pyspark.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("test1").getOrCreate()
files = ["/home/user/test1.txt", "/home/user/test2.txt"]
dfFinal = spark.read.load(files[0],format="csv", sep=",", inferSchema="false", header="true", mode="DROPMALFORMED")
dfFinal.show()
for i in range(1,len(files)):
    df2 = spark.read.load(files[i],format="csv", sep=",", inferSchema="false", header="true", mode="DROPMALFORMED")
    df2.show()
    dfFinal = dfFinal.join(df2, on=['NUMBER'], how='inner')
dfFinal.show()
dfFinal.coalesce(1).write.format('com.databricks.spark.csv').save('/home/user/output',header = 'true')

Мне нужно сгенерировать уникальное имя столбца.ie: если я дал два файла в массиве файлов с одинаковым заголовком, он должен сгенерироваться следующим образом.

+----------+----+-------------------+-----+-------------------+
|  NUMBER  |IDx |  RECHARGE_AMOUNTx | IDy |  RECHARGE_AMOUNTy |
+----------+----+-------------------+-----+-------------------+
|9090909092|  1 |               30  |  1  |               40  |
|9090909093|  2 |               30  |  2  |               50  |
|9090909090|  3 |               30  |  3  |               60  |
|9090909094|  4 |               30  |  4  |               70  |
+----------+---+-----------------+---+------------------------+

В панде я могу использовать аргумент suffixes, как показано ниже dfFinal = dfFinal.merge(df2,left_on='NUMBER',right_on='NUMBER',how='inner',suffixes=('x', 'y'),sort=True), который сгенерирует вышеуказанный кадр данных.Есть ли способ, которым я могу повторить это на pyspark.

1 Ответ

0 голосов
/ 03 октября 2018

Вы можете выбрать столбцы из каждого кадра данных и присвоить ему псевдоним.
Вот так.

dfFinal = dfFinal.join(df2, on=['NUMBER'], how='inner') \
                 .select('NUMBER',
                         dfFinal.ID.alias('ID_1'),
                         dfFinal.RECHARGE_AMOUNT.alias('RECHARGE_AMOUNT_1'),
                         df2.ID.alias('ID_2'),
                         df2.RECHARGE_AMOUNT.alias('RECHARGE_AMOUNT_2'))
...