Создайте новый столбец DataFrame pyspark, объединив значения другого столбца на основе условного - PullRequest
0 голосов
/ 09 мая 2018

У меня есть фрейм данных в pyspark, как показано ниже

df.show()

+-------+--------------------+--------------------+
| Dev_No|               model|              Tested|
+-------+--------------------+--------------------+
|BTA16C5|          Windows PC|                   N|
|BTA16C5|                 SRL|                   N|
|BTA16C5|     Hewlett Packard|                   N|
|CTA16C5|     Android Devices|                   Y|
|CTA16C5|     Hewlett Packard|                   N|
|4MY16A5|               Other|                   N|
|4MY16A5|               Other|                   N|
|4MY16A5|              Tablet|                   Y|
|4MY16A5|               Other|                   N|
|4MY16A5|           Cable STB|                   Y|
|4MY16A5|               Other|                   N|
|4MY16A5|          Windows PC|                   Y|
|4MY16A5|          Windows PC|                   Y|
|4MY16A5|         Smart Watch|                   Y|
+-------+--------------------+--------------------+

Теперь, используя вышеупомянутый фрейм данных, я хочу создать нижеприведенный фрейм данных с newcolumn с именем Tested_devices и заполнить столбецсо значениями где для каждого Dev_No выберите model, где Tested равно Y и заполните все значения через запятую.

df1.show()

+-------+--------------------+--------------------+------------------------------------------------------+
| Dev_No|               model|              Tested|                                        Tested_devices|
+-------+--------------------+--------------------+------------------------------------------------------+
|BTA16C5|          Windows PC|                   N|                                                      |
|BTA16C5|                 SRL|                   N|                                                      |  
|BTA16C5|     Hewlett Packard|                   N|                                                      |
|CTA16C5|     Android Devices|                   Y|                                       Android Devices|
|CTA16C5|     Hewlett Packard|                   N|                                                      |      
|4MY16A5|               Other|                   N|                                                      |
|4MY16A5|               Other|                   N|                                                      |
|4MY16A5|              Tablet|                   Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch| 
|4MY16A5|               Other|                   N|                                                      |
|4MY16A5|           Cable STB|                   Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch|
|4MY16A5|               Other|                   N|                                                      |
|4MY16A5|          Windows PC|                   Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch|
|4MY16A5|          Windows PC|                   Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch|
|4MY16A5|         Smart Watch|                   Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch|
+-------+--------------------+--------------------+------------------------------------------------------+

Я пытался что-то подобное ниже, чтобы выбрать Dev_No и model где Tested равно Y

a = df.select("Dev_No", "model"), when(df.Tested == 'Y')

Я не могу получить результат.Это дало мне ошибку ниже

TypeError: when() takes exactly 2 arguments (1 given)

Как мне добиться того, чего я хочу

Ответы [ 2 ]

0 голосов
/ 10 мая 2018

прокомментировал для ясности и объяснения

pyspark> 1.6

#window function to group by Dev_No
from pyspark.sql import Window
windowSpec = Window.partitionBy("Dev_No")

from pyspark.sql import functions as f
from pyspark.sql import types as t
#udf function to change the collected list to string and also to check if Tested column is Y or N
@f.udf(t.StringType())
def populatedUdfFunc(tested, list):
    if(tested == "Y"):
        return ", ".join(list)
    else:
        return ""
#collecting models when Tested is Y using window function defined above
df.withColumn("Tested_devices", populatedUdfFunc(f.col("Tested"), f.collect_list(f.when(f.col("Tested") == "Y", f.col("model")).otherwise(None)).over(windowSpec))).show(truncate=False)

, который должен дать вам

