Я пытаюсь применить разные функции к различным столбцам DataFrame в зависимости от условия. Когда я делаю это в al oop, fn1
успешно применяется на первой итерации. Но df
превращается в None
на второй итерации. Думаю, проблема в том, как я инициализирую df
в рамках al oop.
df = spark.createDataFrame([(10,4,2,3),(20,1,3,4),(30,7,4,5),(40,2,1,9)], schema=['id','metric_1','metric_2', 'metric_3'])
cols_info = [{'name':'metric_1','apply_func':'True','method':'fn1'},{'name':'metric_2','apply_func':'True','method':'fn2'}, {'name':'metric_3','apply_func':'True','method':'fn3'}]
def fn1(df, col):
return df.withColumn(col, F.pow(df[col], 2))
def fn2(df, col):
return df.withColumn(col, F.hash(df[col]))
def fn3(df, col):
return df.withColumn(col, F.log2(df[col]))
def process_data(df, columns):
for col in columns:
if col["apply_func"] == "True":
if column["method"] == "fn1":
df = fn1(df, col["name"])
if column["method"] == "fn2":
df = fn2(df, col["name"])
if column["method"] == "fn3":
df = fn3(df, col["name"])
return df
Как правильно применять такие преобразования с помощью Pyspark DataFrame API?