Обновление
Для спарк 1.6 вам понадобится альтернативный подход. Один из способов сделать это без использования udf
или каких-либо Window
функций - создать второй временный DataFrame с собранными значениями и затем присоединить его обратно к исходному DataFrame.
Первая группа по и Dev_No
и Tested
и агрегирование с использованием concat_ws
и collect_list
. После агрегирования фильтруйте DataFrame только для протестированных устройств.
import pyspark.sql.functions as f
# create temporary DataFrame
df2 = df.groupBy('Dev_No', 'Tested')\
.agg(f.concat_ws(", ", f.collect_list('model')).alias('Tested_devices'))\
.where(f.col('Tested') == 'Y')
df2.show(truncate=False)
#+-------+------+------------------------------------------------------+
#|Dev_No |Tested|Tested_devices |
#+-------+------+------------------------------------------------------+
#|CTA16C5|Y |Android Devices |
#|4MY16A5|Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#+-------+------+------------------------------------------------------+
Теперь выполните левое соединение df
с df2
, используя оба столбца Dev_No
и Tested
в качестве ключей объединения:
df.join(df2, on=['Dev_No', 'Tested'], how='left')\
.select('Dev_No', 'model', 'Tested', 'Tested_devices')\
.show(truncate=False)
Цель использования select
в конце состоит в том, чтобы получить столбцы в том же порядке, что и исходный DataFrame для целей отображения. Вы можете удалить этот шаг, если захотите.
Это приведет к следующему выводу (тот же вывод, что и ниже (с concat_ws
):
#+-------+---------------+------+------------------------------------------------------+
#|Dev_No |model |Tested|Tested_devices |
#+-------+---------------+------+------------------------------------------------------+
#|4MY16A5|Other |N |null |
#|4MY16A5|Other |N |null |
#|4MY16A5|Other |N |null |
#|4MY16A5|Other |N |null |
#|CTA16C5|Hewlett Packard|N |null |
#|BTA16C5|Windows PC |N |null |
#|BTA16C5|SRL |N |null |
#|BTA16C5|Hewlett Packard|N |null |
#|CTA16C5|Android Devices|Y |Android Devices |
#|4MY16A5|Tablet |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Cable STB |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Windows PC |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Windows PC |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Smart Watch |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#+-------+---------------+------+------------------------------------------------------+
Оригинальный ответ : (Для более поздних версий Spark)
Этого можно добиться, используя два оператора pyspark.sql.functions.when()
- одно из них при вызове pyspark.sql.functions.collect_list()
над Window
, принимая преимущество в том, что значение по умолчанию null
не добавляется в список :
from pyspark.sql import Window
import pyspark.sql.functions as f
df.select(
"*",
f.when(
f.col("Tested") == "Y",
f.collect_list(
f.when(
f.col("Tested") == "Y",
f.col('model')
)
).over(Window.partitionBy("Dev_No"))
).alias("Tested_devices")
).show(truncate=False)
#+-------+---------------+------+--------------------------------------------------------+
#|Dev_No |model |Tested|Tested_devices |
#+-------+---------------+------+--------------------------------------------------------+
#|BTA16C5|Windows PC |N |null |
#|BTA16C5|SRL |N |null |
#|BTA16C5|Hewlett Packard|N |null |
#|4MY16A5|Other |N |null |
#|4MY16A5|Other |N |null |
#|4MY16A5|Tablet |Y |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Other |N |null |
#|4MY16A5|Cable STB |Y |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Other |N |null |
#|4MY16A5|Windows PC |Y |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Windows PC |Y |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Smart Watch |Y |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|CTA16C5|Android Devices|Y |[Android Devices] |
#|CTA16C5|Hewlett Packard|N |null |
#+-------+---------------+------+--------------------------------------------------------+
Если вместо этого вы хотели, чтобы ваш вывод был точно таким же, как вы показали в своем вопросе - в виде строки значений, разделенных запятыми вместо списка и пустых строк вместо null
- вы могли бы слегка изменить это следующим образом:
Используйте pyspark.sql.functions.concat_ws
, чтобы объединить вывод collect_list
в строку. Я использую ", "
в качестве разделителя. Это эквивалентно выполнению ", ".join(some_list)
в python. Затем мы добавляем .otherwise(f.lit(""))
в конец внешнего вызова when()
, чтобы указать, что мы хотим вернуть буквальную пустую строку, если условие False
.
df.select(
"*",
f.when(
f.col("Tested") == "Y",
f.concat_ws(
", ",
f.collect_list(
f.when(
f.col("Tested") == "Y",
f.col('model')
)
).over(Window.partitionBy("Dev_No"))
)
).otherwise(f.lit("")).alias("Tested_devices")
).show(truncate=False)
#+-------+---------------+------+------------------------------------------------------+
#|Dev_No |model |Tested|Tested_devices |
#+-------+---------------+------+------------------------------------------------------+
#|BTA16C5|Windows PC |N | |
#|BTA16C5|SRL |N | |
#|BTA16C5|Hewlett Packard|N | |
#|4MY16A5|Other |N | |
#|4MY16A5|Other |N | |
#|4MY16A5|Tablet |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Other |N | |
#|4MY16A5|Cable STB |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Other |N | |
#|4MY16A5|Windows PC |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Windows PC |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Smart Watch |Y |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|CTA16C5|Android Devices|Y |Android Devices |
#|CTA16C5|Hewlett Packard|N | |
#+-------+---------------+------+------------------------------------------------------+
Используя синтаксис pyspark-sql
, первый приведенный выше пример эквивалентен:
df.registerTempTable("df")
query = """
SELECT *,
CASE
WHEN Tested = 'Y'
THEN COLLECT_LIST(
CASE
WHEN Tested = 'Y'
THEN model
END
) OVER (PARTITION BY Dev_No)
END AS Tested_devices
FROM df
"""
sqlCtx.sql(query).show(truncate=False)