Слияние двух информационных фреймов с использованием Pyspark - PullRequest
0 голосов
/ 03 июня 2019

У меня есть 2 DF для слияния:

DF1 -> содержит акции

Plant   Art_nr    Tot
A        X         5
B        Y         4

DF2 - Z содержит открытую поставку

Plant    Art_nr   Tot
A        X        1
C        Z        3

Я хотел бы получить DF3, где для каждой комбинации Plant и Art_nr: - если есть совпадение между DF1.Plant & Art_nr и DF2.Plant & Art_nr, я получаю разницу между DF1 и DF2 - если нет совпадения между DF1.Plant & Art_nr и DF2.Plant & Art_nr, я сохраняю исходные значения из DF1 и DF2

DF3 ->

Plant    Art_nr   Total
A        X        4
B        Y        4
C        Z        3

Я создал поле "Concat" в DF1 и DF2 для объединения Plant и Art_nr, и я попытался с полным объединением +, когда + в противном случае, но я не могу найти правильный синтаксис

DF1.join(DF2, ["Concat"],"full").withColumn("Total",when(DF1.Concat.isin(DF2.Concat)), DF1.Tot - DF2.Tot).otherwise(when(not(DF1.Concat.isin(DF2.Concat)), DF1.Tot)).show()

Любые предложения об альтернативных функциях, которые я мог бы использовать, или как их правильно использовать?

Ответы [ 3 ]

0 голосов
/ 04 июня 2019

Вы должны объединить оба фрейма данных и затем выполнить case (If-Else) выражение или coalesce функцию.

Это можно сделать несколькими способами, вот несколько примеров.

Option1: Использование функции coalesce в качестве альтернативы CASE-WHEN-NULL

from pyspark.sql.functions import coalesce, lit,abs

cond = [df1.Plant == df2.Plant, df1.Art_nr == df2.Art_nr]

df1.join(df2,cond,'full')  \
.select(coalesce(df1.Plant,df2.Plant).alias('Plant')
       ,coalesce(df1.Art_nr,df2.Art_nr).alias('Art_nr')
       ,abs(coalesce(df1.Tot,lit(0)) - coalesce(df2.Tot,lit(0))).alias('Tot')
       ).show()

Option2: Использовать case выражение в selectExpr()

cond = [df1.Plant == df2.Plant, df1.Art_nr == df2.Art_nr]

df1.alias('a').join(df2.alias('b'),cond,'full')  \
.selectExpr("CASE WHEN a.Plant IS NULL THEN b.Plant ELSE a.Plant END AS Plant",
            "CASE WHEN a.Art_nr IS NULL THEN b.Art_nr ELSE a.Art_nr END AS Art_nr",
            "abs(coalesce(a.Tot,0) - coalesce(b.Tot,0))  AS Tot") \
.show()

#+-----+------+---+
#|Plant|Art_nr|Tot|
#+-----+------+---+
#|    A|     X|  4|
#|    B|     Y|  4|
#|    C|     Z|  3|
#+-----+------+---+

Опция3: Использование when().otherwise()

from pyspark.sql.functions import when,coalesce, lit,abs

cond = [df1.Plant == df2.Plant, df1.Art_nr == df2.Art_nr]

df1.join(df2,cond,'full')  \
.select(when(df1.Plant.isNull(),df2.Plant).otherwise(df1.Plant).alias('Plant')
       ,when(df1.Art_nr.isNull(),df2.Art_nr).otherwise(df1.Art_nr).alias('Art_nr')
       ,abs(coalesce(df1.Tot,lit(0)) - coalesce(df2.Tot,lit(0))).alias('Tot')
       ).show()

Выход:

#+-----+------+---+
#|Plant|Art_nr|Tot|
#+-----+------+---+
#|    A|     X|  4|
#|    B|     Y|  4|
#|    C|     Z|  3|
#+-----+------+---+
0 голосов
/ 05 июня 2019

Я бы, вероятно, сделал объединение с groupBy и переформатировал бы его, чтобы избежать использования UDF и без больших блоков кода.

from pyspark.sql.functions import *

DF3 = DF1.union(DF2.withColumn("Tot", col("Tot") * (-1)))
DF3 = DF3.groupBy("Plant", "Art_nr").agg(sum("Tot").alias("Tot"))
DF3 = DF3.withColumn("Tot", abs(col("Tot")))

Я не уверен на 100%, нет ли побочных эффектов, которые я не рассматривал, и соответствует ли он вашим потребностям.

0 голосов
/ 03 июня 2019

Используйте Udf, кажется многословным, но дает больше ясности

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, array

def score(arr):
    if arr[0] is None:
        return int(arr[1])
    elif arr[1] is None:
        return int(arr[0])
    return (int(arr[0])-int(arr[1]))

udf_final = udf(lambda arr: score(arr), IntegerType())

DF1.join(DF2,"full").withColumn("final_score",udf_final(array("Tot","Total")))
...