Pyspark Объединение двух датафреймов с коллекцией - PullRequest
0 голосов
/ 18 декабря 2018

предположим, у меня есть следующие DataFrames.Как я могу выполнить соединение между ними двумя, чтобы у меня был окончательный вывод, в котором результирующий столбец (значение_2) учитывает количество записей, которые должны быть добавлены, на основе значения столбца ранжирования.

    import pyspark.sql.functions as f
    from pyspark.sql.window import Window

    l =[( 9    , 1,  'A' ),
        ( 9    , 2,  'B'  ),
        ( 9    , 3,  'C'  ),
        ( 9    , 4,  'D'  ),
        ( 10   , 1,  'A'  ),
        ( 10   , 2,  'B' )]

    df = spark.createDataFrame(l, ['prod','rank', 'value'])

+----+----+-----+
|prod|rank|value|
+----+----+-----+
|   9|   1|    A|
|   9|   2|    B|
|   9|   3|    C|
|   9|   4|    D|
|  10|   1|    A|
|  10|   2|    B|
+----+----+-----+


    sh =[( 9    , ['A','B','C','D'] ),
        ( 10   , ['A','B'])]

    sh = spark.createDataFrame(sh, ['prod', 'conc'])

    +----+------------+
    |prod|       value|
    +----+------------+
    |   9|[A, B, C, D]|
    |  10|      [A, B]|
    +----+------------+

Конечный желаемый результат:

+----+----+-----+---------+
|prod|rank|value| value_2 |
+----+----+-----+---------+
|   9|   1|    A|  A      |
|   9|   2|    B|  A,B    |
|   9|   3|    C|  A,B,C  |
|   9|   4|    D|  A,B,C,D|
|  10|   1|    A|  A      | 
|  10|   2|    B|  A,B    |
+----+----+-----+---------+

1 Ответ

0 голосов
/ 18 декабря 2018

Вы можете использовать функцию Window и делать это до агрегирования;В spark 2.4 +

df.select('*', 
  f.array_join(
    f.collect_list(df.value).over(Window.partitionBy('prod').orderBy('rank')), 
    ','
  ).alias('value_2')
).show()
+----+----+-----+-------+
|prod|rank|value|value_2|
+----+----+-----+-------+
|   9|   1|    A|      A|
|   9|   2|    B|    A,B|
|   9|   3|    C|  A,B,C|
|   9|   4|    D|A,B,C,D|
|  10|   1|    A|      A|
|  10|   2|    B|    A,B|
+----+----+-----+-------+

Или, если вам не нужно объединять массив в виде строк:

df.select('*', 
  f.collect_list(df.value).over(Window.partitionBy('prod').orderBy('rank')).alias('value_2')
).show()
+----+----+-----+------------+
|prod|rank|value|     value_2|
+----+----+-----+------------+
|   9|   1|    A|         [A]|
|   9|   2|    B|      [A, B]|
|   9|   3|    C|   [A, B, C]|
|   9|   4|    D|[A, B, C, D]|
|  10|   1|    A|         [A]|
|  10|   2|    B|      [A, B]|
+----+----+-----+------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...