+-------+---------------+------+------------------------------------------------------+
|Dev_No |model          |Tested|Tested_devices                                        |
+-------+---------------+------+------------------------------------------------------+
|BTA16C5|Windows PC     |N     |                                                      |
|BTA16C5|SRL            |N     |                                                      |
|BTA16C5|Hewlett Packard|N     |                                                      |
|4MY16A5|Other          |N     |                                                      |
|4MY16A5|Other          |N     |                                                      |
|4MY16A5|Tablet         |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
|4MY16A5|Other          |N     |                                                      |
|4MY16A5|Cable STB      |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
|4MY16A5|Other          |N     |                                                      |
|4MY16A5|Windows PC     |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
|4MY16A5|Windows PC     |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
|4MY16A5|Smart Watch    |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
|CTA16C5|Android Devices|Y     |Android Devices                                       |
|CTA16C5|Hewlett Packard|N     |                                                      |
+-------+---------------+------+------------------------------------------------------+

искра = 1,6

для pyspark 1.6, collect_list не будет работать с функцией window, и не определена функция collect_list, определенная в SqlContext . Так что вам придется обойтись без оконной функции и использовать HiveContext вместо SQLContext

from pyspark.sql import functions as f
from pyspark.sql import types as t
#udf function to change the collected list to string and also to check if Tested column is Y or N
def populatedUdfFunc(list):
    return ", ".join(list)

populateUdf = f.udf(populatedUdfFunc, t.StringType())

#collecting models when Tested is Y using window function defined above
tempdf = df.groupBy("Dev_No").agg(populateUdf(f.collect_list(f.when(f.col("Tested") == "Y", f.col("model")).otherwise(None))).alias("Tested_devices"))
df.join(
    tempdf,
    (df["Dev_No"] == tempdf["Dev_No"]) & (df["Tested"] == f.lit("Y")), "left").show(truncate=False)

Вы получите тот же вывод, что и выше

0 голосов
/ 09 мая 2018

Обновление

Для спарк 1.6 вам понадобится альтернативный подход. Один из способов сделать это без использования udf или каких-либо Window функций - создать второй временный DataFrame с собранными значениями и затем присоединить его обратно к исходному DataFrame.

Первая группа по и Dev_No и Tested и агрегирование с использованием concat_ws и collect_list. После агрегирования фильтруйте DataFrame только для протестированных устройств.

import pyspark.sql.functions as f

# create temporary DataFrame
df2 = df.groupBy('Dev_No', 'Tested')\
    .agg(f.concat_ws(", ", f.collect_list('model')).alias('Tested_devices'))\
    .where(f.col('Tested') == 'Y')

df2.show(truncate=False)
#+-------+------+------------------------------------------------------+
#|Dev_No |Tested|Tested_devices                                        |
#+-------+------+------------------------------------------------------+
#|CTA16C5|Y     |Android Devices                                       |
#|4MY16A5|Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#+-------+------+------------------------------------------------------+

Теперь выполните левое соединение df с df2, используя оба столбца Dev_No и Tested в качестве ключей объединения:

df.join(df2, on=['Dev_No', 'Tested'], how='left')\
    .select('Dev_No', 'model', 'Tested', 'Tested_devices')\
    .show(truncate=False)

Цель использования select в конце состоит в том, чтобы получить столбцы в том же порядке, что и исходный DataFrame для целей отображения. Вы можете удалить этот шаг, если захотите.

