PySPARK UDF на withColumn для замены столбца - PullRequest
0 голосов
/ 21 октября 2019

Этот UDF написан для замены значения столбца переменной. Python 2.7;Spark 2.2.0

import pyspark.sql.functions as func

    def updateCol(col, st):
       return func.expr(col).replace(func.expr(col), func.expr(st))

  updateColUDF = func.udf(updateCol, StringType())

Переменные от L_1 до L_3 имеют обновленные столбцы для каждой строки. Вот как я это называю:

updatedDF = orig_df.withColumn("L1", updateColUDF("L1", func.format_string(L_1))). \
                withColumn("L2", updateColUDF("L2", func.format_string(L_2))). \
                withColumn("L3", updateColUDF("L3", 
                withColumn("NAME", func.format_string(name)). \
                withColumn("AGE", func.format_string(age)). \
                select("id", "ts", "L1", "L2", "L3",
                     "NAME", "AGE")

Ошибка:

return Column(sc._jvm.functions.expr(str))
AttributeError: 'NoneType' object has no attribute '_jvm'

Ответы [ 2 ]

1 голос
/ 21 октября 2019

Ошибка в том, что вы используете функции pyspark внутри udf. Также было бы очень полезно узнать содержание ваших переменных L1, L2 ...

Однако, если я понимаю, что вы хотите сделать правильно, вам не нужен udf. Я предполагаю, что L1, L2 и т. Д. Являются константами, верно? Если нет, дайте мне знать, чтобы настроить код соответствующим образом. Вот пример:

from pyspark import SparkConf
from pyspark.sql import SparkSession, functions as F


conf = SparkConf()
spark_session = SparkSession.builder \
    .config(conf=conf) \
    .appName('test') \
    .getOrCreate()

data = [{'L1': "test", 'L2': "data"}, {'L1': "other test", 'L2': "other data"}]
df = spark_session.createDataFrame(data)
df.show()

# +----------+----------+
# |        L1|        L2|
# +----------+----------+
# |      test|      data|
# |other test|other data|
# +----------+----------+

L1 = 'some other data'
updatedDF = df.withColumn(
    "L1",
    F.lit(L1)
)
updatedDF.show()
# +---------------+----------+
# |             L1|        L2|
# +---------------+----------+
# |some other data|      data|
# |some other data|other data|
# +---------------+----------+


# or if you need to replace the value in a more complex way
pattern = '\w+'
updatedDF = updatedDF.withColumn(
    "L1",
    F.regexp_replace(F.col("L1"), pattern, "testing replace")
)

updatedDF.show()
# +--------------------+----------+
# |                  L1|        L2|
# +--------------------+----------+
# |testing replace t...|      data|
# |testing replace t...|other data|
# +--------------------+----------+

# or even something more complicated:
# set L1 value to L2 column when L2 column equals to data, otherwise, just leave L2 as it is
updatedDF = df.withColumn(
    "L2",
    F.when(F.col('L2') == 'data', L1).otherwise(F.col('L2'))
)
updatedDF.show()

# +----------+---------------+
# |        L1|             L2|
# +----------+---------------+
# |      test|some other data|
# |other test|     other data|
# +----------+---------------+

Итак, ваш пример будет:

DF = orig_df.withColumn("L1", pyspark_func.lit(L_1))
...

Также, пожалуйста, убедитесь, что у вас есть активный сеанс зажигания до этот пункт

Надеюсь, это поможет.

Редактировать: Если L1, L2 и т. Д. Являются списками, то одним из вариантов является создание с ними фрейма данных и присоединение к исходному df. К сожалению, нам понадобятся индексы для объединения, и поскольку ваш фрейм данных довольно большой, я не думаю, что это очень эффективное решение. Мы также могли бы использовать трансляции и udf или трансляции и присоединяться.

Вот пример (как мне кажется, неоптимальный), как сделать объединение:

L1 = ['row 1 L1', 'row 2 L1']
L2 = ['row 1 L2', 'row 2 L2']

# create a df with indexes    
to_update_df = spark_session.createDataFrame([{"row_index": i, "L1": row[0], "L2": row[1]} for i, row in enumerate(zip(L1, L2))])

# add indexes to the initial df 
indexed_df = updatedDF.rdd.zipWithIndex().toDF()
indexed_df.show()
# +--------------------+---+
# | _1 | _2 |
# +--------------------+---+
# | [test, some other... | 0 |
# | [other test, othe... | 1 |
# +--------------------+---+

# bring the df back to its initial form
indexed_df = indexed_df.withColumn('row_number', F.col("_2"))\
    .withColumn('L1', F.col("_1").getItem('L1'))\
    .withColumn('L2', F.col("_1").getItem('L2')).\
    select('row_number', 'L1', 'L2')

indexed_df.show()
# +----------+----------+---------------+
# |row_number|        L1|             L2|
# +----------+----------+---------------+
# |         0|      test|some other data|
# |         1|other test|     other data|
# +----------+----------+---------------+

# join with your results and keep the updated columns
final_df = indexed_df.alias('initial_data').join(to_update_df.alias('other_data'), F.col('row_index')==F.col('row_number'), how='left')
final_df = final_df.select('initial_data.row_number', 'other_data.L1', 'other_data.L2')
final_df.show()

# +----------+--------+--------+
# |row_number|      L1|      L2|
# +----------+--------+--------+
# |         0|row 1 L1|row 1 L2|
# |         1|row 2 L1|row 2 L2|
# +----------+--------+--------+

Это ^ определенно может быть лучше вУсловия исполнения.

1 голос
/ 21 октября 2019

Попытка создать примерный фрейм данных и затем использовать функцию lit в PySpark.

Кажется, что работает нормально, это с помощью блокнота Databricks

Python2

...