Репликация строк в фрейме данных Pyspark - PullRequest
1 голос
/ 27 апреля 2020

У меня есть следующий фрейм данных:

id, test, date
1, A, 01/20/2020
1, B, 01/25/2020
2, A, 02/20/2020
2, B, 02/25/2020
2, C, 02/25/2020

Поскольку количество различных тестов равно 3 (A, B, C), я хочу вставить строку для теста C для идентификатора 1 с датой "NA"

Результирующий кадр данных должен быть:

id, test, date
1, A, 01/20/2020
1, B, 01/25/2020
1, C, NA
2, A, 02/20/2020
2, B, 02/25/2020
2, C, 02/25/2020

Ответы [ 2 ]

2 голосов
/ 27 апреля 2020

Пример данных

data = [
    ('1', 'A', '01/20/2020'),
    ('1', 'B', '01/25/2020'),
    ('2', 'A', '02/20/2020'),
    ('2', 'B', '02/25/2020'),
    ('2', 'C', '02/25/2020'),
]
df = spark.createDataFrame(data, ['id', 'test', 'date'])

Сначала создайте перекрестную таблицу

# Solution
uniq_ids = df.select('id').distinct().coalesce(1)
uniq_tests = df.select('test').distinct().coalesce(1)
skeleton = (
    uniq_ids.
        crossJoin(
            uniq_tests
        )
)
+---+----+
| id|test|
+---+----+
|  1|   B|
|  2|   B|
|  1|   C|
|  2|   C|
|  1|   A|
|  2|   A|
+---+----+

Затем присоединитесь к ней слева

(
    skeleton.
        join(
            df,
            ['id', 'test'],
            'left'
        ).
        orderBy('id', 'test', 'date').
        show(truncate=False)
)
+---+----+----------+                                                           
|id |test|date      |
+---+----+----------+
|1  |A   |01/20/2020|
|1  |B   |01/25/2020|
|1  |C   |null      |
|2  |A   |02/20/2020|
|2  |B   |02/25/2020|
|2  |C   |02/25/2020|
+---+----+----------+

Рекомендация с реальными данными

  • Если у вас есть множество уникальных id с и уникальных test с, вы можете изменить количество разделов, которые вы coalesce(N).
1 голос
/ 27 апреля 2020

Для spark2.4+ очень масштабируемый способ (without any join) будет использовать groupBy, collect_list и array function / higher order functions для определения отсутствующих ссылок, добавьте эти ссылки и затем explode.. Он будет работать для любого порядка отсутствует A,B or C.

#sampledataframe
#df.show()
#+---+----+----------+
#| id|test|      date|
#+---+----+----------+
#|  1|   A|01/20/2020|
#|  2|   A|02/20/2020|
#|  2|   B|02/25/2020|
#|  2|   C|02/25/2020|
#+---+----+----------+


from pyspark.sql import functions as F
df.groupBy("id").agg(F.collect_list("test").alias("x"),F.collect_list("date").alias("col2"))\
                   .withColumn("zip", F.arrays_zip(F.col("x"),F.col("col2")))\
                   .withColumn("except", F.array_except(F.array(*(F.lit(x) for x in ['A','B','C'])),"x")).drop("x","col2")\
                   .withColumn("except", F.expr("""transform(except,x-> struct(x,'NA'))"""))\
                   .withColumn("zipped", F.explode(F.array_union("zip","except")))\
                   .select("id",F.col("zipped.x").alias("test"),F.col("zipped.col2").alias("date"))\
                   .show(truncate=False)

#+---+----+----------+
#|id |test|date      |
#+---+----+----------+
#|1  |A   |01/20/2020|
#|1  |B   |01/25/2020|
#|1  |C   |NA        |
#|2  |A   |02/20/2020|
#|2  |B   |02/25/2020|
#|2  |C   |02/25/2020|
#+---+----+----------+

Physical Plan for Join Solution: (как показано @cPak)

== Physical Plan ==
Sort [id#1206L ASC NULLS FIRST, test#1207 ASC NULLS FIRST, date#1339 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(id#1206L ASC NULLS FIRST, test#1207 ASC NULLS FIRST, date#1339 ASC NULLS FIRST, 200), [id=#1110]
   +- *(6) Project [id#1206L, test#1207, date#1339]
      +- SortMergeJoin [id#1206L, test#1207], [id#1337L, test#1338], LeftOuter
         :- Sort [id#1206L ASC NULLS FIRST, test#1207 ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#1206L, test#1207, 200), [id=#1101]
         :     +- CartesianProduct
         :        :- Coalesce 1
         :        :  +- *(2) HashAggregate(keys=[id#1206L], functions=[])
         :        :     +- Exchange hashpartitioning(id#1206L, 200), [id=#1089]
         :        :        +- *(1) HashAggregate(keys=[id#1206L], functions=[])
         :        :           +- *(1) Project [id#1206L]
         :        :              +- *(1) Scan ExistingRDD[id#1206L,test#1207,date#1208]
         :        +- Coalesce 1
         :           +- *(4) HashAggregate(keys=[test#1207], functions=[])
         :              +- Exchange hashpartitioning(test#1207, 200), [id=#1095]
         :                 +- *(3) HashAggregate(keys=[test#1207], functions=[])
         :                    +- *(3) Project [test#1207]
         :                       +- *(3) Scan ExistingRDD[id#1206L,test#1207,date#1208]
         +- Sort [id#1337L ASC NULLS FIRST, test#1338 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#1337L, test#1338, 200), [id=#1104]
               +- *(5) Filter (isnotnull(id#1337L) && isnotnull(test#1338))
                  +- *(5) Scan ExistingRDD[id#1337L,test#1338,date#1339]

Physical Plan for Non-Join solution(using array functions+explode)

== Physical Plan ==
*(2) Project [id#1206L, zipped#1400.x AS test#1405, zipped#1400.col2 AS date#1406]
+- *(2) Generate explode(array_union(zip#1380, except#1394)), [id#1206L], false, [zipped#1400]
   +- *(2) Project [id#1206L, arrays_zip(x#1374, col2#1376) AS zip#1380, transform(array_except([A,B,C], x#1374), lambdafunction(named_struct(x, lambda x#1395, col2, NA), lambda x#1395, false)) AS except#1394]
      +- ObjectHashAggregate(keys=[id#1206L], functions=[collect_list(test#1207, 0, 0), collect_list(date#1208, 0, 0)])
         +- Exchange hashpartitioning(id#1206L, 200), [id=#1192]
            +- *(1) Project [id#1206L, date#1208, test#1207]
               +- *(1) Scan ExistingRDD[id#1206L,test#1207,date#1208]

Как мы видим, решение объединения производит много обменов данными в случайном порядке и использует декартово произведение (крайне неэффективно для больших данных). ). Движение данных будет чрезвычайно высоким и обременительным для любого кластера. По сравнению с решением для массива / разнесения, перемещение данных будет намного ниже, а обработка будет намного быстрее.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...