Программно выбрать столбцы из кадра данных с помощью udf - PullRequest
1 голос
/ 18 июня 2019

Я новичок в pyspark.Я пытаюсь извлечь столбцы данных, используя конфигурационный файл, который содержит UDF.Если я определяю столбец select как список на клиенте, он работает, но если я импортирую список из файла конфигурации, список столбцов имеет тип string.Есть ли альтернативный способ.

открытие spark-shell с помощью pyspark.

*******************************************************************
version 2.2.0
Using Python version 2.7.16 (default, Mar 18 2019 18:38:44)
SparkSession available as 'spark'

*******************************************************************


jsonDF = spark.read.json("/tmp/people.json")
jsonDF.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

jsonDF.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)


jsonCurDF = jsonDF.filter(jsonDF.age.isNotNull()).cache()

# Define the UDF

from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
  return s * s


# Selecting the columns from a list.

colSelList = ['age', 'name', squared_udf('age')]
jsonCurDF.select(colSelList).show()

+---+------+----------------+
|age|  name|squared_udf(age)|
+---+------+----------------+
| 30|  Andy|             900|
| 19|Justin|             361|
+---+------+----------------+

# If I use an external config file 

colSelListStr = ["age", "name" , "squared_udf('age')"]
jsonCurDF.select(colSelListStr).show()

Вышеприведенная команда не выполняется "не удается разрешить" `squared_udf ('age') '

Попытка регистрации функции, попыткаselectExpr и использование функции столбца.

В colSelList вызов udf переводится в тип столбца.

print colSelList[2]
Column<squared_udf(age)

print colSelListStr[2]
squared_udf('age')

print column(colSelListStr[2])
Column<squared_udf('age')

Что я здесь не так делаю? или есть альтернативное решение?

1 Ответ

0 голосов
/ 18 июня 2019

Это потому, что squared_age считается строкой, а не функцией, когда вы передаете ее из списка. Есть несколько способов сделать это, и вам не нужно импортировать UDF для этого. Предположим, это список, который вам нужно выбрать

enter image description here

прямая передача этого списка приведет к ошибке, поскольку squared_age не содержится в этом фрейме данных

enter image description here

поэтому сначала вы берете все столбцы существующего df в список на

existing_cols = df.columns

enter image description here

а ваши это те столбцы, которые вам нужны enter image description here

теперь возьмите пересечение обоих списков это даст вам общий список элементов

intersection = list(set(existing_cols) & set(col_list)) 

сейчас попробуй вот так

newDF= df.select(intersection).rdd.map(lambda x: (x["age"], x["name"], x["age"]*x["age"])).toDF(col_list)

, который даст вам это

enter image description here

надеюсь, это помогло.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...