Как «где» на основе последнего StructType списка - PullRequest
0 голосов
/ 07 июня 2019

Предположим, у меня есть DataFrame столбца списка StructType с именем 'arr', который можно описать следующим json,

{
  "otherAttribute": "blabla...",
  "arr": [
     {
        "domain": "books",
        "others": "blabla..."
     }
     {
        "domain": "music",
        "others": "blabla..."
     }
  ]
}
{
  "otherAttribute": "blabla...",
  "arr": [
     {
        "domain": "music",
        "others": "blabla..."
     }
     {
        "domain": "furniture",
        "others": "blabla..."
     }
  ]
}
... ...

Мы хотим отфильтровать записи так, чтобы последний тип StructType в «arr» имел атрибут «domain», являющийся «music». В приведенном выше примере нам нужно вести первую запись, но отбрасывать вторую запись. Нужна помощь в написании такого предложения "где".

1 Ответ

1 голос
/ 07 июня 2019

Ответ основан на следующих данных:

+---------------+----------------------------------------------+
|other_attribute|arr                                           |
+---------------+----------------------------------------------+
|first          |[[books, ...], [music, ...]]                  |
|second         |[[books, ...], [music, ...], [furniture, ...]]|
|third          |[[football, ...], [soccer, ...]]              |
+---------------+----------------------------------------------+

arr вот массив структур. Каждый элемент arr имеет атрибуты domain и others (здесь заполнено ...).

Подход API DataFrame (F is pyspark.sql.functions):

df.filter(
    F.col("arr")[F.size(F.col("arr")) - 1]["domain"] == "music"
)

Способ SQL:

SELECT 
  other_attribute,
  arr
FROM df
WHERE arr[size(arr) - 1]['domain'] = 'music'

Таблица вывода будет выглядеть так:

+---------------+----------------------------+
|other_attribute|arr                         |
+---------------+----------------------------+
|first          |[[books, ...], [music, ...]]|
+---------------+----------------------------+

Полный код (предлагается использовать консоль PySpark):

import pyspark.sql.types as T
import pyspark.sql.functions as F

schema = T.StructType()\
    .add("other_attribute", T.StringType())\
    .add("arr", T.ArrayType(
        T.StructType()
            .add("domain", T.StringType())
            .add("others", T.StringType())
        )
    )

df = spark.createDataFrame([
    ["first", [["books", "..."], ["music", "..."]]],
    ["second", [["books", "..."], ["music", "..."], ["furniture", "..."]]],
    ["third", [["football", "..."], ["soccer", "..."]]]
], schema)

filtered = df.filter(
    F.col("arr")[F.size(F.col("arr")) - 1]["domain"] == "music"
)

filtered.show(100, False)

df.createOrReplaceTempView("df")

filtered_with_sql = spark.sql("""
    SELECT 
      other_attribute,
      arr
    FROM df
    WHERE arr[size(arr) - 1]['domain'] = 'music'
""")

filtered_with_sql.show(100, False)
...