Это приведет к следующему выводу (тот же вывод, что и ниже (с concat_ws):

#+-------+---------------+------+------------------------------------------------------+
#|Dev_No |model          |Tested|Tested_devices                                        |
#+-------+---------------+------+------------------------------------------------------+
#|4MY16A5|Other          |N     |null                                                  |
#|4MY16A5|Other          |N     |null                                                  |
#|4MY16A5|Other          |N     |null                                                  |
#|4MY16A5|Other          |N     |null                                                  |
#|CTA16C5|Hewlett Packard|N     |null                                                  |
#|BTA16C5|Windows PC     |N     |null                                                  |
#|BTA16C5|SRL            |N     |null                                                  |
#|BTA16C5|Hewlett Packard|N     |null                                                  |
#|CTA16C5|Android Devices|Y     |Android Devices                                       |
#|4MY16A5|Tablet         |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Cable STB      |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Windows PC     |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Windows PC     |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Smart Watch    |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#+-------+---------------+------+------------------------------------------------------+

Оригинальный ответ : (Для более поздних версий Spark)

Этого можно добиться, используя два оператора pyspark.sql.functions.when() - одно из них при вызове pyspark.sql.functions.collect_list() над Window, принимая преимущество в том, что значение по умолчанию null не добавляется в список :

from pyspark.sql import Window
import pyspark.sql.functions as f

df.select(
    "*",
    f.when(
        f.col("Tested") == "Y",
        f.collect_list(
            f.when(
                f.col("Tested") == "Y",
                f.col('model')
            )
        ).over(Window.partitionBy("Dev_No"))
    ).alias("Tested_devices")
).show(truncate=False)
#+-------+---------------+------+--------------------------------------------------------+
#|Dev_No |model          |Tested|Tested_devices                                          |
#+-------+---------------+------+--------------------------------------------------------+
#|BTA16C5|Windows PC     |N     |null                                                    |
#|BTA16C5|SRL            |N     |null                                                    |
#|BTA16C5|Hewlett Packard|N     |null                                                    |
#|4MY16A5|Other          |N     |null                                                    |
#|4MY16A5|Other          |N     |null                                                    |
#|4MY16A5|Tablet         |Y     |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Other          |N     |null                                                    |
#|4MY16A5|Cable STB      |Y     |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Other          |N     |null                                                    |
#|4MY16A5|Windows PC     |Y     |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Windows PC     |Y     |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Smart Watch    |Y     |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|CTA16C5|Android Devices|Y     |[Android Devices]                                       |
#|CTA16C5|Hewlett Packard|N     |null                                                    |
#+-------+---------------+------+--------------------------------------------------------+

Если вместо этого вы хотели, чтобы ваш вывод был точно таким же, как вы показали в своем вопросе - в виде строки значений, разделенных запятыми вместо списка и пустых строк вместо null - вы могли бы слегка изменить это следующим образом:

Используйте pyspark.sql.functions.concat_ws, чтобы объединить вывод collect_list в строку. Я использую ", " в качестве разделителя. Это эквивалентно выполнению ", ".join(some_list) в python. Затем мы добавляем .otherwise(f.lit("")) в конец внешнего вызова when(), чтобы указать, что мы хотим вернуть буквальную пустую строку, если условие False.

df.select(
    "*",
    f.when(
        f.col("Tested") == "Y",
        f.concat_ws(
            ", ",
            f.collect_list(
                f.when(
                    f.col("Tested") == "Y",
                    f.col('model')
                )
            ).over(Window.partitionBy("Dev_No"))
        )
    ).otherwise(f.lit("")).alias("Tested_devices")
).show(truncate=False)
#+-------+---------------+------+------------------------------------------------------+
#|Dev_No |model          |Tested|Tested_devices                                        |
#+-------+---------------+------+------------------------------------------------------+
#|BTA16C5|Windows PC     |N     |                                                      |
#|BTA16C5|SRL            |N     |                                                      |
#|BTA16C5|Hewlett Packard|N     |                                                      |
#|4MY16A5|Other          |N     |                                                      |
#|4MY16A5|Other          |N     |                                                      |
#|4MY16A5|Tablet         |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Other          |N     |                                                      |
#|4MY16A5|Cable STB      |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Other          |N     |                                                      |
#|4MY16A5|Windows PC     |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Windows PC     |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Smart Watch    |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|CTA16C5|Android Devices|Y     |Android Devices                                       |
#|CTA16C5|Hewlett Packard|N     |                                                      |
#+-------+---------------+------+------------------------------------------------------+

Используя синтаксис pyspark-sql, первый приведенный выше пример эквивалентен:

df.registerTempTable("df")
query = """
 SELECT *, 
        CASE 
          WHEN Tested = 'Y' 
          THEN COLLECT_LIST(
            CASE 
              WHEN Tested = 'Y' 
              THEN model
            END
          ) OVER (PARTITION BY Dev_No) 
        END AS Tested_devices
   FROM df
"""
sqlCtx.sql(query).show(truncate=False)
...