PySpark - как работать со списком списков в виде столбца данных - PullRequest
0 голосов
/ 30 марта 2020

Мои исходные данные - это файл JSON, а одно из полей представляет собой список списков (я сгенерировал файл с помощью другого скрипта python; идея состояла в том, чтобы создать список кортежей, но результат был " преобразован "в список списков); У меня есть список значений, и для каждого из этих значений я хочу отфильтровать свой DF таким образом, чтобы получить все строки, которые внутри списка списков имеют это значение; позвольте мне привести простой пример:

JSON строка: {"id": "D1", "class": "WARRIOR", "archetype": "Pirate Warrior", "matches": 140000, "duration": 6.2, "turns": 7.5, "winrate": 58.0, "cards": [["DRG_024", 2], ["CS2_146", 1], ["EX1_409", 1]]}

значение: "CS2_146"

ожидаемый результат: все строки, содержащие "CS2_146" в качестве первый элемент одного из вложенных списков

Ответы [ 2 ]

1 голос
/ 30 марта 2020

Вы можете использовать функции array_contains, но у вас есть вложенный массив, поэтому сначала вам нужно использовать flatted для создания одного массива.

from pyspark.sql.types import *
from pyspark.sql.functions import *
a={"id": "D1", "class": "WARRIOR", "archetype": "Pirate Warrior", "matches": 140000, 
  "duration": 6.2, "turns": 7.5, "winrate": 58.0, "cards": [["DRG_024", 2], 
  ["CS2_146", 1], ["EX1_409", 1]]}
df=spark.createDataFrame([a]) 
df.withColumn("t",array_contains(flatten("cards"),"CS2_146")).where(col("t")=="true").show()
1 голос
/ 30 марта 2020

Поскольку у вас nested array, нам нужно explode массивов, а затем на основе значения index мы можем filter выводить записи.

Example:

df.printSchema()
#root
# |-- archetype: string (nullable = true)
# |-- cards: array (nullable = true)
# |    |-- element: array (containsNull = true)
# |    |    |-- element: string (containsNull = true)
# |-- class: string (nullable = true)
# |-- duration: double (nullable = true)
# |-- id: string (nullable = true)
# |-- matches: long (nullable = true)
# |-- turns: double (nullable = true)
# |-- winrate: double (nullable = true)

df.show(truncate=False)
#+--------------+------------------------------------------+-------+--------+---+-------+-----+-------+
#|archetype     |cards                                     |class  |duration|id |matches|turns|winrate|
#+--------------+------------------------------------------+-------+--------+---+-------+-----+-------+
#|Pirate Warrior|[[DRG_024, 2], [CS2_146, 1], [EX1_409, 1]]|WARRIOR|6.2     |D1 |140000 |7.5  |58.0   |
#+--------------+------------------------------------------+-------+--------+---+-------+-----+-------+

#first explode cards array then explode the nested array with position
#finally filter on pos=0 and cards_arr="CS2_146"

df.selectExpr("*","explode(cards)").\
selectExpr("*","posexplode(col) as (pos,cards_arr)").filter((col("pos") == 0) & (col("cards_arr") == "CS2_146")).show()
#+--------------+--------------------+-------+--------+---+-------+-----+-------+------------+---+---------+
#|     archetype|               cards|  class|duration| id|matches|turns|winrate|         col|pos|cards_arr|
#+--------------+--------------------+-------+--------+---+-------+-----+-------+------------+---+---------+
#|Pirate Warrior|[[DRG_024, 2], [C...|WARRIOR|     6.2| D1| 140000|  7.5|   58.0|[CS2_146, 1]|  0|  CS2_146|
#+--------------+--------------------+-------+--------+---+-------+-----+-------+------------+---+---------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...