Как использовать groupBy, collect_list, arrays_zip и взрываться вместе в pyspark для решения определенной бизнес-проблемы - PullRequest
1 голос
/ 15 апреля 2020

Я новичок в мире pyspark.
Хотите объединить два DataFrames df и df_sd в столбце days При объединении следует также использовать столбец Name из df DataFrame. Если для комбинации Name и days из df DataFrame нет подходящего значения, тогда оно должно иметь null. Пожалуйста, смотрите код ниже и желаемый результат для лучшего понимания.

 import findspark

findspark.init("/opt/spark")

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType

Mydata = Row("Name", "Number", "days")

spark = SparkSession \
    .builder \
    .appName("DataFrame Learning") \
    .getOrCreate()

sqlContext = SQLContext(spark)

mydata1 = Mydata("A", 100, 1)
mydata2 = Mydata("A", 200, 2)
mydata3 = Mydata("B", 300, 1)
mydata4 = Mydata("B", 400, 2)
mydata5 = Mydata("B", 500, 3)
mydata6 = Mydata("C", 600, 1)
myDataAll = [mydata1, mydata2, mydata3, mydata4, mydata5, mydata6]

STANDARD_TENORS = [1, 2, 3]

df_sd = spark.createDataFrame(STANDARD_TENORS, IntegerType())
df_sd = df_sd.withColumnRenamed("value", "days")

df_sd.show()

df = spark.createDataFrame(myDataAll)
df.show()
+----+
# |days|
# +----+
# |   1|
# |   2|
# |   3|
# +----+
# 
# +----+------+----+
# |Name|Number|days|
# +----+------+----+
# |   A|   100|   1|
# |   A|   200|   2|
# |   B|   300|   1|
# |   B|   400|   2|
# |   B|   500|   3|
# |   C|   600|   1|
# +----+------+----+

Ниже приведены ожидаемые результаты от присоединения


# +----+------+----+
# |Name|Number|days|
# +----+------+----+
# |   A|   100|   1|
# |   A|   200|   2|
# |   A|Null  |   3|
# |   B|   300|   1|
# |   B|   400|   2|
# |   B|   500|   3|
# |   C|   600|   1|
# |   C|Null  |   2|
# |   C|Null  |   3|
# +----+------+----+

.

1 Ответ

1 голос
/ 15 апреля 2020

Если df_sd не будет огромным списком, и у вас будет spark2.4, вы можете сделать это, создав новый столбец в df со списком дней (1 , 2,3), а затем используйте groupBy, collect_list, arrays_zip, & explode. OrderBy перед groupBy существует, чтобы гарантировать, что список будет собран в правильном порядке.

df.show()
+----+------+----+
|Name|Number|days|
+----+------+----+
|   A|   100|   1|
|   A|   200|   2|
|   B|   300|   1|
|   B|   400|   2|
|   B|   500|   3|
|   C|   600|   1|
+----+------+----+
STANDARD_TENORS #->  [1, 2, 3] 
                #-> should be ordered



from pyspark.sql import functions as F
df.withColumn("days2", F.array(*[F.lit(x) for x in STANDARD_TENORS]))\
  .orderBy("Name","days")\
  .groupBy("Name").agg(F.collect_list("Number").alias("Number")\
                      ,F.first("days2").alias("days"))\
  .withColumn("zipped", F.explode(F.arrays_zip("Number","days")))\
  .select("Name","zipped.*").orderBy("Name","days").show()


+----+------+----+
|Name|Number|days|
+----+------+----+
|   A|   200|   1|
|   A|   100|   2|
|   A|  null|   3|
|   B|   300|   1|
|   B|   400|   2|
|   B|   500|   3|
|   C|   600|   1|
|   C|  null|   2|
|   C|  null|   3|
+----+------+----+

Если вы хотите использовать join, вы можете сделать это аналогичным образом:

from pyspark.sql import functions as F
df_sd.agg(F.collect_list("days").alias("days")).join(\
df.orderBy("Name","days").groupBy("Name")\
.agg(F.collect_list("Number").alias("Number"),F.collect_list("days").alias("days1")),\
                      F.size("days")>=F.size("days1")).drop("days1")\
     .withColumn("zipped", F.explode(F.arrays_zip("Number","days")))\
     .select("Name","zipped.*")\
     .orderBy("Name","days")\
     .show()

UPDATE:

Обновлено Для обработки любого ордера или для любого значения в Number .. Я мог бы сделать код немного более краткий, но я сохранил его таким, чтобы вы могли видеть все те столбцы, которые я использовал, чтобы понять логи c. Не стесняйтесь задавать любые вопросы.

df.show()
#newsampledataframe
+----+------+----+
|Name|Number|days|
+----+------+----+
|   A|   100|   1|
|   A|   200|   2|
|   B|   300|   1|
|   B|   400|   2|
|   B|   500|   3|
|   C|   600|   3|
+----+------+----+
#STANDARD_TENORS = [1, 2, 3]

from pyspark.sql import functions as F
df.withColumn("days2", F.array(*[F.lit(x) for x in STANDARD_TENORS]))\
  .groupBy("Name").agg(F.collect_list("Number").alias("col1")\
                      ,F.first("days2").alias("days2"),F.collect_list("days").alias("x"))\
  .withColumn("days3", F.arrays_zip(F.col("col1"),F.col("x")))\
  .withColumn("days4", F.array_except("days2","x"))\
  .withColumn("day5", F.expr("""transform(days4,x-> struct(bigint(-1),x))"""))\
  .withColumn("days3", F.explode(F.array_union("days3","day5"))).select("Name","days3.*")\
  .withColumn("Number", F.when(F.col("col1")==-1, F.lit(None)).otherwise(F.col("col1"))).drop("col1")\
  .select("Name", "Number", F.col("x").alias("days"))\
  .orderBy("Name","days")\
  .show(truncate=False)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...